End-to-end streaming CDC demo
This example walks through the complete flow:
```text
Postgres operational
   ↓ (Debezium CDC + logical replication)
Kafka topic ops.operational.orders
   ↓ (Flink consumer)
Iceberg bronze table
   ↓ (dbt incremental merge)
Iceberg silver
   ↓ (dbt + contract enforcement)
Iceberg gold
   ↓
Trino / BI
```
Steps
1. Make sure the stack is running
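The command for this step is not shown. A minimal sketch, assuming every service in the demo is declared in `infrastructure/docker-compose.yml`, the same file the later commands pass to `docker compose -f`:

```bash
# Assumption: the whole demo stack is defined in this compose file.
COMPOSE_FILE_PATH="infrastructure/docker-compose.yml"

# Start every service in the background, then print each container's state.
stack_up() {
  docker compose -f "$COMPOSE_FILE_PATH" up -d
  docker compose -f "$COMPOSE_FILE_PATH" ps
}
```

Run `stack_up` from the repository root and wait until every container reports `running` (or `healthy`, if the compose file defines health checks) before continuing.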
2. Register the Debezium connector
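The registration command is not shown here. The sketch below uses the Kafka Connect REST API and rests on several assumptions not confirmed by this repo: Kafka Connect listens on `localhost:8083` (its default port), Debezium is 2.x (which uses `topic.prefix`), and the connector name `orders-cdc`, the password and the publication name `dbz_publication` are hypothetical. With `topic.prefix` set to `ops`, Debezium names topics `<prefix>.<schema>.<table>`, which matches `ops.operational.orders`.

```bash
# Assumptions (not confirmed by this repo): Kafka Connect REST on port 8083,
# Debezium 2.x, and the connector name, password and publication name below.
# "publication.autocreate.mode": "disabled" means Debezium will not create the
# publication itself, so it must already exist (presumably created by
# postgres-init/01_init.sql).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the connector definition as JSON.
connector_config() {
  cat <<'JSON'
{
  "name": "orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "admin",
    "database.password": "change-me",
    "database.dbname": "orders_db",
    "topic.prefix": "ops",
    "schema.include.list": "operational",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "disabled"
  }
}
JSON
}

# POST the definition to Kafka Connect (curl reads the body from stdin).
register_connector() {
  connector_config | curl -sS -X POST -H "Content-Type: application/json" \
    --data @- "$CONNECT_URL/connectors"
}

# Show connector and task state (RUNNING, or FAILED with a stack trace).
connector_status() {
  curl -sS "$CONNECT_URL/connectors/orders-cdc/status"
}
```

Call `register_connector` once, then `connector_status`; a `FAILED` task includes the underlying error, which is usually faster to read than the full Connect logs.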
In Kafka UI (http://localhost:8082), check that the following topics were created:
- `ops.operational.customers`
- `ops.operational.products`
- `ops.operational.orders`
3. Generate load in Postgres
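```bash
docker compose -f infrastructure/docker-compose.yml exec postgres \
  psql -U admin -d orders_db -c "
INSERT INTO operational.orders (customer_id, product_id, quantity, amount, status)
SELECT
  (random() * 4 + 1)::int,                   -- customer_id: the cast rounds, so 1..5
  (random() * 3 + 1)::int,                   -- product_id: 1..4
  (random() * 10 + 1)::int,                  -- quantity: 1..11
  round((random() * 500 + 10)::numeric, 2),  -- amount: 10.00..510.00
  -- floor(...) + 1 always yields an index in 1..3 (ceil could return 0)
  (ARRAY['pending','completed','shipped'])[floor(random() * 3)::int + 1]
FROM generate_series(1, 200);
"
```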
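Inserts only exercise the create path of CDC. To also send updates through Debezium (change events with `op = "u"`) so the incremental merge into Silver has changed rows to reconcile, a sketch like the following changes the status of existing orders. It touches only the `status` column used above; which rows to update is purely illustrative.

```bash
# Illustrative: move every pending order to 'shipped' so Debezium emits
# update events for those rows. Uses only the status column from step 3.
update_sql() {
  echo "UPDATE operational.orders SET status = 'shipped' WHERE status = 'pending';"
}

generate_updates() {
  docker compose -f infrastructure/docker-compose.yml exec -T postgres \
    psql -U admin -d orders_db -c "$(update_sql)"
}
```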
4. Start the Flink job (Kafka → Iceberg)
```bash
docker compose -f infrastructure/docker-compose.yml exec flink-jobmanager \
  ./bin/sql-client.sh -f /workspace/pipelines/streaming/flink_kafka_to_iceberg.sql
```
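For a streaming `INSERT`, the SQL client typically submits the job and exits, so a clean exit does not prove the job is still running (it may fail later, for example on the S3 errors listed under Troubleshooting). A sketch that lists running jobs with the standard Flink CLI, assuming the `flink-jobmanager` image ships `./bin/flink` next to `sql-client.sh`, as the official Flink images do:

```bash
# Assumption: the standard Flink CLI is available at ./bin/flink inside
# the flink-jobmanager container.
flink_running_jobs() {
  # `flink list -r` prints only jobs in RUNNING state.
  docker compose -f infrastructure/docker-compose.yml exec -T flink-jobmanager \
    ./bin/flink list -r
}
```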
5. Verify Bronze
```bash
docker compose -f infrastructure/docker-compose.yml exec trino \
  trino --catalog iceberg --execute "
    SELECT COUNT(*) FROM bronze_ops.orders_raw;
    SELECT * FROM bronze_ops.orders_raw ORDER BY _ingested_at DESC LIMIT 5;
  "
```
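The Flink Iceberg sink commits data files when a checkpoint completes, so Bronze can stay empty for up to one checkpoint interval. A polling sketch, assuming the Trino CLI in the `trino` service as above; the minimum row count defaults to 1 because whether Bronze also contains rows inserted before the Flink job started depends on the job's startup offsets, which are not shown here.

```bash
# Return success if $1 is a non-negative integer and is >= $2.
is_count_at_least() {
  [[ "$1" =~ ^[0-9]+$ ]] && (( $1 >= $2 ))
}

# Poll COUNT(*) on a table until it reaches min_rows, checking every 5 s.
wait_for_rows() {
  local table="$1" min_rows="${2:-1}" attempts="${3:-30}"
  local i count
  for ((i = 1; i <= attempts; i++)); do
    count=$(docker compose -f infrastructure/docker-compose.yml exec -T trino \
      trino --catalog iceberg --output-format TSV \
      --execute "SELECT COUNT(*) FROM ${table}" 2>/dev/null | tr -d '[:space:]')
    if is_count_at_least "$count" "$min_rows"; then
      echo "${table}: ${count} rows"
      return 0
    fi
    sleep 5
  done
  echo "${table}: still below ${min_rows} rows after ${attempts} attempts" >&2
  return 1
}
```

Usage: `wait_for_rows bronze_ops.orders_raw 1`.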
6. Run dbt to promote data to Silver and Gold
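The dbt command for this step is not shown. A minimal sketch, assuming it runs from the dbt project directory with `profiles.yml` already pointing at Trino (see Troubleshooting). `dbt build` runs seeds, snapshots, models and tests in dependency order, and a model with an enforced contract fails the build if its columns or data types do not match the declared contract.

```bash
# Assumptions: current directory is the dbt project, and profiles.yml points
# at Trino (http://trino:8080 in a container, localhost:8081 on the host).
promote_layers() {
  # Validate profiles.yml and open a test connection to Trino.
  dbt debug || return 1
  # Build everything in dependency order; enforced contracts are checked here.
  dbt build
}
```

To rebuild only part of the project, `dbt build --select <selector>` narrows the run; the selectors that match the Silver and Gold models depend on the project layout, which is not shown here.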
7. Query Gold
```bash
docker compose -f infrastructure/docker-compose.yml exec trino \
  trino --catalog iceberg --execute "
    SELECT
      order_date,
      segment,
      country,
      SUM(amount) AS revenue,
      COUNT(*) AS orders
    FROM gold_sales.fct_orders f
    JOIN gold_sales.dim_customer c ON f.customer_sk = c.customer_sk
    WHERE c.is_current = TRUE
    GROUP BY 1, 2, 3
    ORDER BY 1 DESC
    LIMIT 20;
  "
```
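The `c.is_current = TRUE` filter suggests `dim_customer` keeps history as a Type 2 slowly changing dimension. If `fct_orders.customer_sk` points to the customer version in effect at order time, that filter drops orders tied to older versions, so it is worth checking how many historical versions exist. A sketch that uses only the table and column from the query above:

```bash
# Count current vs. historical customer versions in the Gold dimension.
dim_customer_versions() {
  docker compose -f infrastructure/docker-compose.yml exec trino \
    trino --catalog iceberg --execute "
      SELECT is_current, COUNT(*) AS versions
      FROM gold_sales.dim_customer
      GROUP BY is_current;
    "
}
```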
Troubleshooting
- Connector fails with "publication does not exist": check that `postgres-init/01_init.sql` ran.
- Topics do not appear: check `docker logs kafka-connect`.
- Flink job fails with S3 errors: check that MinIO is reachable from the Flink container.
- dbt cannot find tables: confirm that `profiles.yml` points to `http://trino:8080` when dbt runs inside a container, or to `localhost:8081` when it runs on the host.
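The checks above can be bundled into a few helpers. This is a sketch assuming the service names used earlier on this page, a container literally named `kafka-connect`, and Trino published on `localhost:8081` as stated in the last item.

```bash
# Assumptions: compose service names as used above; Trino on localhost:8081.
COMPOSE=(docker compose -f infrastructure/docker-compose.yml)

# List logical-replication publications; no rows means none was created.
check_publication() {
  "${COMPOSE[@]}" exec -T postgres \
    psql -U admin -d orders_db -c "SELECT pubname FROM pg_publication;"
}

# Show the most recent Kafka Connect log lines.
check_connect_logs() {
  docker logs --tail 50 kafka-connect
}

# Trino answers /v1/info with its version when reachable from the host.
check_trino_from_host() {
  curl -sS http://localhost:8081/v1/info
}
```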