
End-to-end streaming CDC demo

This example walks through the complete flow:

Operational Postgres
    ↓ (Debezium CDC + logical replication)
Kafka topic `ops.operational.orders`
    ↓ (Flink consumer)
Iceberg bronze table
    ↓ (dbt incremental merge)
Iceberg silver
    ↓ (dbt + contract enforcement)
Iceberg gold
    ↓
Trino / BI
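Each stage in this flow hands a change event to the next one. As a rough sketch of the first hop, the snippet below flattens one Debezium change event for `operational.orders` into a bronze-style row. It assumes the Kafka Connect JSON converter (the converter this stack actually uses is not stated). The `_op` and `_source_ts_ms` column names are hypothetical; only `_ingested_at` appears in this demo's bronze queries. The real mapping lives in the Flink job and may look different.

```python
import json
from datetime import datetime, timezone
from typing import Optional

# Debezium op codes: c = create (insert), u = update, d = delete, r = read (snapshot)
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}


def to_bronze_row(message: Optional[str], ingested_at: Optional[datetime] = None) -> Optional[dict]:
    """Flatten one Debezium change event (Kafka message value as JSON text).

    Hypothetical bronze layout: row columns plus _op, _source_ts_ms, _ingested_at.
    """
    if message is None:  # tombstone: Kafka record with a null value
        return None
    event = json.loads(message)
    if event is None:  # value serialized as JSON null
        return None
    # With schemas enabled the envelope is wrapped as {"schema": ..., "payload": ...}
    envelope = event.get("payload", event)
    if envelope is None:  # tombstone with schemas enabled
        return None
    op = envelope["op"]
    # Deletes carry the old row image in "before"; all other ops use "after".
    image = envelope["before"] if op == "d" else envelope["after"]
    row = dict(image or {})
    row["_op"] = OP_NAMES[op]
    row["_source_ts_ms"] = envelope["ts_ms"]  # when Debezium processed the change
    row["_ingested_at"] = (ingested_at or datetime.now(timezone.utc)).isoformat()
    return row
```

Deletes take their row image from `before`. Tombstones, the null-valued records Debezium emits after a delete by default, produce no bronze row.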

Steps

1. Make sure the stack is running

./scripts/setup.sh

2. Register the Debezium connector

./pipelines/ingestion/register-debezium-connector.sh

Check in Kafka UI (http://localhost:8082) that the following topics were created:

  • ops.operational.customers
  • ops.operational.products
  • ops.operational.orders
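The contents of `register-debezium-connector.sh` are not shown here. Registering a connector generally means POSTing a JSON config to the Kafka Connect REST API (`POST /connectors`). The sketch below builds a config that would produce the topic names listed above. It assumes Debezium 2.x, where `topic.prefix` becomes the first segment of each topic name (`<topic.prefix>.<schema>.<table>`), and the `pgoutput` logical decoding plugin. The connector name and password are hypothetical. `dbz_publication` is only Debezium's default publication name and has to match whatever `postgres-init/01_init.sql` actually creates.

```python
import json
from typing import Sequence


def debezium_postgres_connector(
    name: str = "ops-postgres-cdc",        # hypothetical connector name
    host: str = "postgres",                # compose service name
    port: int = 5432,
    user: str = "admin",
    password: str = "change-me",           # hypothetical; use the stack's real password
    dbname: str = "orders_db",
    publication: str = "dbz_publication",  # Debezium's default; must match the init script
    tables: Sequence[str] = ("customers", "products", "orders"),
) -> dict:
    """Build a Kafka Connect payload for the Debezium Postgres connector (Debezium 2.x keys)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",  # built-in logical decoding plugin
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            # Topics are named <topic.prefix>.<schema>.<table>, e.g. ops.operational.orders
            "topic.prefix": "ops",
            "table.include.list": ",".join(f"operational.{t}" for t in tables),
            "publication.name": publication,
            # Reuse the publication created by the init script instead of creating one
            "publication.autocreate.mode": "disabled",
        },
    }


if __name__ == "__main__":
    print(json.dumps(debezium_postgres_connector(), indent=2))
```

If the output is saved as `connector.json`, it could be registered with `curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors`. This assumes Kafka Connect's REST port is published on 8083, its default; the port used in this stack is not stated. Because `publication.autocreate.mode` is `disabled`, a missing publication makes the connector fail instead of being created on the fly. That is one way to end up with the "publication does not exist" error covered under Troubleshooting.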

3. Generate load in Postgres

docker compose -f infrastructure/docker-compose.yml exec postgres \
  psql -U admin -d orders_db -c "
    INSERT INTO operational.orders (customer_id, product_id, quantity, amount, status)
    SELECT
      (random() * 4 + 1)::int,
      (random() * 3 + 1)::int,
      (random() * 10 + 1)::int,
      round((random() * 500 + 10)::numeric, 2),
      (ARRAY['pending','completed','shipped'])[ceil(random()*3)]
    FROM generate_series(1, 200);
  "
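The generator depends on how PostgreSQL casts `double precision` to `integer`. The cast rounds to the nearest integer instead of truncating. As a result, `(random() * 4 + 1)::int` can yield 1 through 5, with 1 and 5 half as likely as the other values, and `(random() * 3 + 1)::int` can yield 1 through 4. The sketch below reproduces that arithmetic in Python, whose `round()` also breaks ties to even. Whether IDs 5 and 4 exist in the seeded `operational.customers` and `operational.products` tables is not stated here, so treat a possible foreign-key error as something to check rather than a known bug.

```python
import math
import random
from collections import Counter


def pg_float_to_int(x: float) -> int:
    # PostgreSQL rounds float8 -> int to the nearest integer (ties to even);
    # Python's round() uses the same tie-breaking rule.
    return int(round(x))


def simulate(expr, n: int = 100_000, seed: int = 7) -> Counter:
    """Draw n values of expr(r) with r uniform in [0, 1), like PostgreSQL's random()."""
    rng = random.Random(seed)
    return Counter(expr(rng.random()) for _ in range(n))


# As written in the INSERT: the cast rounds, so customer_id ranges over 1..5
as_written = simulate(lambda r: pg_float_to_int(r * 4 + 1))
# Uniform 1..4 alternative, i.e. floor(random() * 4)::int + 1 in SQL
uniform = simulate(lambda r: math.floor(r * 4) + 1)
print(sorted(as_written), sorted(uniform))
```

If the seed data only has four customers, `floor(random() * 4)::int + 1` gives a uniform 1..4 instead.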

4. Submit the Flink job (Kafka → Iceberg bronze)

docker compose -f infrastructure/docker-compose.yml exec flink-jobmanager \
  ./bin/sql-client.sh -f /workspace/pipelines/streaming/flink_kafka_to_iceberg.sql
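Flink's Iceberg sink commits data files when a checkpoint completes. If checkpointing is not enabled, the streaming job can run without any rows ever becoming visible in bronze. The contents of `flink_kafka_to_iceberg.sql` are not shown here. The hypothetical helper below assumes the script configures Flink through SQL client `SET 'key' = 'value';` statements and checks whether `execution.checkpointing.interval` is among them. Checkpointing can also be enabled in the cluster's Flink configuration, which this check does not see.

```python
import re
from typing import Dict

# Matches Flink SQL client statements of the form: SET 'key' = 'value';
_SET_STATEMENT = re.compile(
    r"^\s*SET\s+'([^']+)'\s*=\s*'([^']*)'\s*;", re.IGNORECASE | re.MULTILINE
)


def sql_client_settings(script: str) -> Dict[str, str]:
    """Collect the SET key/value pairs from a Flink SQL client script."""
    return dict(_SET_STATEMENT.findall(script))


def checkpointing_configured(script: str) -> bool:
    """True if the script itself enables checkpointing (cluster config may also do it)."""
    return "execution.checkpointing.interval" in sql_client_settings(script)
```

From the repository root, one possible use is `checkpointing_configured(Path("pipelines/streaming/flink_kafka_to_iceberg.sql").read_text())` with `pathlib.Path`. This assumes the repository is what gets mounted at `/workspace` in the container.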

5. Verify Bronze

docker compose -f infrastructure/docker-compose.yml exec trino \
  trino --catalog iceberg --execute "
    SELECT COUNT(*) FROM bronze_ops.orders_raw;
    SELECT * FROM bronze_ops.orders_raw ORDER BY _ingested_at DESC LIMIT 5;
  "

6. Run dbt to promote the data to Silver and Gold

make dbt
make dbt-test
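Bronze is append-only, so the `COUNT(*)` in step 5 counts change events (for example snapshot reads, inserts and updates), not orders. The flow describes silver as a dbt incremental merge. The sketch below models that step in plain Python under assumed conventions:

  • bronze rows carry hypothetical `_op` and `_source_ts_ms` columns;
  • `order_id` is the merge key;
  • each run only processes events newer than the previous high-water mark, the way an `is_incremental()` filter usually does.

The actual dbt model is not shown here and may differ, for example in how it handles deletes.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]


def incremental_merge(
    silver: Dict[int, Row], bronze: Iterable[Row], watermark: int
) -> Tuple[Dict[int, Row], int]:
    """Merge bronze change events newer than `watermark` into silver, keyed by order_id."""
    # Incremental filter + dedup: keep only the latest new event per key.
    # A strict '>' can skip late events sharing the watermark timestamp;
    # real models often re-read a small lookback window and deduplicate.
    latest: Dict[int, Row] = {}
    for event in bronze:
        ts = event["_source_ts_ms"]
        if ts <= watermark:
            continue  # already merged by a previous run
        key = event["order_id"]
        if key not in latest or ts >= latest[key]["_source_ts_ms"]:
            latest[key] = event

    # Merge: upsert the latest image, or drop the key if its last event is a delete.
    merged = dict(silver)
    for key, event in latest.items():
        if event["_op"] == "delete":
            merged.pop(key, None)
        else:
            merged[key] = {k: v for k, v in event.items() if not k.startswith("_")}

    new_watermark = max((e["_source_ts_ms"] for e in latest.values()), default=watermark)
    return merged, new_watermark
```

Running it twice over the same bronze events leaves silver unchanged the second time. That idempotence is what makes re-running `make dbt` safe.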

7. Query Gold

docker compose -f infrastructure/docker-compose.yml exec trino \
  trino --catalog iceberg --execute "
    SELECT
      order_date,
      segment,
      country,
      SUM(amount) AS revenue,
      COUNT(*) AS orders
    FROM gold_sales.fct_orders f
    JOIN gold_sales.dim_customer c ON f.customer_sk = c.customer_sk
    WHERE c.is_current = TRUE
    GROUP BY 1, 2, 3
    ORDER BY 1 DESC
    LIMIT 20;
  "
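The `is_current = TRUE` filter suggests `dim_customer` is a type 2 slowly changing dimension, with one row per customer version and a surrogate key per version. The sketch below illustrates that pattern; the class, field names and `apply_scd2_change` helper are hypothetical. It also shows a caveat worth checking against the real model. If `fct_orders.customer_sk` points at the version that was current when the order was placed (common, but not confirmed here), the filter silently drops orders tied to older versions.

```python
from dataclasses import dataclass
from datetime import date
from typing import List, Optional


@dataclass
class CustomerVersion:
    customer_sk: int                 # surrogate key: one value per version
    customer_id: int                 # natural key from operational.customers
    segment: str
    valid_from: date
    valid_to: Optional[date] = None  # None while this version is current

    @property
    def is_current(self) -> bool:
        return self.valid_to is None


def apply_scd2_change(dim: List[CustomerVersion], customer_id: int, segment: str, as_of: date) -> None:
    """Close the current version and append a new one when the tracked attribute changes."""
    current = next((v for v in dim if v.customer_id == customer_id and v.is_current), None)
    if current is not None and current.segment == segment:
        return  # nothing changed; history stays as it is
    if current is not None:
        current.valid_to = as_of
    next_sk = max((v.customer_sk for v in dim), default=0) + 1
    dim.append(CustomerVersion(next_sk, customer_id, segment, as_of))


dim: List[CustomerVersion] = []
apply_scd2_change(dim, customer_id=1, segment="retail", as_of=date(2024, 1, 1))
orders = [{"customer_sk": 1, "amount": 120.0}]  # hypothetical fact row recorded under version 1
apply_scd2_change(dim, customer_id=1, segment="enterprise", as_of=date(2024, 6, 1))

current_sks = {v.customer_sk for v in dim if v.is_current}
kept = [o for o in orders if o["customer_sk"] in current_sks]
print(current_sks, kept)
```

Here the order is excluded because its `customer_sk` now belongs to an expired version. Dropping the filter reports revenue under the segment valid at order time. Joining through the natural key to the current row reports it under today's segment. Which one is right depends on the question the dashboard answers.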

Troubleshooting

  • Connector fails with "publication does not exist": check that postgres-init/01_init.sql was executed.
  • Topics do not appear: check the output of docker logs kafka-connect.
  • Flink job fails with S3 errors: check that MinIO is reachable from the Flink container.
  • dbt cannot find tables: confirm that profiles.yml points to http://trino:8080 when dbt runs inside a container, or to localhost:8081 when it runs from the host.
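The last item comes down to which network dbt runs in. Inside the compose network the service name `trino` resolves and Trino listens on 8080. From the host, this page uses `localhost:8081`, presumably a published port. A small, hypothetical helper that picks the matching host and port could look like this. The `/.dockerenv` check is a common Docker heuristic, not a guarantee.

```python
import os
from typing import Dict, Optional, Union


def trino_target(in_container: Optional[bool] = None) -> Dict[str, Union[str, int]]:
    """Return the host/port dbt should use to reach Trino in this demo stack."""
    if in_container is None:
        # Heuristic only: Docker creates /.dockerenv; other container runtimes may not.
        in_container = os.path.exists("/.dockerenv")
    if in_container:
        return {"host": "trino", "port": 8080, "http_scheme": "http"}
    return {"host": "localhost", "port": 8081, "http_scheme": "http"}


if __name__ == "__main__":
    print(trino_target())
```

With dbt-trino, `profiles.yml` takes `host` and `port` as separate fields. One option is to read them from environment variables, for example `host: "{{ env_var('TRINO_HOST', 'localhost') }}"` (the variable name is hypothetical). For `port`, the string has to be cast, for example with dbt's `as_number` filter.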