
Guide 05: ML, AI, and Enterprise RAG

ML/AI Stack 2026

   Gold data (Iceberg) ──┬──► Feature Store (Feast/Tecton)
                         ├──► Embedding Pipeline ──► Vector DB
                         └──► Training Data ──► MLflow

MLflow setup

Tracking + Registry

import mlflow

mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("churn_prediction_v3")

# Assumes `model`, `X_train` and `y_pred` already exist from a prior training step.
with mlflow.start_run(run_name="xgboost_baseline"):
    mlflow.log_param("max_depth", 6)
    mlflow.log_metric("auc", 0.892)
    mlflow.log_metric("precision_at_10", 0.71)
    mlflow.sklearn.log_model(
        model,
        "model",
        registered_model_name="churn_prediction",
        input_example=X_train.head(5),
        signature=mlflow.models.infer_signature(X_train, y_pred)
    )

Promotion to production

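# Note: MLflow 2.9+ deprecates model stages in favor of aliases
# (see MlflowClient.set_registered_model_alias).
client = mlflow.tracking.MlflowClient()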
client.transition_model_version_stage(
    name="churn_prediction",
    version=12,
    stage="Production",
    archive_existing_versions=True
)

Feature Store with Feast

Repo structure

feature_store/
├── feature_store.yaml
├── features/
│   ├── customer_features.py
│   └── order_features.py
└── data_sources.py

Definition

from feast import Entity, FeatureView, Field, FileSource
from feast.data_format import ParquetFormat
from feast.types import Float32, Int64, String
from datetime import timedelta

customer = Entity(name="customer_id", join_keys=["customer_id"])

customer_source = FileSource(
    path="s3://lakehouse/gold/customer_features/",
    timestamp_field="event_ts",
    file_format=ParquetFormat(),  # Feast expects a FileFormat object, not a string
)

customer_fv = FeatureView(
    name="customer_aggregates",
    entities=[customer],
    ttl=timedelta(days=7),
    schema=[
        Field(name="total_orders_30d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="segment", dtype=String),
    ],
    source=customer_source,
    online=True,
)

Online serving

from feast import FeatureStore

store = FeatureStore(repo_path="feature_store/")
features = store.get_online_features(
    features=[
        "customer_aggregates:total_orders_30d",
        "customer_aggregates:avg_order_value",
    ],
    entity_rows=[{"customer_id": 12345}],
).to_dict()

Vector Database

Selection

Volume               Latency   Recommendation
<50M                 <100ms    pgvector + pgvectorscale
50M-1B               <50ms     Qdrant, Weaviate, Milvus
>1B / multi-tenant   <30ms     Pinecone, Zilliz Cloud

pgvector + pgvectorscale

CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS vectorscale;

CREATE TABLE document_chunks (
  id BIGSERIAL PRIMARY KEY,
  document_id BIGINT NOT NULL,
  chunk_index INT NOT NULL,
  content TEXT NOT NULL,
  metadata JSONB,
  embedding VECTOR(1536),  -- text-embedding-3-small
  created_at TIMESTAMP DEFAULT now()
);

-- DiskANN index (provided by pgvectorscale; no HNSW index is created here)
CREATE INDEX ON document_chunks
USING diskann (embedding vector_cosine_ops);

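-- Hybrid search (vector + full-text).
-- Note: ts_rank is PostgreSQL's built-in rank, not true BM25, and it is not
-- normalized to [0, 1], so the combined ORDER BY score is only a rough heuristic.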
SELECT
  id, content, metadata,
  embedding <=> $1::vector AS distance,
  ts_rank(to_tsvector('spanish', content), plainto_tsquery('spanish', $2)) AS bm25_score
FROM document_chunks
WHERE metadata->>'tenant_id' = $3
ORDER BY (embedding <=> $1::vector) + (1 - ts_rank(to_tsvector('spanish', content), plainto_tsquery('spanish', $2))) ASC
LIMIT 20;
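
The query above adds a cosine distance to `1 - ts_rank`, two values that live on different scales. One common alternative, shown here as a swapped-in technique rather than the guide's own method, is Reciprocal Rank Fusion (RRF): run the vector query and the text query separately, then merge them using ranks only. The sketch below assumes each query returns an ordered list of chunk ids; the function name, the sample ids and the constant `k = 60` are illustrative assumptions.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings: list[list[int]], k: int = 60) -> list[tuple[int, float]]:
    """Fuse several ranked id lists; each hit contributes 1 / (k + rank)."""
    scores: dict[int, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by id for a stable order.
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))


# Hypothetical result ids from a vector query and a full-text query.
vector_ranking = [101, 102, 103]
text_ranking = [103, 101, 104]
fused = reciprocal_rank_fusion([vector_ranking, text_ranking])
fused_ids = [doc_id for doc_id, _ in fused]
```

Because only ranks are used, no score normalization is needed, at the cost of ignoring how far apart two candidates actually are.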

RAG Hybrid Pipeline

Indexing

from openai import OpenAI
import psycopg
from psycopg.rows import dict_row
from psycopg.types.json import Jsonb

openai = OpenAI()

def embed(text: str) -> list[float]:
    # One embeddings API call per text; this model returns 1536 dimensions
    resp = openai.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return resp.data[0].embedding

def index_document(doc_id: int, chunks: list[dict]):
    with psycopg.connect("postgresql://...") as conn:
        with conn.cursor() as cur:
            for chunk in chunks:
                emb = embed(chunk["content"])
                # Jsonb() adapts the dict for the JSONB column; the explicit
                # ::vector cast converts the float array sent by psycopg.
                cur.execute("""
                    INSERT INTO document_chunks
                    (document_id, chunk_index, content, metadata, embedding)
                    VALUES (%s, %s, %s, %s, %s::vector)
                """, (doc_id, chunk["index"], chunk["content"], Jsonb(chunk["meta"]), emb))
        conn.commit()
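
`index_document` expects `chunks` as a list of dicts with `index`, `content` and `meta` keys, but the guide does not show how they are produced. A minimal sketch, assuming fixed-size character windows with overlap; the function name, the window sizes and the `source` metadata key are assumptions for illustration, and a production splitter would more likely cut on sentence or token boundaries.

```python
def chunk_text(text: str, source: str, max_chars: int = 800, overlap: int = 100) -> list[dict]:
    """Split text into overlapping character windows shaped for index_document."""
    if max_chars <= overlap:
        raise ValueError("max_chars must be larger than overlap")
    chunks: list[dict] = []
    start = 0
    step = max_chars - overlap
    while start < len(text):
        piece = text[start:start + max_chars]
        chunks.append({
            "index": len(chunks),
            "content": piece,
            "meta": {"source": source},
        })
        if start + max_chars >= len(text):
            break  # the last window already reached the end of the text
        start += step
    return chunks


demo_chunks = chunk_text("a" * 2000, source="demo.txt")
```

The overlap keeps a sentence that straddles a boundary retrievable from at least one chunk.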

Retrieval with reranker

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("BAAI/bge-reranker-v2-m3")

def retrieve(query: str, top_k: int = 5, fetch_k: int = 30) -> list[dict]:
    query_emb = embed(query)
    with psycopg.connect("postgresql://...", row_factory=dict_row) as conn:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT id, content, metadata,
                       embedding <=> %s::vector AS dist,
                       ts_rank(to_tsvector('spanish', content),
                               plainto_tsquery('spanish', %s)) AS bm25
                FROM document_chunks
                ORDER BY (embedding <=> %s::vector) ASC
                LIMIT %s
            """, (query_emb, query, query_emb, fetch_k))
            candidates = cur.fetchall()

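    if not candidates:
        return []  # nothing to rerank

    # Score each (query, chunk) pair with the cross-encoder, keep the best top_k
    pairs = [(query, c["content"]) for c in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:top_k]]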

Generation with citations

SYSTEM_PROMPT = """Eres un asistente experto. Responde SOLO con información de las fuentes proporcionadas.
Cita cada afirmación con [N] referenciando la fuente. Si no hay información suficiente, di "No tengo información sobre eso en las fuentes provistas."
"""

def answer(query: str) -> dict:
    chunks = retrieve(query, top_k=5)
    sources = "\n\n".join([
        f"[{i+1}] {c['content']}\n(fuente: {c['metadata']['source']})"
        for i, c in enumerate(chunks)
    ])
    resp = openai.chat.completions.create(
        model="gpt-4.1",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Fuentes:\n{sources}\n\nPregunta: {query}"},
        ],
        temperature=0.1,
    )
    return {
        "answer": resp.choices[0].message.content,
        "sources": [{"id": c["id"], "meta": c["metadata"]} for c in chunks],
    }
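
The system prompt asks the model to cite every claim as `[N]`, but nothing in `answer` verifies that it did. A minimal post-check sketch, assuming citations appear literally as `[1]`, `[2]`, and so on; the function name and the returned keys are hypothetical.

```python
import re


def check_citations(answer_text: str, num_sources: int) -> dict:
    """Compare the [N] markers in an answer with the sources that were supplied."""
    cited = {int(n) for n in re.findall(r"\[(\d+)\]", answer_text)}
    valid = set(range(1, num_sources + 1))
    return {
        "cited": sorted(cited & valid),            # markers that point to a real source
        "invalid": sorted(cited - valid),          # markers with no matching source
        "uncited_sources": sorted(valid - cited),  # sources the answer never used
    }


report = check_citations("Churn rose in Q3 [1]. Pricing changed [4].", num_sources=3)
```

A non-empty `invalid` list is a cheap signal that the answer references a source that was never provided.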

Evaluation with RAGAS

from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision
from datasets import Dataset

dataset = Dataset.from_dict({
    "question": questions,
    "answer": answers,
    "contexts": contexts_list,
    "ground_truth": ground_truths,
})

result = evaluate(
    dataset,
    metrics=[faithfulness, answer_relevancy, context_precision],
)
print(result)
# {'faithfulness': 0.89, 'answer_relevancy': 0.92, 'context_precision': 0.78}
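
To turn these scores into a release gate, they can be compared against minimum thresholds. The sketch below works on a plain dict of scores like the example output above; the 0.85 faithfulness threshold comes from the metrics section of this guide, while the 0.80 context-precision threshold and the function name are assumptions.

```python
def failing_metrics(scores: dict[str, float], thresholds: dict[str, float]) -> dict[str, float]:
    """Return the metrics whose score is below its minimum (a missing score counts as 0.0)."""
    return {
        name: scores.get(name, 0.0)
        for name, minimum in thresholds.items()
        if scores.get(name, 0.0) < minimum
    }


example_scores = {"faithfulness": 0.89, "answer_relevancy": 0.92, "context_precision": 0.78}
example_thresholds = {"faithfulness": 0.85, "context_precision": 0.80}  # second value is hypothetical
failures = failing_metrics(example_scores, example_thresholds)
```

A CI job could fail the build whenever `failures` is non-empty.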

GraphRAG (advanced use)

Intended for complex domains (regulatory, medical, financial) where flat RAG does not capture relationships.

import neo4j

driver = neo4j.GraphDatabase.driver("bolt://neo4j:7687", auth=("neo4j", "password"))

def graph_retrieve(query: str) -> list[dict]:
    # 1. Extract entities from the query with an LLM
    #    (llm_extract_entities is assumed to be defined elsewhere)
    entities = llm_extract_entities(query)

    # 2. Hybrid Cypher: vector neighborhood + graph expansion
    with driver.session() as session:
        result = session.run("""
            CALL db.index.vector.queryNodes('chunk_embeddings', 10, $emb) YIELD node, score
            MATCH (node)-[:MENTIONS]->(e:Entity)
            WHERE e.name IN $entity_names
            WITH node, e, score
            MATCH (e)-[r*1..2]-(related)
            RETURN node.content as content, collect(DISTINCT related.name) as related_entities, score
            ORDER BY score DESC LIMIT 5
        """, emb=embed(query), entity_names=entities)
        return [dict(record) for record in result]

Agentic patterns

Data Quality agent with MCP

from anthropic import Anthropic

client = Anthropic()

TOOLS = [
    {
        "name": "query_warehouse",
        "description": "Execute SQL against the Trino warehouse",
        "input_schema": {
            "type": "object",
            "properties": {"sql": {"type": "string"}},
            "required": ["sql"]
        }
    },
    {
        "name": "open_jira",
        "description": "Open a Jira ticket for a data quality incident",
        "input_schema": {
            "type": "object",
            "properties": {
                "summary": {"type": "string"},
                "owner": {"type": "string"},
                "severity": {"enum": ["P0","P1","P2","P3"]}
            }
        }
    },
]

def agent_diagnose(anomaly: dict):
    messages = [{
        "role": "user",
        "content": f"Anomaly detected in {anomaly['table']}: {anomaly['description']}. Diagnose root cause."
    }]
    while True:
        resp = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=4000,
            tools=TOOLS,
            messages=messages
        )
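        if resp.stop_reason == "end_turn":
            return resp.content[-1].text
        # process tool calls...
        # Before looping again, append the assistant turn (resp.content) and a user
        # turn with one tool_result block per tool_use block to `messages`;
        # otherwise the same request is resent on every iteration.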
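
The tool-call step is elided in the loop above. As a sketch of what it could look like, the helper below turns the `tool_use` blocks of a response into the `tool_result` blocks that the Messages API expects in the next user turn. The helper name, the `handlers` mapping and the stand-in blocks built with `SimpleNamespace` are assumptions for illustration; in the real loop the blocks would come from `resp.content`.

```python
import json
from types import SimpleNamespace
from typing import Any, Callable


def build_tool_results(content_blocks, handlers: dict[str, Callable[..., Any]]) -> list[dict]:
    """Run the handler for each tool_use block and wrap its output as a tool_result."""
    results = []
    for block in content_blocks:
        if getattr(block, "type", None) != "tool_use":
            continue  # skip text blocks
        handler = handlers.get(block.name)
        if handler is None:
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": f"Unknown tool: {block.name}", "is_error": True})
            continue
        try:
            output = handler(**block.input)
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": json.dumps(output, default=str)})
        except Exception as exc:  # report the failure back to the model
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": f"{type(exc).__name__}: {exc}", "is_error": True})
    return results


# Stand-ins for SDK content blocks (hypothetical values).
blocks = [
    SimpleNamespace(type="text", text="Checking row counts."),
    SimpleNamespace(type="tool_use", id="toolu_demo", name="query_warehouse",
                    input={"sql": "SELECT 1"}),
]
handlers = {"query_warehouse": lambda sql: [{"n": 1}]}
tool_results = build_tool_results(blocks, handlers)
```

Inside `agent_diagnose`, the loop would then append `{"role": "assistant", "content": resp.content}` followed by `{"role": "user", "content": build_tool_results(resp.content, handlers)}` to `messages` before the next call. A cap on the number of iterations is also worth adding so that a confused agent cannot loop indefinitely.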

ML/AI metrics

  • Model performance: AUC, F1, MAPE, depending on the task.
  • Drift: PSI, KL divergence over features and predictions.
  • Latency: P95 inference <100ms (real-time), <1s (batch).
  • Cost per prediction: monitor LLM API spend per query.
  • RAG faithfulness: >85% on the golden set.
  • Hallucination rate: <5%.
  • Time from idea to production: <6 weeks (Phase 4 gate).
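
For the drift metric, PSI compares the binned distribution of a feature or prediction at training time with the one seen in production: PSI = Σ (actual_i - expected_i) · ln(actual_i / expected_i). A minimal sketch, assuming both inputs are per-bin proportions over the same bins; the function name and the `eps` floor for empty bins are assumptions.

```python
import math


def population_stability_index(expected: list[float], actual: list[float],
                               eps: float = 1e-6) -> float:
    """PSI between two binned distributions given as proportions over identical bins."""
    if len(expected) != len(actual):
        raise ValueError("expected and actual must use the same bins")
    psi = 0.0
    for e, a in zip(expected, actual):
        e = max(e, eps)  # avoid log(0) and division by zero on empty bins
        a = max(a, eps)
        psi += (a - e) * math.log(a / e)
    return psi


baseline = [0.25, 0.25, 0.25, 0.25]
production = [0.10, 0.20, 0.30, 0.40]
psi_same = population_stability_index(baseline, baseline)
psi_shifted = population_stability_index(baseline, production)
```

Identical distributions give a PSI of zero, and the value grows as the production distribution moves away from the baseline.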

AI anti-patterns

  1. ❌ RAG without continuous evaluation → invisible hallucinations.
  2. ❌ Embeddings without versioning → a model change forces re-indexing everything.
  3. ❌ No guardrails → prompt injection, PII leaks.
  4. ❌ LLM in the hot path without a fallback → outage.
  5. ❌ Fine-tuning without a baseline → wasted compute.
  6. ❌ Agents without a human in the loop for destructive actions.
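
As an illustration of item 4, the wrapper below keeps a request alive when the LLM call fails by switching to a cheaper fallback, such as a cached answer or a rule-based reply. This is a sketch only: the function name, the `degraded` flag and the two callables are hypothetical, and a real service would also log the exception and enforce a timeout on the primary call.

```python
from typing import Callable


def answer_with_fallback(query: str,
                         primary: Callable[[str], str],
                         fallback: Callable[[str], str]) -> dict:
    """Try the primary (LLM) path; on any error, serve the fallback and flag it."""
    try:
        return {"answer": primary(query), "degraded": False}
    except Exception:
        # Any failure on the LLM path falls through to the non-LLM path.
        return {"answer": fallback(query), "degraded": True}


def broken_llm(query: str) -> str:
    raise TimeoutError("upstream LLM unavailable")  # simulated outage


def canned_reply(query: str) -> str:
    return "The assistant is temporarily unavailable."


ok_response = answer_with_fallback("status?", primary=lambda q: f"echo: {q}", fallback=canned_reply)
degraded_response = answer_with_fallback("status?", primary=broken_llm, fallback=canned_reply)
```

The `degraded` flag lets callers and dashboards tell a fallback answer from a normal one.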