Guía 05 — ML, IA y RAG Enterprise¶
Stack ML/AI 2026¶
Datos Gold (Iceberg) ──┬──► Feature Store (Feast/Tecton)
                       ├──► Embedding Pipeline ──► Vector DB
                       └──► Training Data ──► MLflow
MLflow setup¶
Tracking + Registry¶
import mlflow
# Select the tracking server and the experiment that will own the run.
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("churn_prediction_v3")

with mlflow.start_run(run_name="xgboost_baseline"):
    # Hyper-parameters and evaluation metrics recorded for this run.
    mlflow.log_param("max_depth", 6)
    mlflow.log_metric("auc", 0.892)
    mlflow.log_metric("precision_at_10", 0.71)

    # Input/output schema inferred from the training frame and predictions;
    # the first five training rows are stored as an example payload.
    signature = mlflow.models.infer_signature(X_train, y_pred)
    input_example = X_train.head(5)

    # Logging with registered_model_name also registers the model under
    # that name.
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_prediction",
        input_example=input_example,
        signature=signature,
    )
Promoción a producción¶
# Move version 12 of the registered "churn_prediction" model to the
# "Production" stage; archive_existing_versions=True is passed so versions
# currently in that stage are archived as part of the transition.
# NOTE(review): recent MLflow releases deprecate registry stages in favour of
# model version aliases -- confirm the target MLflow version before relying
# on transition_model_version_stage.
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="churn_prediction",
    version=12,
    stage="Production",
    archive_existing_versions=True
)
Feature Store con Feast¶
Repo structure¶
feature_store/
├── feature_store.yaml
├── features/
│ ├── customer_features.py
│ └── order_features.py
└── data_sources.py
Definición¶
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
from datetime import timedelta
from feast.data_format import ParquetFormat

# Entity that feature rows are keyed and joined on.
customer = Entity(name="customer_id", join_keys=["customer_id"])

# Offline source: Parquet files under the gold layer, with "event_ts" as the
# event-timestamp column.
customer_source = FileSource(
    path="s3://lakehouse/gold/customer_features/",
    timestamp_field="event_ts",
    # FileSource expects a FileFormat instance here; the plain string
    # "parquet" is not a FileFormat and cannot be serialized by Feast.
    file_format=ParquetFormat(),
)

