Guide 03: Streaming and Real-Time
Target architecture
Sources ──► Kafka (with Schema Registry) ──► Flink ──► Iceberg (streaming)
                                               │
                                               ├──► Online store (Redis)
                                               └──► Alerts / APIs
Kafka: recommended configuration
Topics
- Naming: <domain>.<entity>.<event>.v<n> (e.g. sales.order.created.v1).
- Partitions: size for 2x the expected throughput (headroom). Rules of thumb:
  - 1 partition ≈ 10 MB/s of consumer throughput.
  - At least 6 partitions for important topics.
  - Revisit partition counts quarterly.
- Retention: 7 days by default; keep CDC topics for 14-30 days to allow reprocessing.
- Cleanup policy: delete for event topics, compact for state topics.
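The naming, sizing and retention rules above can be turned into a small helper for a topic-provisioning script. This is a minimal sketch: the function names, the assumption that each name segment is lowercase snake_case, and the 14-day CDC default (the low end of the 14-30 day range) are illustrative choices; only the topic config keys cleanup.policy and retention.ms are standard Kafka settings.

```python
import math
import re
from typing import Optional

# Assumption: every segment of <domain>.<entity>.<event>.v<n> is lowercase snake_case.
TOPIC_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.v[1-9][0-9]*$")

MB_S_PER_PARTITION = 10  # rule of thumb: ~10 MB/s of consumer throughput per partition
HEADROOM = 2             # size for 2x the expected throughput
MIN_IMPORTANT = 6        # floor for important topics
DAY_MS = 24 * 60 * 60 * 1000


def is_valid_topic_name(name: str) -> bool:
    """Check a name against the <domain>.<entity>.<event>.v<n> convention."""
    return TOPIC_NAME_RE.fullmatch(name) is not None


def partitions_for(expected_mb_s: float, important: bool = True) -> int:
    """Partitions needed for the expected throughput, with headroom and a floor."""
    needed = math.ceil(expected_mb_s * HEADROOM / MB_S_PER_PARTITION)
    return max(MIN_IMPORTANT if important else 1, needed)


def topic_config(kind: str, retention_days: Optional[int] = None) -> dict:
    """Topic-level Kafka config for an 'event', 'cdc' or 'state' topic."""
    if kind == "state":
        return {"cleanup.policy": "compact"}  # keep only the latest value per key
    if kind not in ("event", "cdc"):
        raise ValueError(f"unknown topic kind: {kind}")
    if retention_days is None:
        retention_days = 14 if kind == "cdc" else 7
    return {"cleanup.policy": "delete", "retention.ms": str(retention_days * DAY_MS)}
```

For example, a topic expected to carry 45 MB/s gets 9 partitions, while one at 12 MB/s is raised to the floor of 6.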
Schema Registry is mandatory
- Avro (preferred) or Protobuf.
- Compatibility mode: BACKWARD by default.
- JSON only at external boundaries (webhooks, public APIs).
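BACKWARD compatibility means consumers using the new schema can still read data written with the previous one. Under Avro schema resolution this comes down to two rules for records: a field added in the new schema needs a default, and a removed field is simply ignored by the reader. The sketch below checks only those two rules on flat record schemas given as Python dicts. It is a simplification (it rejects every type change, including legal promotions such as int to long, and ignores aliases); the registry's own compatibility check remains the authority.

```python
def is_backward_compatible(old: dict, new: dict) -> bool:
    """Simplified BACKWARD check for flat Avro record schemas.

    Readers using `new` must be able to decode records written with `old`.
    """
    old_fields = {f["name"]: f for f in old["fields"]}
    for field in new["fields"]:
        previous = old_fields.get(field["name"])
        if previous is None:
            # Added field: old records do not contain it, so the reader needs a default.
            if "default" not in field:
                return False
        elif previous["type"] != field["type"]:
            # Any type change is rejected here (real Avro allows some promotions).
            return False
    # Fields present only in `old` are skipped by the reader, so removals are fine.
    return True


v1 = {"type": "record", "name": "OrderCreated", "fields": [
    {"name": "order_id", "type": "long"},
    {"name": "amount", "type": "string"},
]}
v2 = {"type": "record", "name": "OrderCreated", "fields": v1["fields"] + [
    {"name": "channel", "type": "string", "default": "web"},
]}
print(is_backward_compatible(v1, v2))  # True: the new field has a default
```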
Producer
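import json
from decimal import Decimal
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext
ORDER_SCHEMA = json.dumps({  # illustrative schema; keep the real one versioned alongside the code
    "type": "record", "name": "OrderCreated", "namespace": "sales",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_id", "type": "long"},
        {"name": "amount", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 2}},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
})
schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_serializer = AvroSerializer(schema_registry, schema_str=ORDER_SCHEMA)
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "compression.type": "zstd",
    "acks": "all",               # wait for all in-sync replicas
    "linger.ms": 50,             # batch for up to 50 ms: better throughput and compression
    "enable.idempotence": True,  # producer retries do not create duplicates
})
def on_delivery(err, msg):
    if err is not None:
        print(f"delivery failed for key {msg.key()}: {err}")  # alert or retry in production
order = {"order_id": 1001, "customer_id": 42, "amount": Decimal("99.90"), "created_at": 1_700_000_000_000}
producer.produce(
    topic="sales.order.created.v1",
    key=str(order["customer_id"]),  # same customer -> same partition -> per-customer ordering
    value=avro_serializer(order, SerializationContext("sales.order.created.v1", MessageField.VALUE)),
    on_delivery=on_delivery,
)
producer.flush()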
Flink: key patterns
Iceberg sink (committing every 60 s)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # Iceberg commits happen on each completed checkpoint
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)
t_env = StreamTableEnvironment.create(env)
t_env.execute_sql("""
CREATE TABLE iceberg_sink (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(18,2),
created_at TIMESTAMP_LTZ(3)
) WITH (
'connector'='iceberg',
'catalog-name'='polaris',
'catalog-type'='rest',
'uri'='http://polaris:8181',
'warehouse'='s3://lakehouse/'
)
""")
CDC join with an Iceberg lookup
-- Flink SQL
-- kafka_orders must declare a processing-time attribute: proc_time AS PROCTIME()
INSERT INTO enriched_orders
SELECT
  o.order_id,
  o.customer_id,
  c.segment,  -- looked up from Iceberg
  o.amount,
  o.created_at
FROM kafka_orders o
LEFT JOIN iceberg_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;
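A processing-time lookup join enriches each order with whatever dimension row is visible when the order is processed: later changes to the dimension do not update rows already emitted, and the LEFT JOIN keeps orders with no match (with a null segment). The in-memory sketch below only illustrates those semantics, with a plain dict standing in for the Iceberg dimension table and a hypothetical event format; it is not how Flink executes the join.

```python
from typing import Iterable, Iterator


def lookup_join(events: Iterable[tuple], dim: dict) -> Iterator[dict]:
    """Process ("order", row) and ("dim_update", row) events in arrival order.

    Orders read the dimension as it is at that moment (processing time);
    dimension updates only affect orders processed after them.
    """
    current = dict(dim)  # copy so the caller's dict is not mutated
    for kind, row in events:
        if kind == "dim_update":
            current[row["customer_id"]] = row
        else:
            match = current.get(row["customer_id"])  # LEFT JOIN: no match -> None
            yield {**row, "segment": match["segment"] if match else None}
```

Order 1 below keeps the old segment even though the dimension changes right after it, which is why enriched streams are usually not "corrected" retroactively.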
Windowing (aggregations)
-- 5-minute tumbling window
-- created_at must be an event-time attribute (WATERMARK declared on kafka_orders)
INSERT INTO sales_5min
SELECT
  TUMBLE_START(created_at, INTERVAL '5' MINUTE) AS window_start,
  region,
  SUM(amount) AS revenue
FROM kafka_orders
GROUP BY TUMBLE(created_at, INTERVAL '5' MINUTE), region;
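A tumbling window assigns each event to exactly one fixed, non-overlapping bucket whose start is the event time rounded down to a multiple of the window size. The sketch below reproduces the query's aggregation in plain Python over a finite list of (created_at, region, amount) tuples, assuming epoch-millisecond timestamps and Decimal amounts. Unlike Flink it has no watermarks, so it never has to decide when a window is complete.

```python
from collections import defaultdict
from decimal import Decimal

WINDOW_MS = 5 * 60 * 1000  # INTERVAL '5' MINUTE


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the tumbling window that contains ts_ms (epoch millis)."""
    return ts_ms - ts_ms % size_ms


def revenue_by_window(orders) -> dict:
    """Equivalent of GROUP BY TUMBLE(created_at, 5 min), region with SUM(amount)."""
    totals = defaultdict(Decimal)  # Decimal() == Decimal("0")
    for created_at, region, amount in orders:
        totals[(window_start(created_at), region)] += amount
    return dict(totals)
```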
CDC with Debezium
Postgres connector config
{
"name": "postgres-orders-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${env:DEBEZIUM_PASSWORD}",
"database.dbname": "orders_db",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "filtered",
"table.include.list": "public.orders,public.customers",
"topic.prefix": "ops",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
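Kafka Connect accepts this JSON through its REST API. The sketch below builds (but does not send) a PUT /connectors/<name>/config request, which creates the connector or updates its config idempotently; that endpoint takes only the inner config object. The Connect URL http://connect:8083 and the JSON file name in the usage comment are assumptions. The ${env:DEBEZIUM_PASSWORD} placeholder is resolved by the Connect worker, which assumes a config provider named env is configured there.

```python
import json
import urllib.request

CONNECT_URL = "http://connect:8083"  # assumption: Kafka Connect REST endpoint


def build_upsert_request(connector: dict, connect_url: str = CONNECT_URL) -> urllib.request.Request:
    """PUT /connectors/<name>/config creates the connector or updates its config."""
    name = connector["name"]
    body = json.dumps(connector["config"]).encode("utf-8")  # this endpoint takes only the config map
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Usage (needs a reachable Connect cluster; file name is hypothetical):
# with open("postgres-orders-cdc.json") as f:
#     req = build_upsert_request(json.load(f))
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```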
Generated topics
ops.public.orders ← CDC events for orders
ops.public.customers ← CDC events for customers
- Debezium envelope format: {op, before, after, source, ts_ms}.
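Downstream, each envelope is typically turned into an upsert or a delete keyed by the primary key. The sketch below applies already-deserialized envelopes to an in-memory dict standing in for a target table, assuming order_id is the key column. It handles the c (create), r (snapshot read), u (update) and d (delete) op codes and skips anything else.

```python
def apply_change(table: dict, event: dict, key: str = "order_id") -> None:
    """Apply one Debezium envelope ({op, before, after, source, ts_ms}) to `table`."""
    op = event["op"]
    if op in ("c", "r", "u"):
        row = event["after"]  # state of the row after the change
        table[row[key]] = row
    elif op == "d":
        # For deletes `after` is null and `before` holds at least the key columns.
        table.pop(event["before"][key], None)
    # Other ops (e.g. "t" for truncate) are ignored in this sketch.
```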
Spark Structured Streaming (micro-batch)
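from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro  # from_avro lives in the avro module
from pyspark.sql.functions import current_timestamp, expr
# Needs the Kafka, Avro and Iceberg runtime packages on the classpath and a catalog that resolves bronze_sales
spark = SparkSession.builder.appName("orders-bronze").getOrCreate()
raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "sales.order.created.v1")
    .option("startingOffsets", "latest")
    .load()
)
# The Confluent serializer prepends a 5-byte header (magic byte + schema ID); strip it before from_avro
payload = expr("substring(value, 6, length(value) - 5)")
parsed = (
    raw.select(from_avro(payload, ORDER_SCHEMA, {"mode": "PERMISSIVE"}).alias("o"))  # ORDER_SCHEMA = writer schema
    .select("o.*")
    .withColumn("_ingested_at", current_timestamp())
)
query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/orders_bronze/")
    .trigger(processingTime="5 minutes")
    .toTable("bronze_sales.orders")  # starts the streaming query
)
query.awaitTermination()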
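Messages produced with the Confluent AvroSerializer (as in the producer example) are not plain Avro: they start with a 5-byte header, a zero magic byte followed by the 4-byte big-endian schema ID, and Spark's from_avro does not strip it. This small helper shows that framing with the standard struct module; the function name is illustrative.

```python
import struct


def split_confluent_frame(message: bytes) -> tuple:
    """Return (schema_id, avro_payload) from a Confluent wire-format message."""
    if len(message) < 5:
        raise ValueError("message shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])  # 1-byte magic + 4-byte unsigned ID
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]
```

The schema ID is what lets a consumer fetch the exact writer schema from the registry, which matters once a topic carries more than one schema version.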
Streaming Iceberg: best practices
- Commit cadence: 30-60 s (more frequent commits = more metadata overhead).
- Compaction: an hourly rewrite_data_files job, or continuous compaction via Flink.
- Equality deletes vs position deletes: with Iceberg v3, deletion vectors (bitmap-encoded position deletes) are the default choice.
- Watermarks: define a lateness tolerance; events that fall outside the window → DLQ.
- Exactly-once: enable checkpoints + idempotent producers.
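The watermark rule can be sketched as follows: the watermark trails the highest event time seen so far by the allowed lateness, and any event older than the watermark is routed to the DLQ instead of the main output. This is a simplified single-partition illustration with an assumed 60 s tolerance and illustrative names; in Flink the same idea is expressed with a WatermarkStrategy and a side output for late data.

```python
from typing import Iterable

ALLOWED_LATENESS_MS = 60_000  # assumption: 60 s tolerance


def route_by_watermark(event_times_ms: Iterable[int], lateness_ms: int = ALLOWED_LATENESS_MS):
    """Split event times into (on_time, late) using a bounded-out-of-orderness watermark."""
    on_time, late = [], []
    max_seen = float("-inf")
    for ts in event_times_ms:
        watermark = max_seen - lateness_ms  # computed before this event is counted
        if ts < watermark:
            late.append(ts)  # would be sent to the DLQ
        else:
            on_time.append(ts)
        max_seen = max(max_seen, ts)
    return on_time, late
```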
Monitoring
- Lag: consumer lag per topic/partition.
- Throughput: events/s, bytes/s.
- End-to-end latency: measure from event time until the row is visible in Iceberg.
- Checkpoints: duration and success rate.
- State size: RocksDB state growth.
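Consumer lag on a partition is the distance between its high watermark (the offset of the next message to be written) and the consumer group's committed offset. The sketch below computes it from plain numbers, assuming that a negative committed offset means the group has not committed yet, in which case every retained message (from the low watermark on) counts as lag. With confluent_kafka, the inputs can come from Consumer.get_watermark_offsets() and Consumer.committed().

```python
def partition_lag(low: int, high: int, committed: int) -> int:
    """Messages the consumer group still has to read on one partition."""
    start = committed if committed >= 0 else low  # negative: nothing committed yet
    return max(0, high - max(start, low))         # offsets below `low` were already deleted


def topic_lag(offsets: dict) -> dict:
    """offsets maps partition -> (low, high, committed); returns partition -> lag."""
    return {p: partition_lag(*o) for p, o in offsets.items()}
```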
Streaming anti-patterns
- ❌ JSON without a schema. → Avro/Protobuf + registry.
- ❌ 1 topic per consumer. → 1 topic per event, multiple consumers.
- ❌ Streaming "just because", with no SLA. → Only if the SLA is <5 min.
- ❌ No DLQ. → A *.dlq topic for every pipeline.
- ❌ Fixed, unmonitored partition counts. → Quarterly review.
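The "one *.dlq topic per pipeline" rule usually comes with a wrapper that catches per-record failures and forwards the original record plus error context instead of stopping the pipeline. The sketch below stays library-agnostic: process and send are injected callables (in production, send would wrap a Kafka producer), and the header names are assumptions.

```python
from typing import Callable, Iterable


def dlq_topic(source_topic: str) -> str:
    """Dead-letter topic name for a pipeline's source topic."""
    return f"{source_topic}.dlq"


def run_with_dlq(
    source_topic: str,
    records: Iterable[dict],
    process: Callable[[dict], dict],
    send: Callable[[str, dict, dict], None],
) -> list:
    """Process records; failures go to the DLQ with error metadata instead of crashing."""
    results = []
    for record in records:
        try:
            results.append(process(record))
        except Exception as exc:  # one bad record must not stop the pipeline
            headers = {
                "error.type": type(exc).__name__,
                "error.message": str(exc),
                "source.topic": source_topic,
            }
            send(dlq_topic(source_topic), record, headers)
    return results
```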