
Guide 03: Streaming and Real-Time

Target architecture

Sources ──► Kafka (with Schema Registry) ──► Flink ──► Iceberg (streaming)
                                               ├──► Online store (Redis)
                                               └──► Alerts / APIs

Kafka: recommended configuration

Topics

  • Naming: <domain>.<entity>.<event>.v<n> (e.g. sales.order.created.v1).
  • Partitions: size for the expected throughput with 2x headroom. Rules:
      • Budget roughly 10 MB/s of consumer throughput per partition.
      • At least 6 partitions for important topics.
      • Revisit the partition count quarterly.
  • Retention: 7 days by default; keep CDC topics for 14-30 days to allow reprocessing.
  • Cleanup policy: delete for event topics, compact for state topics.
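
The naming and sizing rules above can be expressed as a small helper. This is only a sketch: the function names are hypothetical, and the regex assumes lowercase segments with digits and underscores, which the naming convention itself does not specify.

```python
import math
import re

# <domain>.<entity>.<event>.v<n>; the allowed character set is an assumption.
TOPIC_PATTERN = re.compile(
    r"^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.v[1-9][0-9]*$"
)

MB_PER_SEC_PER_PARTITION = 10  # rule of thumb: consumer throughput per partition
MIN_PARTITIONS = 6             # floor for important topics
HEADROOM = 2                   # size for 2x the expected throughput


def is_valid_topic_name(name: str) -> bool:
    """Check a topic name against the <domain>.<entity>.<event>.v<n> convention."""
    return TOPIC_PATTERN.match(name) is not None


def recommended_partitions(expected_mb_per_sec: float, important: bool = True) -> int:
    """Partition count for the expected throughput, including headroom and the floor."""
    needed = math.ceil(expected_mb_per_sec * HEADROOM / MB_PER_SEC_PER_PARTITION)
    floor = MIN_PARTITIONS if important else 1
    return max(needed, floor)
```

For example, a topic expected to carry 45 MB/s needs ceil(45 x 2 / 10) = 9 partitions, while a 5 MB/s important topic is lifted to the 6-partition floor.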

Schema Registry is mandatory

  • Avro (preferred) or Protobuf.
  • Compatibility mode: BACKWARD by default.
  • JSON only at external edges (webhooks, public APIs).
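
BACKWARD compatibility means that a consumer using the new schema can still read data written with the previous one. For the most common change, adding a field, this requires the new field to have a default. The sketch below checks only that case on Avro record schemas held as Python dicts; it ignores type promotion, aliases and nested records, and it is not a substitute for the registry's own compatibility check. The function names are hypothetical.

```python
def fields_missing_defaults(old_schema: dict, new_schema: dict) -> list[str]:
    """Return fields added in new_schema that have no default value."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [
        f["name"]
        for f in new_schema["fields"]
        if f["name"] not in old_names and "default" not in f
    ]


def is_backward_compatible_addition(old_schema: dict, new_schema: dict) -> bool:
    """True if every added field has a default, so old records remain readable."""
    return not fields_missing_defaults(old_schema, new_schema)


ORDER_V1 = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "amount", "type": "string"},
    ],
}

# Adds "currency" with a default: records written with v1 can still be decoded.
ORDER_V2_OK = {
    **ORDER_V1,
    "fields": ORDER_V1["fields"]
    + [{"name": "currency", "type": "string", "default": "USD"}],
}

# Adds "currency" without a default: a v2 reader cannot fill the gap in v1 records.
ORDER_V2_BAD = {
    **ORDER_V1,
    "fields": ORDER_V1["fields"] + [{"name": "currency", "type": "string"}],
}
```

Removing a field is also allowed under BACKWARD, which is why the check looks only at fields that appear in the new schema.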

Producer

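from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# ORDER_SCHEMA (Avro schema as a JSON string) and order (a dict matching it)
# are assumed to be defined elsewhere.
TOPIC = "sales.order.created.v1"

schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_serializer = AvroSerializer(schema_registry, schema_str=ORDER_SCHEMA)

producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "compression.type": "zstd",
    "acks": "all",               # required by idempotence
    "linger.ms": 50,             # wait up to 50 ms to build larger batches
    "enable.idempotence": True,  # no duplicates on producer retries
})

# The serializer uses the topic and message field to resolve the subject name.
ctx = SerializationContext(TOPIC, MessageField.VALUE)

producer.produce(
    topic=TOPIC,
    key=str(order["customer_id"]),  # same customer always lands on the same partition
    value=avro_serializer(order, ctx),
)
producer.flush()  # block until outstanding messages are delivered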

Iceberg sink (committing every 60 s)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60_000)  # the Iceberg sink commits on each checkpoint
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)

t_env = StreamTableEnvironment.create(env)
t_env.execute_sql("""
CREATE TABLE iceberg_sink (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(18,2),
  created_at TIMESTAMP_LTZ(3)
) WITH (
  'connector'='iceberg',
  'catalog-name'='polaris',
  'catalog-type'='rest',
  'uri'='http://polaris:8181',
  'warehouse'='s3://lakehouse/'
)
""")

CDC join with Iceberg lookup

-- Flink SQL
INSERT INTO enriched_orders
SELECT
  o.order_id,
  o.customer_id,
  c.segment,           -- looked up from Iceberg
  o.amount,
  o.created_at
FROM kafka_orders o
LEFT JOIN iceberg_customer_dim FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;

Windowing (aggregations)

-- Tumbling window 5 min
INSERT INTO sales_5min
SELECT
  TUMBLE_START(created_at, INTERVAL '5' MINUTE) AS window_start,
  region,
  SUM(amount) AS revenue
FROM kafka_orders
GROUP BY TUMBLE(created_at, INTERVAL '5' MINUTE), region;

CDC with Debezium

Postgres connector config

{
  "name": "postgres-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${env:DEBEZIUM_PASSWORD}",
    "database.dbname": "orders_db",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.orders,public.customers",
    "topic.prefix": "ops",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Generated topics

  • ops.public.orders ← CDC events for orders
  • ops.public.customers ← CDC events for customers
  • Debezium envelope format: {op, before, after, source, ts_ms}.
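
A consumer typically turns the envelope into upserts and deletes. The sketch below applies already-deserialized envelopes to an in-memory dict keyed by primary key. It assumes the key column is called id (hypothetical; use the table's real key) and handles only the c (create), r (snapshot read), u (update) and d (delete) operations.

```python
def apply_change(state: dict, event: dict, key_field: str = "id") -> None:
    """Apply one Debezium envelope to a dict that mirrors the source table."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Creates, snapshot reads and updates all carry the new row in "after".
        row = event["after"]
        state[row[key_field]] = row
    elif op == "d":
        # Deletes carry the old row (at least its key columns) in "before".
        state.pop(event["before"][key_field], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")


orders: dict = {}
changes = [
    {"op": "r", "before": None, "after": {"id": 1, "status": "new"}, "ts_ms": 1},
    {"op": "c", "before": None, "after": {"id": 2, "status": "new"}, "ts_ms": 2},
    {"op": "u", "before": {"id": 1, "status": "new"},
     "after": {"id": 1, "status": "paid"}, "ts_ms": 3},
    {"op": "d", "before": {"id": 2, "status": "new"}, "after": None, "ts_ms": 4},
]
for change in changes:
    apply_change(orders, change)
```

Because each step is an upsert or a delete by key, replaying the same events twice leaves the state unchanged, which is what makes reprocessing from a retained CDC topic safe.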

Spark Structured Streaming (micro-batch)

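from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro  # needs the spark-avro package
from pyspark.sql.functions import expr

# ORDER_SCHEMA (Avro schema as a JSON string) is assumed to be defined elsewhere.
spark = SparkSession.builder.appName("orders-bronze").getOrCreate()

raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "sales.order.created.v1")
    .option("startingOffsets", "latest")
    .load()
)

# Confluent serializers prepend a 5-byte header (magic byte + 4-byte schema ID)
# that from_avro does not understand, so strip it before decoding.
avro_payload = expr("substring(value, 6, length(value) - 5)")

parsed = raw.select(
    from_avro(avro_payload, ORDER_SCHEMA, {"mode": "PERMISSIVE"}).alias("o")
).select("o.*").withColumn("_ingested_at", expr("current_timestamp()"))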

(parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/orders_bronze/")
    .trigger(processingTime="5 minutes")
    .toTable("bronze_sales.orders")
)

Streaming Iceberg: best practices

  1. Commit cadence: every 30-60 s (more frequent commits mean more metadata overhead).
  2. Compaction: an hourly rewrite_data_files job, or continuous compaction via Flink.
  3. Equality deletes vs. position deletes: Iceberg v3 with deletion vectors is the default choice.
  4. Watermarks: define a lateness tolerance; events that arrive after their window has closed go to a DLQ.
  5. Exactly-once: enable checkpoints and idempotent producers.
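
The watermark rule in item 4 can be illustrated with a simplified router: the watermark trails the highest event time seen so far by the allowed lateness, and anything older than the watermark is diverted to a DLQ. This is a sketch of the idea only. Flink tracks watermarks per source partition and decides lateness per window, and the class and field names here (including ts_ms on the event) are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class LatenessRouter:
    allowed_lateness_ms: int
    max_event_ts_ms: int = 0
    accepted: list = field(default_factory=list)
    dlq: list = field(default_factory=list)

    @property
    def watermark_ms(self) -> int:
        """Events older than this are considered too late."""
        return self.max_event_ts_ms - self.allowed_lateness_ms

    def route(self, event: dict) -> str:
        """Send the event to the main path or the DLQ based on its event time."""
        ts = event["ts_ms"]
        if ts < self.watermark_ms:
            self.dlq.append(event)
            return "dlq"
        # Only accepted events advance the watermark.
        self.max_event_ts_ms = max(self.max_event_ts_ms, ts)
        self.accepted.append(event)
        return "accepted"


router = LatenessRouter(allowed_lateness_ms=60_000)
results = [
    router.route({"ts_ms": 100_000}),
    router.route({"ts_ms": 200_000}),  # watermark moves to 140_000
    router.route({"ts_ms": 130_000}),  # older than the watermark
    router.route({"ts_ms": 150_000}),  # out of order but within tolerance
]
```

A larger tolerance accepts more out-of-order events at the cost of holding window state longer, so the value should follow from the pipeline's latency SLA.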

Monitoring

  • Lag: consumer lag per topic/partition.
  • Throughput: events/sec, bytes/sec.
  • End-to-end latency: measure from event time until the record is visible in Iceberg.
  • Checkpoints: duration and success rate.
  • State size: RocksDB growth.
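
Two of these metrics reduce to simple arithmetic once the raw numbers are available. The sketch below assumes the end offsets and committed offsets have already been fetched (for instance from a Kafka admin client or a metrics exporter) into dicts keyed by (topic, partition). The function names are hypothetical, and a partition with no committed offset is treated as starting from offset 0, which overstates lag if old segments have been deleted by retention.

```python
def consumer_lag(end_offsets: dict, committed: dict) -> dict:
    """Lag per (topic, partition): log end offset minus committed offset."""
    return {
        tp: max(end - committed.get(tp, 0), 0)
        for tp, end in end_offsets.items()
    }


def total_lag(end_offsets: dict, committed: dict) -> int:
    """Total number of messages the consumer group still has to read."""
    return sum(consumer_lag(end_offsets, committed).values())


def end_to_end_latency_ms(event_ts_ms: int, visible_ts_ms: int) -> int:
    """Time from the event occurring to its row being queryable in Iceberg."""
    return visible_ts_ms - event_ts_ms


end_offsets = {("sales.order.created.v1", 0): 1_500, ("sales.order.created.v1", 1): 900}
committed = {("sales.order.created.v1", 0): 1_200}
lag = consumer_lag(end_offsets, committed)
```

Alerting on per-partition lag rather than the total helps spot a single hot or stuck partition that an aggregate would hide.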

Streaming anti-patterns

  1. ❌ JSON without a schema. → Avro/Protobuf plus a registry.
  2. ❌ One topic per consumer. → One topic per event type, with multiple consumers.
  3. ❌ Streaming "just because", with no SLA. → Only when the SLA is under 5 minutes.
  4. ❌ No DLQ. → A *.dlq topic for each pipeline.
  5. ❌ Fixed, unmonitored partition counts. → Quarterly review.
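
The DLQ rule in item 4 might look like the following in a consumer loop: any record the handler cannot process is wrapped with error metadata and sent to the pipeline's *.dlq topic instead of crashing the job or being dropped. This is a sketch under assumptions. The send callable stands in for a real producer, and the shape of the DLQ payload is hypothetical.

```python
import json
import time
from typing import Callable, Iterable


def dlq_topic(source_topic: str) -> str:
    """Derive the dead-letter topic name from the source topic."""
    return f"{source_topic}.dlq"


def process_with_dlq(
    source_topic: str,
    records: Iterable[bytes],
    handler: Callable[[bytes], object],
    send: Callable[[str, str], None],
) -> int:
    """Run handler on each record; route failures to the DLQ. Returns the failure count."""
    failed = 0
    for record in records:
        try:
            handler(record)
        except Exception as exc:  # deliberately broad: nothing should be lost silently
            envelope = {
                "source_topic": source_topic,
                "error_type": type(exc).__name__,
                "error": str(exc),
                "payload": record.decode("utf-8", errors="replace"),
                "failed_at_ms": int(time.time() * 1000),
            }
            send(dlq_topic(source_topic), json.dumps(envelope))
            failed += 1
    return failed


sent: list = []
failures = process_with_dlq(
    "sales.order.created.v1",
    [b'{"order_id": 1}', b"not json"],
    handler=json.loads,
    send=lambda topic, payload: sent.append((topic, payload)),
)
```

Keeping the original payload and the error type in the DLQ record makes it possible to fix the cause and replay the failed messages later.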