# Feature view exposing three customer aggregates, also served online.
customer_fv = FeatureView(
    name="customer_aggregates",
    entities=[customer],
    ttl=timedelta(days=7),
    schema=[
        Field(name="total_orders_30d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="segment", dtype=String),
    ],
    source=customer_source,
    online=True,
)
Servir online¶
from feast import FeatureStore
# Open the feature repository that holds the definitions.
store = FeatureStore(repo_path="feature_store/")

# Fully-qualified "<feature view>:<feature>" references to read.
feature_refs = [
    "customer_aggregates:total_orders_30d",
    "customer_aggregates:avg_order_value",
]

# One entity row per lookup; here a single customer.
online_response = store.get_online_features(
    features=feature_refs,
    entity_rows=[{"customer_id": 12345}],
)
features = online_response.to_dict()
Vector Database¶
Selección¶
| Volumen | Latencia | Recomendación |
|---|---|---|
| <50M | <100ms | pgvector + pgvectorscale |
| 50M-1B | <50ms | Qdrant, Weaviate, Milvus |
| >1B / multi-tenant | <30ms | Pinecone, Zilliz Cloud |
pgvector + pgvectorscale¶
-- pgvector provides the VECTOR type; pgvectorscale adds the diskann index.
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS vectorscale;

-- One row per chunk of an indexed document.
CREATE TABLE document_chunks (
    id BIGSERIAL PRIMARY KEY,
    document_id BIGINT NOT NULL,
    chunk_index INT NOT NULL,
    content TEXT NOT NULL,
    metadata JSONB,
    embedding VECTOR(1536),  -- dimension of text-embedding-3-small
    -- TIMESTAMPTZ keeps the instant unambiguous; plain TIMESTAMP would
    -- silently drop the time zone of now().
    created_at TIMESTAMPTZ DEFAULT now()
);

-- DiskANN index from pgvectorscale over cosine distance. No HNSW index is
-- created here; this is the only ANN index on the table.
CREATE INDEX ON document_chunks
    USING diskann (embedding vector_cosine_ops);
-- Hybrid search (vector + full text).
-- Parameters: $1 = query embedding, $2 = query text, $3 = tenant id.
--
-- Stage 1 orders by the bare distance operator so the ANN index can serve
-- it; ordering directly by a combined expression would force every tenant
-- row to be scored. Stage 2 re-scores only that candidate pool.
-- NOTE: ts_rank is PostgreSQL full-text ranking, not true BM25; the
-- bm25_score alias is kept so the result columns stay unchanged.
WITH candidates AS (
    SELECT
        id, content, metadata,
        embedding <=> $1::vector AS distance
    FROM document_chunks
    WHERE metadata->>'tenant_id' = $3
    ORDER BY embedding <=> $1::vector
    LIMIT 100  -- candidate pool; larger than the final LIMIT on purpose
),
scored AS (
    SELECT
        id, content, metadata, distance,
        ts_rank(to_tsvector('spanish', content), plainto_tsquery('spanish', $2)) AS bm25_score
    FROM candidates
)
SELECT
    id, content, metadata,
    distance,
    bm25_score
FROM scored
ORDER BY distance + (1 - bm25_score) ASC
LIMIT 20;
RAG Hybrid Pipeline¶
Indexación¶
from openai import OpenAI
import psycopg
from psycopg.rows import dict_row
openai = OpenAI()


def embed(text: str) -> list[float]:
    """Return the embedding vector for ``text``.

    Sends ``text`` as the single input of one embeddings request to the
    ``text-embedding-3-small`` model and returns the embedding of the first
    item in the response.
    """
    response = openai.embeddings.create(
        input=text,
        model="text-embedding-3-small",
    )
    first_item = response.data[0]
    return first_item.embedding
def index_document(doc_id: int, chunks: list[dict]) -> None:
    """Embed and store every chunk of a document in ``document_chunks``.

    Args:
        doc_id: Identifier written to the ``document_id`` column.
        chunks: One dict per chunk with the keys ``"index"``, ``"content"``
            and ``"meta"``. ``"meta"`` is stored in the JSONB ``metadata``
            column and may be ``None``.

    All embeddings are computed before the connection is opened, so no
    transaction is held during the embedding calls and a failure there
    writes nothing. An empty ``chunks`` list is a no-op.
    """
    if not chunks:
        return

    # Local import keeps this snippet self-contained; psycopg cannot adapt a
    # plain dict to JSONB, it has to be wrapped in Jsonb.
    from psycopg.types.json import Jsonb

    rows = [
        (
            doc_id,
            chunk["index"],
            chunk["content"],
            # Keep SQL NULL for missing metadata instead of a JSON null.
            Jsonb(chunk["meta"]) if chunk["meta"] is not None else None,
            embed(chunk["content"]),
        )
        for chunk in chunks
    ]

    with psycopg.connect("postgresql://...") as conn:
        with conn.cursor() as cur:
            cur.executemany("""
                INSERT INTO document_chunks
                (document_id, chunk_index, content, metadata, embedding)
                VALUES (%s, %s, %s, %s, %s)
            """, rows)
        conn.commit()
Retrieval con reranker¶
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3")


def retrieve(query: str, top_k: int = 5, fetch_k: int = 30) -> list[dict]:
    """Return the ``top_k`` chunks most relevant to ``query``.

    Fetches the ``fetch_k`` nearest chunks by embedding distance, then
    re-orders them with the cross-encoder reranker and keeps the best
    ``top_k``.

    Args:
        query: Natural-language question.
        top_k: Number of chunks returned after reranking.
        fetch_k: Number of candidates fetched from the database.

    Returns:
        Row dicts with the keys ``id``, ``content``, ``metadata``, ``dist``
        and ``bm25``, best match first. Empty when nothing was fetched.
    """
    query_emb = embed(query)

    with psycopg.connect("postgresql://...", row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            # The bm25 column is returned for callers; ordering here is by
            # vector distance only.
            cur.execute("""
                SELECT id, content, metadata,
                       embedding <=> %s::vector AS dist,
                       ts_rank(to_tsvector('spanish', content),
                               plainto_tsquery('spanish', %s)) AS bm25
                FROM document_chunks
                ORDER BY (embedding <=> %s::vector) ASC
                LIMIT %s
            """, (query_emb, query, query_emb, fetch_k))
            candidates = cur.fetchall()

    # The connection is released above, before the reranker runs.
    # An empty table or LIMIT 0 yields no rows; skip the reranker, which is
    # not guaranteed to accept an empty batch.
    if not candidates:
        return []

    pairs = [(query, c["content"]) for c in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:top_k]]
Generation con citas¶
SYSTEM_PROMPT = """Eres un asistente experto. Responde SOLO con información de las fuentes proporcionadas.
Cita cada afirmación con [N] referenciando la fuente. Si no hay información suficiente, di "No tengo información sobre eso en las fuentes provistas."
"""


def _source_label(chunk: dict) -> str:
    """Return the chunk's source name, or a placeholder when it has none."""
    # metadata is a nullable column, so it may be None or lack "source".
    metadata = chunk.get("metadata") or {}
    return metadata.get("source") or "desconocida"


def answer(query: str) -> dict:
    """Answer ``query`` from retrieved chunks, with numbered citations.

    Retrieves the top five chunks, numbers them ``[1]..[N]`` in the prompt
    so the model can cite them, and asks the chat model to answer using
    only those sources.

    Args:
        query: The user's question.

    Returns:
        A dict with ``"answer"`` (the model's reply) and ``"sources"`` (one
        ``{"id", "meta"}`` entry per chunk, in citation order).
    """
    chunks = retrieve(query, top_k=5)

    # Citation numbers start at 1 to match the [N] format in the prompt.
    sources = "\n\n".join(
        f"[{position}] {chunk['content']}\n(fuente: {_source_label(chunk)})"
        for position, chunk in enumerate(chunks, start=1)
    )

    resp = openai.chat.completions.create(
        model="gpt-4.1",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Fuentes:\n{sources}\n\nPregunta: {query}"},
        ],
        temperature=0.1,
    )
    return {
        "answer": resp.choices[0].message.content,
        "sources": [{"id": c["id"], "meta": c["metadata"]} for c in chunks],
    }
Evaluación con RAGAS¶
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision
from datasets import Dataset
# Parallel columns: row i of each list describes the same evaluation sample.
eval_columns = {
    "question": questions,
    "answer": answers,
    "contexts": contexts_list,
    "ground_truth": ground_truths,
}
dataset = Dataset.from_dict(eval_columns)

# Metrics computed over the whole dataset.
rag_metrics = [faithfulness, answer_relevancy, context_precision]
result = evaluate(
    dataset,
    metrics=rag_metrics,
)
print(result)
# {'faithfulness': 0.89, 'answer_relevancy': 0.92, 'context_precision': 0.78}
GraphRAG (uso avanzado)¶
Para dominios complejos (regulatorio, médico, financiero) donde RAG plano no captura relaciones.
import neo4j
# Connection settings are placeholders from the example.
driver = neo4j.GraphDatabase.driver("bolt://neo4j:7687", auth=("neo4j", "password"))


def graph_retrieve(query: str) -> list[dict]:
    """Retrieve chunks for ``query`` together with related graph entities.

    Extracts entity names from the query, then runs one Cypher statement
    that starts from the 10 nearest nodes of the ``chunk_embeddings`` vector
    index, keeps those mentioning one of the extracted entities, and
    collects the names of nodes within two hops of each entity.

    Returns:
        Up to five records as dicts with the keys ``content``,
        ``related_entities`` and ``score``, highest score first.
    """
    cypher = """
        CALL db.index.vector.queryNodes('chunk_embeddings', 10, $emb) YIELD node, score
        MATCH (node)-[:MENTIONS]->(e:Entity)
        WHERE e.name IN $entity_names
        WITH node, e, score
        MATCH (e)-[r*1..2]-(related)
        RETURN node.content as content, collect(DISTINCT related.name) as related_entities, score
        ORDER BY score DESC LIMIT 5
    """

    # Step 1: entity names mentioned in the query, extracted by the LLM.
    entity_names = llm_extract_entities(query)

    # Step 2: vector neighbourhood plus graph expansion in one statement.
    with driver.session() as session:
        query_embedding = embed(query)
        records = session.run(cypher, emb=query_embedding, entity_names=entity_names)
        return [dict(row) for row in records]
Agentic patterns¶
Agente Data Quality con MCP¶
from anthropic import Anthropic
client = Anthropic()

# Tool definitions offered to the model. Each entry has a name, a
# description and a JSON Schema ("input_schema") for its arguments.
TOOLS = [
    # Runs a SQL statement; "sql" is the only argument and is required.
    {
        "name": "query_warehouse",
        "description": "Execute SQL against the Trino warehouse",
        "input_schema": {
            "type": "object",
            "properties": {"sql": {"type": "string"}},
            "required": ["sql"]
        }
    },
    # Opens an incident ticket.
    # NOTE(review): this schema declares no "required" list, so all three
    # arguments are optional, and "severity" has an enum but no "type" --
    # confirm that is intended.
    {
        "name": "open_jira",
        "description": "Open a Jira ticket for a data quality incident",
        "input_schema": {
            "type": "object",
            "properties": {
                "summary": {"type": "string"},
                "owner": {"type": "string"},
                "severity": {"enum": ["P0","P1","P2","P3"]}
            }
        }
    },
]
def _run_tool(block, handlers: dict) -> dict:
    """Execute one tool_use block and return its tool_result block."""
    result = {"type": "tool_result", "tool_use_id": block.id}
    handler = handlers.get(block.name)
    if handler is None:
        result["content"] = f"Tool {block.name!r} is not available."
        result["is_error"] = True
        return result
    try:
        output = handler(**block.input)
    except Exception as exc:
        # Tool boundary: report the failure to the model instead of aborting
        # the diagnosis, so it can retry or choose another tool.
        result["content"] = f"{type(exc).__name__}: {exc}"
        result["is_error"] = True
    else:
        result["content"] = str(output)
    return result


def agent_diagnose(anomaly: dict, tool_handlers=None, max_turns: int = 10) -> str:
    """Ask the model to diagnose the root cause of a data anomaly.

    Runs a bounded tool-use loop: while the model requests tools, each
    request is executed through ``tool_handlers`` and its result is sent
    back. The loop ends as soon as the model stops requesting tools.

    Args:
        anomaly: Dict with the keys ``"table"`` and ``"description"``.
        tool_handlers: Optional mapping of tool name to a callable invoked
            with the tool input as keyword arguments. Tools without a
            handler are reported to the model as unavailable. Handlers for
            destructive actions should enforce their own approval step.
        max_turns: Maximum number of model calls before giving up.

    Returns:
        The text of the model's final reply (all text blocks joined).

    Raises:
        ValueError: If ``max_turns`` is less than 1.
        RuntimeError: If the model is still requesting tools after
            ``max_turns`` calls.
    """
    if max_turns < 1:
        raise ValueError("max_turns must be >= 1")

    handlers = tool_handlers or {}
    messages = [{
        "role": "user",
        "content": f"Anomaly detected in {anomaly['table']}: {anomaly['description']}. Diagnose root cause."
    }]

    # Bounded loop: without an upper limit, any reply that is not a final
    # answer would call the API again forever.
    for _ in range(max_turns):
        resp = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=4000,
            tools=TOOLS,
            messages=messages
        )
        if resp.stop_reason != "tool_use":
            # Final answer (or a truncated one): return whatever text exists.
            return "".join(
                block.text for block in resp.content if block.type == "text"
            )

        # Echo the assistant turn, then answer every tool request in a
        # single user turn so the conversation actually advances.
        messages.append({"role": "assistant", "content": resp.content})
        messages.append({
            "role": "user",
            "content": [
                _run_tool(block, handlers)
                for block in resp.content
                if block.type == "tool_use"
            ],
        })

    raise RuntimeError(f"agent_diagnose did not finish within {max_turns} turns")
Métricas ML/AI¶
- Model performance: AUC, F1, MAPE según task.
- Drift: PSI, KL divergence sobre features y predicciones.
- Latency: P95 inference <100ms (real-time), <1s (batch).
- Cost per prediction: monitorear LLM API spend per query.
- RAG faithfulness: >85% sobre el golden set.
- Hallucination rate: <5%.
- Tiempo idea → producción: <6 semanas (gate Fase 4).
Anti-patrones IA¶
- ❌ RAG sin evaluación continua → alucinaciones invisibles.
- ❌ Embeddings sin versionado → cambio de modelo, re-indexar todo.
- ❌ Sin guardrails → prompt injection, PII leak.
- ❌ LLM en hot path sin fallback → outage.
- ❌ Fine-tune sin baseline → desperdicio compute.
- ❌ Agentes sin human-in-loop en acciones destructivas.