Skip to content

Orchestration API Reference

orchestration.analyzer

Task Analyzer — detección de tipo de tarea con caché LRU.

Classes

TaskAnalyzer

Source code in orchestration/analyzer.py
class TaskAnalyzer:
    """Classifies a user query with one low-temperature LLM call, with result caching."""

    @staticmethod
    async def analyze_task(query: str, is_follow_up: bool = False) -> dict[str, Any]:
        """Precise detection with LRU cache.

        Args:
            query: The user's request text; it is embedded verbatim in the prompt.
            is_follow_up: True when the query continues a conversation about an
                existing project. Such queries use a separate cache key and the
                prompt asks the model for a lower complexity estimate.

        Returns:
            A dict that always contains ``primary_type``, ``complexity``,
            ``is_direct_code_execution``, ``requires_full_orchestration``,
            ``estimated_steps`` and ``requires_synthesis`` (missing keys are
            filled with defaults). Callers get their own copy, so mutating the
            result cannot corrupt the cache. If the LLM call or parsing raises,
            a conservative fallback requesting full orchestration is returned.
        """
        cache_key = f"followup:{query}" if is_follow_up else query
        cached = _task_cache.get(cache_key)
        if cached is not None:
            logger.debug("TaskAnalyzer → resultado desde caché")
            # Copy so a caller mutating the result cannot poison the cache entry.
            return dict(cached)

        follow_context = ""
        if is_follow_up:
            follow_context = (
                "\n⚠️ CONTEXTO: Esta es una conversación DE CONTINUACIÓN sobre un proyecto existente. "
                "Los archivos ya fueron creados en turnos anteriores. "
                "El usuario quiere MODIFICAR, EXTENDER o CORREGIR código ya existente. "
                "La complejidad debe ser menor que si fuera un proyecto nuevo.\n"
            )

        prompt = f"""Responde **ÚNICAMENTE** con un JSON válido. Sin texto extra.

TAREA: "{query}"
{follow_context}
{{
    "primary_type": "simple_conversation|creativo|analista|ejecutor|planificador|investigador|mixed",
    "complexity": "simple|medium|complex",
    "is_direct_code_execution": true/false,
    "requires_full_orchestration": true/false
}}

Reglas estrictas:
- Solo marca "simple_conversation" y "requires_full_orchestration": false si es saludo, pregunta directa sobre el usuario o conversación muy casual.
- Si la tarea pide generar, listar, elegir, proponer, analizar o múltiples pasos → "requires_full_orchestration": true
"""

        try:
            response = await models.call(
                messages=[{"role": "user", "content": prompt}],
                role="fast",
                temperature=0.0,
            )

            raw = clean_llm_response(response)
            parsed = parse_json_from_llm(raw)
            parsed_ok = isinstance(parsed, dict)
            data: dict[str, Any] = parsed if parsed_ok else {}

            data.setdefault("primary_type", "mixed")
            data.setdefault("complexity", "medium")
            data.setdefault("is_direct_code_execution", False)
            # Mirror the prompt's rule: only simple conversations skip full
            # orchestration. Without this default a missing key reads as None
            # (falsy) and silently disables orchestration, unlike the error path.
            data.setdefault(
                "requires_full_orchestration",
                data["primary_type"] != "simple_conversation",
            )
            data.setdefault("estimated_steps", 2)
            data.setdefault("requires_synthesis", True)

            if parsed_ok:
                # Save to cache (a copy, so the returned dict stays caller-owned)
                _task_cache.set(cache_key, dict(data))
            else:
                # Don't pin a defaults-only answer for this query; a later call
                # may get a parseable response.
                logger.warning(
                    "TaskAnalyzer: la respuesta del LLM no es un objeto JSON; usando valores por defecto"
                )

            logger.info(
                f"✅ TaskAnalyzer completado → {data.get('primary_type')} | "
                f"orchestration={data.get('requires_full_orchestration')} | "
                f"code_execution={data.get('is_direct_code_execution')}"
            )
            return data

        except Exception as e:
            logger.error(f"Error en TaskAnalyzer: {e}", exc_info=True)
            # Same key set as the success path so callers see a uniform shape.
            return {
                "primary_type": "mixed",
                "complexity": "medium",
                "requires_full_orchestration": True,
                "is_direct_code_execution": False,
                "estimated_steps": 3,
                "requires_synthesis": True,
            }
Functions
analyze_task async staticmethod
analyze_task(
    query: str, is_follow_up: bool = False
) -> dict[str, Any]

Precise detection with LRU cache.

Source code in orchestration/analyzer.py
    @staticmethod
    async def analyze_task(query: str, is_follow_up: bool = False) -> dict[str, Any]:
        """Precise detection with LRU cache."""
        cache_key = f"followup:{query}" if is_follow_up else query
        cached = _task_cache.get(cache_key)
        if cached is not None:
            logger.debug("TaskAnalyzer → resultado desde caché")
            return cached

        follow_context = ""
        if is_follow_up:
            follow_context = (
                "\n⚠️ CONTEXTO: Esta es una conversación DE CONTINUACIÓN sobre un proyecto existente. "
                "Los archivos ya fueron creados en turnos anteriores. "
                "El usuario quiere MODIFICAR, EXTENDER o CORREGIR código ya existente. "
                "La complejidad debe ser menor que si fuera un proyecto nuevo.\n"
            )

        prompt = f"""Responde **ÚNICAMENTE** con un JSON válido. Sin texto extra.

TAREA: "{query}"
{follow_context}
{{
    "primary_type": "simple_conversation|creativo|analista|ejecutor|planificador|investigador|mixed",
    "complexity": "simple|medium|complex",
    "is_direct_code_execution": true/false,
    "requires_full_orchestration": true/false
}}

Reglas estrictas:
- Solo marca "simple_conversation" y "requires_full_orchestration": false si es saludo, pregunta directa sobre el usuario o conversación muy casual.
- Si la tarea pide generar, listar, elegir, proponer, analizar o múltiples pasos → "requires_full_orchestration": true
"""

        try:
            response = await models.call(
                messages=[{"role": "user", "content": prompt}],
                role="fast",
                temperature=0.0,
            )

            raw = clean_llm_response(response)
            data = parse_json_from_llm(raw)
            data = data if isinstance(data, dict) else {}

            data.setdefault("primary_type", "mixed")
            data.setdefault("complexity", "medium")
            data.setdefault("is_direct_code_execution", False)
            data.setdefault("estimated_steps", 2)
            data.setdefault("requires_synthesis", True)

            # Save to cache
            _task_cache.set(cache_key, data)

            logger.info(
                f"✅ TaskAnalyzer completado → {data.get('primary_type')} | "
                f"orchestration={data.get('requires_full_orchestration')} | "
                f"code_execution={data.get('is_direct_code_execution')}"
            )
            return data

        except Exception as e:
            logger.error(f"Error en TaskAnalyzer: {e}", exc_info=True)
            return {
                "primary_type": "mixed",
                "requires_full_orchestration": True,
                "is_direct_code_execution": False,
                "estimated_steps": 3,
            }

Functions

orchestration.decomposer

Task Decomposer - Versión FINAL robusta y refinada (Fase 6)

Functions

decompose_task async

decompose_task(
    query: str,
    is_follow_up: bool = False,
    conversation_history: list[dict] | None = None,
    project_root: str | None = None,
) -> list[str]

Descompone la tarea en 2-5 subtareas claras y accionables

Source code in orchestration/decomposer.py
async def decompose_task(
    query: str,
    is_follow_up: bool = False,
    conversation_history: list[dict] | None = None,
    project_root: str | None = None,
) -> list[str]:
    """Break a task down into a few (nominally 2-5) clear, actionable subtasks.

    Args:
        query: The user's request.
        is_follow_up: True when continuing work on an existing project. The
            prompt is then prefixed with a continuation notice plus the
            user/assistant messages among the last six history entries
            (content truncated to 200 characters each).
        conversation_history: Prior messages as dicts with ``role`` and
            ``content`` keys; only read when ``is_follow_up`` is True.
        project_root: Project location passed to ``_build_project_context``.

    Returns:
        Subtask descriptions, truncated to ``settings.max_subtasks``. At least
        two are produced before truncation: a lone subtask is paired with a
        verification step, and an empty result or any error during the LLM
        call/parsing yields a two-item fallback derived from ``query``.
    """
    from llm.prompts import DECOMPOSE_TASK_PROMPT

    project_context = _build_project_context(project_root)
    prompt = DECOMPOSE_TASK_PROMPT.format(query=query, project_context=project_context)

    if is_follow_up:
        history_context = ""
        if conversation_history:
            last_msgs = conversation_history[-6:]
            history_context = "\n".join(
                f"[{m['role']}]: {str(m.get('content', ''))[:200]}"
                for m in last_msgs
                if m.get("role") in ("user", "assistant")
            )
        prompt = (
            "⚠️ CONTEXTO IMPORTANTE: Esta es una conversación DE CONTINUACIÓN. "
            "El proyecto YA EXISTE en disco con archivos creados previamente. "
            "NO crees subtareas para crear archivos que ya existen. "
            "Enfócate en MODIFICAR, EXTENDER o CORREGIR lo existente. "
            "Subtareas sugeridas: leer archivos existentes, hacer cambios puntuales, "
            "ejecutar tests, verificar.\n\n"
            f"Historial reciente de la conversación:\n{history_context}\n\n" + prompt
        )

    # Rate limiter awareness: request fewer subtasks if rate is low.
    # Best effort: a missing or failing rate limiter must not block decomposition.
    try:
        from core.rate_limiter import get_rate_limiter

        rl = get_rate_limiter()
        remaining = await rl.remaining()
        if remaining < 10:
            prompt += "\nIMPORTANTE: Genera máximo 2 subtareas, el rate de API está bajo."
    except Exception as e:
        logger.debug(f"Rate limiter no disponible en decompose_task: {e}")

    fallback = [query[:100], f"Verificar el resultado de: {query[:80]}"]

    try:
        response = await models.call(
            messages=[{"role": "user", "content": prompt}],
            role="reasoning",
            temperature=0.1,
        )

        raw = clean_llm_response(response)
        logger.debug(f"Respuesta cruda de decompose_task: {raw[:700]}...")

        data = parse_json_from_llm(raw)

        # Accept {"subtasks": [...]} or a bare JSON list; anything else
        # (including a non-dict parse result) falls back to scraping the text.
        if isinstance(data, dict) and isinstance(data.get("subtasks"), list):
            subtasks = data["subtasks"]
        elif isinstance(data, list):
            subtasks = data
        else:
            # Bullets and numbers are anchored to the start of a line so that
            # hyphenated words ("multi-step") or decimals ("3.5") inside a
            # sentence are not mistaken for list items.
            subtasks = (
                re.findall(r"^[ \t]*[-•*][ \t]+(.+?)[ \t]*$", raw, flags=re.MULTILINE)
                or re.findall(r"^[ \t]*\d+[.)][ \t]+(.+?)[ \t]*$", raw, flags=re.MULTILINE)
                or re.findall(r'["\'](.+?)["\']', raw)
            )

        # Normalize items to text and drop fragments too short to be a task.
        final_subtasks = []
        for s in subtasks:
            if isinstance(s, dict):
                clean = str(s.get("description", s.get("task", str(s)))).strip()
            else:
                clean = str(s).strip()
            if len(clean) > 8 and clean.lower() not in {"subtasks", "subtask", ""}:
                final_subtasks.append(clean)

        # Safety: always return at least 2 subtasks for developer tasks
        if len(final_subtasks) < 2:
            logger.warning("Decompose_task generó <2 subtareas → forzando división")
            if len(final_subtasks) == 1:
                final_subtasks.append(
                    f"Verificar y validar que {final_subtasks[0][:80]} funciona correctamente"
                )
            else:
                final_subtasks = list(fallback)

        # Max limit from feature flags
        from core.config import settings

        max_subtasks = settings.max_subtasks
        final_subtasks = final_subtasks[:max_subtasks]

        logger.info(f"✅ Decompose_task generó {len(final_subtasks)} subtareas")
        return final_subtasks

    except Exception as e:
        logger.error(f"Error grave en decompose_task: {e}", exc_info=True)
        # Safe fallback with safety floor (minimum 2 subtasks)
        return list(fallback)

decompose_task_with_phases async

decompose_task_with_phases(
    query: str,
    is_follow_up: bool = False,
    conversation_history: list[dict] | None = None,
    project_root: str | None = None,
) -> dict

Descompone la tarea en fases con subtareas, para blackboard multi-phase.

Returns:

| Type | Description |
| --- | --- |
| `dict` | `{"phases": [...], "strategy": "sequential"}` |
| `dict` | or fallback: `{"phases": [{"subtasks": [...]}]}` (single phase) |

Source code in orchestration/decomposer.py
async def decompose_task_with_phases(
    query: str,
    is_follow_up: bool = False,
    conversation_history: list[dict] | None = None,
    project_root: str | None = None,
) -> dict:
    """Break a task down into ordered phases of subtasks, for the multi-phase blackboard.

    Args:
        query: The user's request.
        is_follow_up: True when continuing work on an existing project; the
            prompt is prefixed with a continuation notice and recent history.
        conversation_history: Prior messages as dicts with ``role`` and
            ``content`` keys; only read when ``is_follow_up`` is True.
        project_root: Project location passed to ``_build_project_context``.

    Returns:
        ``{"phases": [...], "strategy": ...}`` where each phase is a dict with
        ``phase`` (str), ``order`` (int), ``description`` (str) and a non-empty
        ``subtasks`` list of strings; ``strategy`` comes from the model answer
        and defaults to ``"sequential"``. If the answer contains no usable
        phase, or the call/parsing raises, the result is a single ``"default"``
        phase whose subtasks come from ``decompose_task``.
    """
    from llm.prompts import DECOMPOSE_TASK_WITH_PHASES_PROMPT

    project_context = _build_project_context(project_root)
    prompt = DECOMPOSE_TASK_WITH_PHASES_PROMPT.format(query=query, project_context=project_context)

    if is_follow_up:
        history_context = ""
        if conversation_history:
            last_msgs = conversation_history[-6:]
            history_context = "\n".join(
                f"[{m['role']}]: {str(m.get('content', ''))[:200]}"
                for m in last_msgs
                if m.get("role") in ("user", "assistant")
            )
        prompt = (
            "⚠️ CONTEXTO IMPORTANTE: Esta es una conversación DE CONTINUACIÓN. "
            "El proyecto YA EXISTE en disco. Usa máximo 2 fases. "
            "Enfócate en MODIFICAR lo existente.\n\n"
            f"Historial reciente:\n{history_context}\n\n" + prompt
        )

    # Rate limiter awareness (best effort: failures only reduce guidance).
    try:
        from core.rate_limiter import get_rate_limiter

        rl = get_rate_limiter()
        remaining = await rl.remaining()
        if remaining < 5:
            prompt += (
                "\nIMPORTANTE: Genera máximo 2 fases con 2 subtareas cada una (API rate bajo)."
            )
    except Exception as e:
        logger.debug(f"Rate limiter no disponible en decompose_task_with_phases: {e}")

    try:
        response = await models.call(
            messages=[{"role": "user", "content": prompt}],
            role="reasoning",
            temperature=0.1,
        )

        raw = clean_llm_response(response)
        data = parse_json_from_llm(raw)

        if isinstance(data, dict) and isinstance(data.get("phases"), list):
            valid_phases: list[dict] = []
            for p in data["phases"]:
                # Skip malformed phases instead of letting one bad entry
                # discard the whole multi-phase plan.
                if not isinstance(p, dict) or not isinstance(p.get("subtasks"), list):
                    continue

                phase_subtasks: list[str] = []
                for s in p["subtasks"]:
                    # Same normalization as decompose_task: dict items may carry
                    # their text under "description" or "task".
                    if isinstance(s, dict):
                        text = str(s.get("description", s.get("task", s))).strip()
                    else:
                        text = str(s).strip()
                    if text:
                        phase_subtasks.append(text)
                if not phase_subtasks:
                    continue

                default_order = len(valid_phases) + 1
                try:
                    order = int(p.get("order", default_order))
                except (TypeError, ValueError):
                    # Non-numeric "order" values (e.g. "first") fall back to position.
                    order = default_order

                valid_phases.append(
                    {
                        "phase": str(p.get("phase", "default")),
                        "order": order,
                        "description": str(p.get("description", "")),
                        "subtasks": phase_subtasks,
                    }
                )
            if valid_phases:
                logger.info(f"✅ decompose_task_with_phases: {len(valid_phases)} fases")
                return {"phases": valid_phases, "strategy": data.get("strategy", "sequential")}

    except Exception as e:
        logger.warning(f"decompose_task_with_phases falló, usando single-phase: {e}")

    # Fallback: single phase with decompose_task
    subtasks = await decompose_task(query, is_follow_up, conversation_history, project_root)
    return {
        "phases": [
            {
                "phase": "default",
                "order": 1,
                "description": "Implementación principal",
                "subtasks": subtasks,
            }
        ],
        "strategy": "sequential",
    }

orchestration.router

Agent Router — selecciona el mejor agente para una tarea.

Classes

Functions

orchestration.supervisor

WorkflowSupervisor - Versión Dinámica basada en perfiles de agentes

Classes

WorkflowSupervisor

Source code in orchestration/supervisor.py
class WorkflowSupervisor:
    """Reviews the router's per-subtask agent choices against agent profile keywords."""

    @staticmethod
    async def review_and_correct(
        task_analyzer_result: dict,
        router_selections: list[str],
        subtasks: list[str],
        allowed_agents: list[str] | None = None,
    ) -> list[str]:
        """
        Review and correct agent selections based on keyword matching.

        Behavior is controlled by ``settings.auto_fix_level``:

        * 0: no review at all; ``router_selections`` is returned unchanged.
        * 1: corrections are computed and logged as flags, but
          ``router_selections`` is returned unchanged.
        * any other value (e.g. 2): the corrected list is returned.

        Args:
            task_analyzer_result: Task analyzer output (not read by this method).
            router_selections: Agent chosen by the router for each subtask.
            subtasks: Subtask descriptions, index-aligned with ``router_selections``.
            allowed_agents: Optional whitelist of agent names. If it excludes
                every registered agent, ``router_selections`` is returned as-is.

        Returns:
            The corrected agent list (padded to ``len(subtasks)`` when the
            router returned fewer selections), or ``router_selections`` for
            levels 0 and 1.
        """
        fix_level = settings.auto_fix_level

        # Level 0: no review at all — return selections as-is
        if fix_level == 0:
            logger.debug("AUTO_FIX_LEVEL=0 — skipping supervisor review")
            return router_selections

        # Get all available agents with their keywords
        all_agents = list(agents_registry.list_agents().keys())

        # Filter by the whitelist, if one was given
        if allowed_agents is not None:
            all_agents = [a for a in all_agents if a in allowed_agents]
            if not all_agents:
                logger.warning(
                    "No hay agentes permitidos. Se conservan las selecciones del router."
                )
                return router_selections

        # Keywords are lower-cased once here because they are matched against
        # the lower-cased subtask text; mixed-case profile keywords could
        # otherwise never match. Empty keywords would match every task, so
        # they are dropped.
        agent_keywords: dict[str, list[str]] = {}
        for agent_name in all_agents:
            profile = agents_registry.get_profile(agent_name)
            if profile:
                agent_keywords[agent_name] = [
                    str(kw).lower() for kw in (profile.get("keywords") or []) if kw
                ]

        # Verification work is read-only, so a router choice of the analyst is kept.
        verification_keywords = (
            "verificar",
            "validar",
            "revisar",
            "probar",
            "test",
            "prueba",
            "comprobar",
        )

        corrected = router_selections.copy()

        # Ensure corrected has at least as many entries as subtasks
        while len(corrected) < len(subtasks):
            corrected.append(all_agents[0] if all_agents else settings.fallback_agent)

        for i, task in enumerate(subtasks):
            task_lower = task.lower()
            router_sel = router_selections[i] if i < len(router_selections) else ""

            if router_sel == "analista" and any(kw in task_lower for kw in verification_keywords):
                continue  # keep router selection — analyst is correct for verification

            # Pick the agent whose keywords appear most often (first one wins ties).
            best_agent = None
            best_score = -1
            for agent, keywords in agent_keywords.items():
                if not keywords:
                    continue
                score = sum(1 for kw in keywords if kw in task_lower)
                if score > best_score:
                    best_score = score
                    best_agent = agent

            if best_agent is not None and best_score > 0:
                corrected[i] = best_agent
            elif router_sel not in all_agents:
                # No keyword match: keep the router decision unless it names an
                # unregistered/disallowed agent, then use the first allowed one.
                corrected[i] = all_agents[0] if all_agents else settings.fallback_agent

        logger.info(f"✅ Supervisor finalizó → Agentes corregidos: {corrected}")

        # Level 1: flag issues but return original selections
        if fix_level == 1:
            for i in range(len(corrected)):
                router_sel = router_selections[i] if i < len(router_selections) else ""
                subtask_text = subtasks[i][:50] if i < len(subtasks) else ""
                if i < len(router_selections) and corrected[i] != router_selections[i]:
                    logger.info(
                        f"🔍 Supervisor flag (fix_level=1): subtask {i} "
                        f"'{subtask_text}...' would change "
                        f"from '{router_sel}' to '{corrected[i]}'"
                    )
            return router_selections

        return corrected
Functions
review_and_correct async staticmethod
review_and_correct(
    task_analyzer_result: dict,
    router_selections: list[str],
    subtasks: list[str],
    allowed_agents: list[str] | None = None,
) -> list[str]

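Review and correct agent selections based on keyword matching. Controlled by AUTO_FIX_LEVEL: 0 = skip the review entirely (selections returned unchanged), 1 = flag only (proposed changes are logged, original selections returned), 2 = auto-correct.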

Source code in orchestration/supervisor.py
@staticmethod
async def review_and_correct(
    task_analyzer_result: dict,
    router_selections: list[str],
    subtasks: list[str],
    allowed_agents: list[str] | None = None,
) -> list[str]:
    """
    Review and correct agent selections based on keyword matching.

    Behavior is controlled by ``settings.auto_fix_level``:

    * 0: no review at all; ``router_selections`` is returned unchanged.
    * 1: corrections are computed and logged as flags, but
      ``router_selections`` is returned unchanged.
    * any other value (e.g. 2): the corrected list is returned.

    Args:
        task_analyzer_result: Task analyzer output (not read by this function).
        router_selections: Agent chosen by the router for each subtask.
        subtasks: Subtask descriptions, index-aligned with ``router_selections``.
        allowed_agents: Optional whitelist of agent names. If it excludes
            every registered agent, ``router_selections`` is returned as-is.

    Returns:
        The corrected agent list (padded to ``len(subtasks)`` when the router
        returned fewer selections), or ``router_selections`` for levels 0 and 1.
    """
    fix_level = settings.auto_fix_level

    # Level 0: no review at all — return selections as-is
    if fix_level == 0:
        logger.debug("AUTO_FIX_LEVEL=0 — skipping supervisor review")
        return router_selections

    # Get all available agents with their keywords
    all_agents = list(agents_registry.list_agents().keys())

    # Filter by the whitelist, if one was given
    if allowed_agents is not None:
        all_agents = [a for a in all_agents if a in allowed_agents]
        if not all_agents:
            logger.warning(
                "No hay agentes permitidos. Se conservan las selecciones del router."
            )
            return router_selections

    # Keywords are lower-cased once here because they are matched against the
    # lower-cased subtask text; mixed-case profile keywords could otherwise
    # never match. Empty keywords would match every task, so they are dropped.
    agent_keywords: dict[str, list[str]] = {}
    for agent_name in all_agents:
        profile = agents_registry.get_profile(agent_name)
        if profile:
            agent_keywords[agent_name] = [
                str(kw).lower() for kw in (profile.get("keywords") or []) if kw
            ]

    # Verification work is read-only, so a router choice of the analyst is kept.
    verification_keywords = (
        "verificar",
        "validar",
        "revisar",
        "probar",
        "test",
        "prueba",
        "comprobar",
    )

    corrected = router_selections.copy()

    # Ensure corrected has at least as many entries as subtasks
    while len(corrected) < len(subtasks):
        corrected.append(all_agents[0] if all_agents else settings.fallback_agent)

    for i, task in enumerate(subtasks):
        task_lower = task.lower()
        router_sel = router_selections[i] if i < len(router_selections) else ""

        if router_sel == "analista" and any(kw in task_lower for kw in verification_keywords):
            continue  # keep router selection — analyst is correct for verification

        # Pick the agent whose keywords appear most often (first one wins ties).
        best_agent = None
        best_score = -1
        for agent, keywords in agent_keywords.items():
            if not keywords:
                continue
            score = sum(1 for kw in keywords if kw in task_lower)
            if score > best_score:
                best_score = score
                best_agent = agent

        if best_agent is not None and best_score > 0:
            corrected[i] = best_agent
        elif router_sel not in all_agents:
            # No keyword match: keep the router decision unless it names an
            # unregistered/disallowed agent, then use the first allowed one.
            corrected[i] = all_agents[0] if all_agents else settings.fallback_agent

    logger.info(f"✅ Supervisor finalizó → Agentes corregidos: {corrected}")

    # Level 1: flag issues but return original selections
    if fix_level == 1:
        for i in range(len(corrected)):
            router_sel = router_selections[i] if i < len(router_selections) else ""
            subtask_text = subtasks[i][:50] if i < len(subtasks) else ""
            if i < len(router_selections) and corrected[i] != router_selections[i]:
                logger.info(
                    f"🔍 Supervisor flag (fix_level=1): subtask {i} "
                    f"'{subtask_text}...' would change "
                    f"from '{router_sel}' to '{corrected[i]}'"
                )
        return router_selections

    return corrected

orchestration.aggregator

ResultAggregator - Síntesis FINAL optimizada (con protección contra vacíos)

Classes

ResultAggregator

Handles intelligent result aggregation and synthesis

Source code in orchestration/aggregator.py
class ResultAggregator:
    """Handles intelligent result aggregation and synthesis"""

    @staticmethod
    def _result_text(data: Any) -> str:
        """Return the stripped ``result`` of a subtask entry, or the entry itself if not a dict."""
        value = data.get("result", data) if isinstance(data, dict) else data
        return str(value).strip()

    @staticmethod
    def _fallback_output(
        query: str, results: dict, G: Any, files_written: list[str] | None
    ) -> str:
        """Build a plain structured answer used when LLM synthesis fails or is useless."""
        output = f"**Consulta:** {query}\n\n"
        if files_written:
            output += "**Archivos creados:**\n"
            output += "\n".join(f"- {f}" for f in files_written) + "\n\n"
        # Same node order as the synthesis prompt.
        for node, data in sorted(results.items()):
            task_desc = G.nodes[node].get("task", f"Subtarea {node}")
            output += f"### {task_desc}\n{ResultAggregator._result_text(data)}\n\n"
        return output

    @staticmethod
    def _read_written_files(files_written: list[str], project_root: str, workspace: str) -> str:
        """Return prompt text with the on-disk contents of up to 10 written files.

        Each file is capped at 6000 characters. Files that cannot be read are
        skipped individually; paths resolving outside the project directory
        are ignored.
        """
        try:
            from core.path_resolver import paths as _paths

            base = (_paths.memory_dir(workspace) / project_root).resolve()
        except Exception as e:
            logger.debug(f"No se pudo resolver el directorio del proyecto: {e}")
            return ""

        text = ""
        for fname in files_written[:10]:
            try:
                resolved = (base / fname).resolve()
                # An absolute or "../" entry must not pull arbitrary files into the prompt.
                if not resolved.is_relative_to(base) or not resolved.is_file():
                    continue
                content = resolved.read_text(encoding="utf-8")
            except Exception as e:
                # One unreadable file (e.g. binary / non-UTF-8) must not drop the rest.
                logger.debug(f"No se pudo leer '{fname}': {e}")
                continue
            if len(content) > 6000:
                content = content[:6000] + "\n... [truncado]"
            text += f"\n--- Contenido REAL de '{fname}' en disco ---\n{content}\n"
        return text

    @staticmethod
    async def aggregate_results(
        query: str,
        results: dict,
        G: Any,
        task_analysis: dict,
        files_written: list[str] | None = None,
        project_root: str | None = None,
        workspace: str = "main",
    ) -> str:
        """Merge per-subtask results into one final answer.

        Args:
            query: The original user request.
            results: Mapping of graph node -> subtask result (a dict whose
                ``result`` key holds the text, or any value converted with ``str``).
                Nodes are sorted and labelled ``node + 1``, so integer node ids
                are assumed.
            G: Task graph; ``G.nodes[node]["task"]`` is used as the subtask label.
            task_analysis: Task analyzer output (not read here).
            files_written: Files created/modified by the subtasks, relative to
                the project directory.
            project_root: When set, synthesis runs through the agent loop
                with ``file_manager`` access; otherwise via a plain LLM call.
            workspace: Workspace used to locate the project on disk.

        Returns:
            The synthesized answer, a structured fallback when synthesis is
            empty, looks useless, or raises, or a warning string when there is
            nothing to aggregate.
        """
        import re

        if not results:
            return "⚠️ No se generaron resultados."

        # User correction detection (high-specificity words, avoiding common
        # terms like "no" that cause false positives). "fix" is matched as a
        # whole word so that "prefix"/"suffix" do not trigger it.
        query_lower = query.lower()
        if any(
            kw in query_lower
            for kw in ("corrige", "equivocado", "error en", "arregla", "mal hecho")
        ) or re.search(r"\bfix(?:ed|es|ing)?\b", query_lower):
            try:
                from core.memory.manager import memory as memory_manager

                await memory_manager.save_user_correction(query, "corrección guardada")
            except Exception as e:
                # Persisting the correction is best-effort; it must not abort synthesis.
                logger.warning(f"No se pudo guardar la corrección del usuario: {e}")

        # Build the results section clearly (1 or N subtasks)
        multiple = len(results) > 1
        sections: list[str] = []
        for node, data in sorted(results.items()):
            content = ResultAggregator._result_text(data)
            if not content:
                continue
            task_desc = G.nodes[node].get("task", f"Subtarea {node}")
            label = (
                f"--- SUBTAREA {node + 1}: {task_desc} ---"
                if multiple
                else f"--- Resultado: {task_desc} ---"
            )
            sections.append(f"\n\n{label}\n{content}\n")
        results_text = "".join(sections)

        if not results_text.strip():
            return "⚠️ No se pudo procesar la información de las subtareas."

        files_block = ""
        if files_written:
            files_block = (
                "\nArchivos realmente creados/modificados en el proyecto:\n"
                + "\n".join(f"- {f}" for f in files_written)
                + "\n\n⚠️ REGLAS ESTRICTAS para mencionar archivos:\n"
                "- SOLO menciona archivos que aparezcan en esta lista.\n"
                "- NO inventes nombres de archivo ni estructuras de directorios.\n"
                "- NO digas que el proyecto está 'incompleto' o que 'falta' algún archivo.\n"
                "- NO sugieras crear archivos adicionales.\n"
                "- Describe SOLO lo que existe, sin evaluar ni juzgar.\n"
            )

        # Read actual file contents from disk so the aggregator has the REAL code
        actual_files_text = ""
        if files_written and project_root and workspace:
            actual_files_text = ResultAggregator._read_written_files(
                files_written, project_root, workspace
            )

        prompt = f"""Combina los resultados de las subtareas en **UNA sola respuesta final** coherente, clara y útil.

Consulta original del usuario:
{query}

Resultados de las subtareas:
{results_text}
{actual_files_text}
{files_block}
Instrucciones OBLIGATORIAS para la respuesta final:
- Usa TODA la información relevante de las subtareas.
- Elimina completamente cualquier repetición.
- Estructura la respuesta de forma natural y fácil de leer (usa párrafos cortos y viñetas cuando ayude).
- Sé directo, profesional y conciso.
- No uses frases introductorias como "Según los resultados", "En resumen", "Aquí tienes la síntesis", etc.
- No agregues encabezados como "Síntesis" o "Respuesta final".
- Si hay información contradictoria o incompleta, menciónalo de forma honesta y útil.
- Termina con un cierre práctico o recomendación cuando corresponda."""

        try:
            if project_root:
                from orchestration.loop import execute_agent_loop

                loop_result = await execute_agent_loop(
                    task=prompt,
                    agent_type="developer",
                    allowed_tools=["file_manager"],
                    project_root=project_root,
                    workspace=workspace,
                )
                final = clean_llm_response(
                    loop_result.get("result", str(loop_result))
                    if isinstance(loop_result, dict)
                    else str(loop_result)
                )
            else:
                response = await models.call(
                    messages=[{"role": "user", "content": prompt}],
                    role="fast",
                    temperature=0.3,
                )
                final = clean_llm_response(response)

            # Strong protection against empty or useless responses
            bad_phrases = [
                "no se incluye información",
                "no hay información",
                "no pude",
                "no tengo suficiente",
                "no se proporcionó",
            ]
            if not final.strip() or any(phrase in final.lower() for phrase in bad_phrases):
                logger.warning("Síntesis LLM vacía o inútil → usando fallback estructurado")
                return ResultAggregator._fallback_output(query, results, G, files_written)

            return final

        except Exception as e:
            logger.error(f"Error en síntesis: {e}")
            return ResultAggregator._fallback_output(query, results, G, files_written)

Functions

orchestration.finalizer

Workflow Finalizer — conversation persistence, export, and structured profile extraction.

Classes

Functions

orchestration.loop

Agent Loop — task execution with native function calling.

Core intelligence
  1. CodebaseIndexer: el agente entiende tu código antes de actuar
  2. ContextManager: comprime el historial para no exceder la ventana
  3. ReAct Pattern: razonamiento → acción → observación → ajuste
  4. Self-Reflection: detecta estancamiento y hace early exit
  5. Memoria FAISS: inyecta contexto de tareas similares pasadas

Classes

AgentLoopConfig dataclass

Injectable configuration for execute_agent_loop.

Sustituye el acceso directo a constantes globales y a kairos/settings, facilitando el testing y la inyección de dependencias.

Source code in orchestration/loop.py
@dataclass
class AgentLoopConfig:
    """Injectable configuration for execute_agent_loop.

    Sustituye el acceso directo a constants globales y a kairos/settings,
    facilitando el testing y la inyección de dependencias.
    """

    max_agent_iterations: int = 15
    max_stall_iterations: int = 2
    context_compression_threshold: float = 0.7
    context_compression_enabled: bool = True

    @classmethod
    def from_settings(cls) -> "AgentLoopConfig":
        """Crea una config con los valores por defecto del sistema."""
        return cls(
            max_agent_iterations=getattr(settings, "max_agent_iterations", 8),
            max_stall_iterations=2,
            context_compression_threshold=getattr(settings, "context_compression_threshold", 0.7),
            context_compression_enabled=settings.context_compression,
        )
Functions
from_settings classmethod
from_settings() -> AgentLoopConfig

Crea una config con los valores por defecto del sistema.

Source code in orchestration/loop.py
@classmethod
def from_settings(cls, source: object | None = None) -> "AgentLoopConfig":
    """Create a config from the system settings.

    Args:
        source: Settings-like object to read attributes from; defaults to the
            global ``settings``. Any attribute missing on it falls back to a
            built-in default instead of raising.

    Returns:
        A new config. ``max_stall_iterations`` is always 2.
    """
    cfg_source = settings if source is None else source
    return cls(
        # NOTE(review): this fallback (8) differs from the dataclass field
        # default (15); confirm which value is intended.
        max_agent_iterations=getattr(cfg_source, "max_agent_iterations", 8),
        max_stall_iterations=2,
        context_compression_threshold=getattr(
            cfg_source, "context_compression_threshold", 0.7
        ),
        # Read with getattr like the fields above; a direct attribute access
        # raised AttributeError on settings objects lacking it.
        context_compression_enabled=getattr(
            cfg_source, "context_compression", cls.context_compression_enabled
        ),
    )

Functions

execute_agent_loop async

execute_agent_loop(
    task: str,
    agent_type: str | None = None,
    history: list | None = None,
    allowed_tools: list | None = None,
    project_root: str | None = None,
    workspace: str = "main",
    extra_context: str = "",
    on_stream_chunk=None,
    session: Session | None = None,
    events=None,
    config: AgentLoopConfig | None = None,
) -> dict

Ejecuta una tarea usando el Agent Loop con function-calling nativo.

Improvements:

- CodebaseIndexer: busca código relevante del proyecto antes de actuar

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session` | `Session \| None` | Session unificada (contexto + eventos). Si se provee, los eventos (bash output, etc.) se emiten automáticamente. | `None` |
| `config` | `AgentLoopConfig \| None` | Configuración inyectable (opcional). Si es None, usa valores por defecto del sistema. | `None` |
Source code in orchestration/loop.py
async def execute_agent_loop(
    task: str,
    agent_type: str | None = None,
    history: list | None = None,
    allowed_tools: list | None = None,
    project_root: str | None = None,
    workspace: str = "main",
    extra_context: str = "",
    on_stream_chunk=None,
    session: Session | None = None,
    events=None,
    config: AgentLoopConfig | None = None,
) -> dict:
    """Run a task with the Agent Loop using native function calling.

    Each iteration asks the ``"agent"`` model role for the next step. Tool
    calls are executed through ``_execute_tool_calls_and_check_stall`` and
    the conversation grows until the model answers without tool calls or
    ``config.max_agent_iterations`` is exhausted.

    Improvements:
    - CodebaseIndexer: looks up relevant project code before acting.

    Args:
        task: The task text, appended as the final user message.
        agent_type: Label reported in stats events (defaults to "agent").
        history: Previous conversation messages; copied, not mutated.
        allowed_tools: Tool names used to build tool definitions, skills
            and kits.
        project_root: Project root shown in the prompt and forwarded to tools.
        workspace: Workspace name shown in the prompt and forwarded to tools.
        extra_context: Extra context merged into the enriched context.
        on_stream_chunk: Optional callback. When set and tools are available,
            the model response is streamed through ``_accumulate_stream``.
            Otherwise a regular, non-streaming call is made.
        session: Unified session (context + events). If provided and
            ``events`` is None, ``session.events`` is used so events (bash
            output, etc.) are emitted automatically.
        events: Explicit events sink; takes precedence over ``session.events``.
        config: Injectable configuration. If None, uses
            ``AgentLoopConfig.from_settings()``.

    Returns:
        A dict with ``status``, ``result``, ``actions_taken``, ``iterations``
        and ``files_written``, or the early-exit dict produced by the tool
        executor.
    """
    if config is None:
        config = AgentLoopConfig.from_settings()

    # An explicit ``events`` argument wins over the session's events.
    events = events if events is not None else (session.events if session else None)

    if events:
        await emit_stats(
            events,
            {
                "status": "Agent Loop started",
                "current_agent": agent_type or "agent",
                "total_tools": len(allowed_tools) if allowed_tools else 0,
            },
        )

    # 2.1 + 2.5 — Build the enriched context.
    enriched_context = await _build_extra_context(task, project_root, workspace, extra_context)

    # Prepend tool kits (first) and tool skills to the enriched context.
    skills_context = _load_tool_skills(allowed_tools)
    kits_context = _load_tool_kits(allowed_tools)
    if skills_context or kits_context:
        enriched_context = (
            (kits_context + "\n\n" + skills_context if kits_context else skills_context)
            + "\n\n"
            + enriched_context
        )

    tools_defs = build_tool_definitions(allowed_tools)
    # NOTE(review): this text is computed but never injected into the system
    # prompt below — confirm whether that is intended.
    tool_instructions_text = build_tool_instructions(allowed_tools, project_root, plan_mode=False)

    messages = list(history) if history else []

    # 2.3 — System prompt with the ReAct pattern, plus the user profile kept
    # by the memory manager (if any).
    from core.memory.manager import memory as memory_manager

    profile_context = ""
    user_profile = memory_manager.get_user_profile()
    if user_profile and any(user_profile.values()):
        summary = memory_manager.get_user_summary()
        if summary:
            profile_context = f"\n[PERFIL DEL USUARIO]:\n{summary}\n"

    system_msg = (
        f"Eres un agente de desarrollo de software experto. Trabajas con el patrón ReAct:\n"
        f"1. RAZONA: analiza la tarea y el contexto disponible.\n"
        f"2. ACTÚA: usa las herramientas apropiadas para avanzar.\n"
        f"3. OBSERVA: evalúa el resultado de cada acción.\n"
        f"4. AJUSTA: si el resultado no es el esperado, cambia de estrategia.\n\n"
        f"Workspace: {workspace}\n"
        f"Project root: {project_root or 'N/A'}\n"
        f"{enriched_context}\n"
        f"{profile_context}\n"
        "Reglas importantes:\n"
        "- Los paths en file_manager son relativos al project root. NO antepongas directorios como 'code_projects/'.\n"
        "  Ejemplo: para crear 'api_tareas/main.py', usa path='api_tareas/main.py', NO 'code_projects/api_tareas/main.py'.\n"
        "- NUNCA uses paths absolutos como '/home/user/code_projects/...'. Todos los paths son relativos al project root.\n"
        "- Antes de escribir código, LEELO primero con file_manager(action='read', path='archivo.py').\n"
        "- Después de escribir, VERIFICA que el archivo existe con file_manager(action='read', path='archivo.py').\n"
        "- bash_manager SIEMPRE requiere el parámetro 'command'. Sin él, la herramienta falla.\n"
        "  Ejemplo correcto: command='pytest tests/'. NO llames bash_manager() sin command.\n"
        "- code_exec es un sandbox RESTRINGIDO. SOLO puedes usar: math, random, collections,\n"
        "  datetime, re, json, numpy (como 'np'), matplotlib (como 'plt').\n"
        "  NO uses 'import subprocess', 'import io', 'import os', 'import sys' — están bloqueados.\n"
        "  Para ejecutar scripts o tests, usa bash_manager o test_runner, NO code_exec.\n"
        "  Para VER el resultado de code_exec, usa print(...) o deja el valor como ÚLTIMA expresión\n"
        "  (ej: termina el código con 'np.mean(arr)'). Sin eso, no habrá salida visible.\n"
        "- Si recibes contexto compartido de otros agentes (Shared Context), LEELO primero.\n"
        "  Puede contener resultados previos que eviten trabajo duplicado.\n"
        "- Si una acción falla, NO la repitas. Prueba otra estrategia.\n"
        "- Cuando la tarea esté completa, responde con un RESUMEN de lo hecho.\n"
        "- Si te estancas, explica por qué y sugiere alternativas."
    )

    messages.insert(0, {"role": "system", "content": system_msg})
    messages.append({"role": "user", "content": task})

    actions_taken = 0
    final_result = ""
    files_written: list[str] = []
    consecutive_stalls = 0
    repeat_tracker: dict[str, int] = {}
    # Pre-initialized so the summary below is valid even when the loop body
    # never runs (max_agent_iterations <= 0).
    iteration = 0
    # tools_defs never changes inside the loop, so decide once.
    use_native_tools = len(tools_defs) > 0

    for iteration in range(1, config.max_agent_iterations + 1):
        if events and iteration > 1:
            await emit_stats(
                events,
                {
                    "status": f"Agent iteration {iteration}/{config.max_agent_iterations}",
                    "current_agent": agent_type or "agent",
                    "actions_taken": actions_taken,
                    "files_written": len(files_written),
                },
            )

        # 2.2 — Compress the history when the context grows too large.
        estimated_tokens = ContextManager.estimate_tokens(messages)
        max_tokens = ContextManager._max_tokens()
        if estimated_tokens > max_tokens * config.context_compression_threshold:
            if config.context_compression_enabled:
                target = int(max_tokens * 0.5)
                logger.info(
                    "Comprimiendo contexto: %d tokens -> objetivo %d",
                    estimated_tokens,
                    target,
                )
                from core.cache_manager import cache_manager

                # Drop orphaned tool messages (a missing tool_call_id causes DeepSeek 400s).
                messages = [m for m in messages if m.get("role") != "tool" or m.get("tool_call_id")]

                messages = cache_manager.stabilize_messages(messages, max_tokens=target)

        if use_native_tools and on_stream_chunk:
            # ── Streaming with native function calling ──
            stream = models.call_stream(
                messages=messages,
                role="agent",
                tools=tools_defs,
                tool_choice="auto",
            )
            streamed_text, streamed_tool_calls, _, reasoning = await _accumulate_stream(
                stream, on_stream_chunk
            )

            if not streamed_tool_calls:
                # No tool calls: the streamed text is the final answer.
                final_result = clean_llm_response(streamed_text.strip())
                break

            assistant_msg: dict = {
                "role": "assistant",
                "content": streamed_text or None,
                "tool_calls": [],
            }
            if reasoning:
                assistant_msg["reasoning_content"] = reasoning

            parsed = []
            for tc in streamed_tool_calls:
                tool_name = tc["function"]["name"]
                if not tool_name:
                    # A nameless call cannot be executed. Recording it would
                    # leave a tool_call with no tool reply, which providers reject.
                    continue
                assistant_msg["tool_calls"].append(
                    {
                        "id": tc["id"],
                        "type": "function",
                        "function": {
                            "name": tool_name,
                            "arguments": tc["function"]["arguments"],
                        },
                    }
                )
                try:
                    args = json.loads(tc["function"]["arguments"])
                except (json.JSONDecodeError, TypeError):
                    args = {}
                parsed.append({"name": tool_name, "id": tc["id"], "arguments": args})

        else:
            # Non-streaming call. With no tool definitions, request a plain
            # completion; otherwise the loop would never call the model and
            # would only burn through the iteration budget.
            if use_native_tools:
                response = await models.call(
                    messages=messages,
                    role="agent",
                    tools=tools_defs,
                    tool_choice="auto",
                )
                tool_calls = tool_calls_from_response(response)
            else:
                response = await models.call(messages=messages, role="agent")
                tool_calls = []

            choices = getattr(response, "choices", None)
            choice = choices[0] if choices else None

            if not tool_calls:
                # No tool calls: the LLM gave its final answer.
                content = ""
                if choice and hasattr(choice, "message"):
                    content = choice.message.content or ""
                final_result = clean_llm_response(str(content) if content else str(response))
                break

            assistant_msg = {
                "role": "assistant",
                "content": None,
                "tool_calls": [],
            }
            if choice and hasattr(choice, "message"):
                reasoning = getattr(choice.message, "reasoning_content", None)
                if reasoning:
                    assistant_msg["reasoning_content"] = reasoning

            parsed = []
            for tc in tool_calls:
                func = tc.function if hasattr(tc, "function") else tc.get("function", {})
                tool_name = func.name if hasattr(func, "name") else func.get("name", "")
                if not tool_name:
                    # Same reasoning as the streaming branch: skip, don't record.
                    continue

                # Dict-shaped tool calls carry "id" as a key, not an attribute.
                raw_id = tc.get("id") if isinstance(tc, dict) else getattr(tc, "id", None)
                call_id = raw_id or f"call_{iteration}_{len(assistant_msg['tool_calls'])}"

                raw_args = (
                    func.arguments if hasattr(func, "arguments") else func.get("arguments", {})
                )
                if raw_args is None or raw_args == "":
                    args_json = "{}"
                elif isinstance(raw_args, str):
                    # Already JSON text; re-encoding it would double-quote it.
                    args_json = raw_args
                else:
                    try:
                        args_json = json.dumps(raw_args)
                    except (TypeError, ValueError):
                        logger.warning("Error serializando argumentos de tool call", exc_info=True)
                        args_json = "{}"
                try:
                    arguments = json.loads(args_json)
                except json.JSONDecodeError:
                    arguments = {}

                assistant_msg["tool_calls"].append(
                    {
                        "id": call_id,
                        "type": "function",
                        "function": {
                            "name": tool_name,
                            "arguments": args_json,
                        },
                    }
                )
                parsed.append({"name": tool_name, "id": call_id, "arguments": arguments})

        if not parsed:
            # Only nameless tool calls this round: nothing to execute or record.
            continue

        messages.append(assistant_msg)
        result = await _execute_tool_calls_and_check_stall(
            parsed,
            messages,
            files_written,
            actions_taken,
            False,
            consecutive_stalls,
            iteration,
            config,
            project_root,
            workspace,
            events,
            repeat_tracker,
        )
        if isinstance(result, dict):
            return result
        (actions_taken, _, files_written, consecutive_stalls, early) = result
        if early:
            return early
    else:
        final_result = (
            f"⚠️ Límite de {config.max_agent_iterations} iteraciones alcanzado.\n"
            f"Acciones ejecutadas: {actions_taken}. Archivos modificados: {len(files_written)}.\n"
            "La tarea podría necesitar descomponerse en partes más pequeñas."
        )

    if events:
        await emit_stats(
            events,
            {
                "status": "completed",
                "current_agent": agent_type or "agent",
                "actions_taken": actions_taken,
                "iterations": iteration,
                "files_written": len(files_written),
            },
        )

    return {
        "status": "completed",
        "result": final_result,
        "actions_taken": actions_taken,
        "iterations": iteration,
        "files_written": files_written,
    }

orchestration.runner

WorkflowRunner — timeout, cancellation, and safe execution wrapper.

Classes

WorkflowTimeoutError

Bases: TimeoutError

Raised when a workflow phase exceeds its time limit.

Source code in orchestration/runner.py
class WorkflowTimeoutError(asyncio.TimeoutError):
    """Signals that a workflow phase ran past its allotted time."""

    pass

WorkflowCancelledError

Bases: CancelledError

Raised when a workflow is cancelled by the user.

Source code in orchestration/runner.py
class WorkflowCancelledError(asyncio.CancelledError):
    """Signals that the user cancelled the running workflow."""

    pass

CircuitBreakerOpenError

Bases: Exception

Raised when the circuit breaker is open for a provider.

Source code in orchestration/runner.py
class CircuitBreakerOpenError(Exception):
    """Signals that a provider's circuit breaker is currently open."""

    pass

WorkflowRunner

Wraps a Session to provide timeout, cancellation, and safe execution.

Usage

runner = WorkflowRunner(session)
result = await runner.with_timeout(phase_coro, 30, phase="decompose")

Source code in orchestration/runner.py
class WorkflowRunner:
    """Wraps a Session to provide timeout, cancellation, and safe execution.

    Usage:
        runner = WorkflowRunner(session)
        result = await runner.with_timeout(phase_coro, 30, phase="decompose")
    """

    def __init__(self, session):
        self.session = session
        # Duration (seconds) of the most recent run of each phase, by name.
        self._phase_times: dict[str, float] = {}

    @property
    def cancelled(self) -> bool:
        """True once the wrapped session has been cancelled."""
        return self.session.is_cancelled

    def check_cancelled(self) -> None:
        """Raise WorkflowCancelledError if the workflow has been cancelled."""
        if self.cancelled:
            raise WorkflowCancelledError("Workflow cancelled by user")

    async def with_timeout(
        self,
        coro: Coroutine[Any, Any, T],
        timeout_seconds: float,
        *,
        phase: str = "unknown",
        fallback: str = "",
    ) -> WorkflowResult:
        """Await ``coro`` under a time limit and wrap the outcome in a WorkflowResult.

        Args:
            coro: Coroutine to run. It is closed unstarted if the workflow is
                already cancelled.
            timeout_seconds: Limit passed to ``asyncio.wait_for``.
            phase: Phase name used in logs and in the per-phase timings.
            fallback: Partial content reported on timeout or failure.

        Returns:
            ``coro``'s result unchanged if it already is a WorkflowResult;
            otherwise ``success(...)`` with its string form, ``timeout(...)``
            if the limit was hit, or ``failure(...)`` if it raised. The phase
            duration is recorded in every case.

        Raises:
            WorkflowCancelledError: if the workflow was cancelled before
                starting, or the coroutine raises it.
        """
        try:
            self.check_cancelled()
        except WorkflowCancelledError:
            # The coroutine will never be awaited; close it so Python does not
            # emit a "coroutine ... was never awaited" RuntimeWarning.
            close = getattr(coro, "close", None)
            if callable(close):
                close()
            raise

        phase_start = time.monotonic()
        try:
            result = await asyncio.wait_for(coro, timeout=timeout_seconds)
            elapsed = time.monotonic() - phase_start
            self._phase_times[phase] = elapsed
            logger.debug("Phase '%s' completed in %.1fs", phase, elapsed)

            if isinstance(result, WorkflowResult):
                return result
            content = str(result) if result is not None else ""
            return success(content, phase=phase, elapsed=elapsed)

        except (asyncio.TimeoutError, TimeoutError):
            # Both are listed because before Python 3.11 asyncio.TimeoutError is
            # not the builtin TimeoutError; catching only the builtin would
            # misreport timeouts as generic failures.
            self._phase_times[phase] = time.monotonic() - phase_start
            logger.warning("Phase '%s' timed out after %ss", phase, timeout_seconds)
            return timeout(partial_content=fallback, timeout_seconds=timeout_seconds)

        except WorkflowCancelledError:
            raise

        except Exception as e:
            elapsed = time.monotonic() - phase_start
            self._phase_times[phase] = elapsed
            logger.error("Phase '%s' failed: %s", phase, e, exc_info=True)
            return failure(str(e), partial_content=fallback, phase=phase, elapsed=elapsed)

    async def safe_call(
        self,
        coro: Coroutine[Any, Any, T],
        *,
        fallback: T | None = None,
        error_tag: str = "",
    ) -> T | None:
        """Await ``coro``; on an ordinary exception log it and return ``fallback``.

        No timeout is applied. WorkflowCancelledError is re-raised so
        cancellation still propagates.
        """
        try:
            return await coro
        except WorkflowCancelledError:
            raise
        except Exception as e:
            tag = f" [{error_tag}]" if error_tag else ""
            logger.warning(f"safe_call failed{tag}: {e}")
            return fallback

    def elapsed(self) -> float:
        """Total time across all recorded phases (sum of ``_phase_times``)."""
        return sum(self._phase_times.values())

    def phase_stats(self) -> dict[str, float]:
        """Return a copy of the per-phase durations."""
        return dict(self._phase_times)
Functions
check_cancelled
check_cancelled() -> None

Raise if the workflow has been cancelled.

Source code in orchestration/runner.py
def check_cancelled(self) -> None:
    """Abort with WorkflowCancelledError once the workflow is marked cancelled."""
    if not self.cancelled:
        return
    raise WorkflowCancelledError("Workflow cancelled by user")
with_timeout async
with_timeout(
    coro: Coroutine[Any, Any, T],
    timeout_seconds: float,
    *,
    phase: str = "unknown",
    fallback: str = ""
) -> WorkflowResult

Execute a coroutine with a timeout. Returns WorkflowResult.

Source code in orchestration/runner.py
async def with_timeout(
    self,
    coro: Coroutine[Any, Any, T],
    timeout_seconds: float,
    *,
    phase: str = "unknown",
    fallback: str = "",
) -> WorkflowResult:
    """Await ``coro`` under a time limit and wrap the outcome in a WorkflowResult.

    Args:
        coro: Coroutine to run. It is closed unstarted if the workflow is
            already cancelled.
        timeout_seconds: Limit passed to ``asyncio.wait_for``.
        phase: Phase name used in logs and in the per-phase timings.
        fallback: Partial content reported on timeout or failure.

    Returns:
        ``coro``'s result unchanged if it already is a WorkflowResult;
        otherwise ``success(...)`` with its string form, ``timeout(...)``
        if the limit was hit, or ``failure(...)`` if it raised. The phase
        duration is recorded in every case.

    Raises:
        WorkflowCancelledError: if the workflow was cancelled before
            starting, or the coroutine raises it.
    """
    try:
        self.check_cancelled()
    except WorkflowCancelledError:
        # The coroutine will never be awaited; close it so Python does not
        # emit a "coroutine ... was never awaited" RuntimeWarning.
        close = getattr(coro, "close", None)
        if callable(close):
            close()
        raise

    phase_start = time.monotonic()
    try:
        result = await asyncio.wait_for(coro, timeout=timeout_seconds)
        elapsed = time.monotonic() - phase_start
        self._phase_times[phase] = elapsed
        logger.debug("Phase '%s' completed in %.1fs", phase, elapsed)

        if isinstance(result, WorkflowResult):
            return result
        content = str(result) if result is not None else ""
        return success(content, phase=phase, elapsed=elapsed)

    except (asyncio.TimeoutError, TimeoutError):
        # Both are listed because before Python 3.11 asyncio.TimeoutError is
        # not the builtin TimeoutError; catching only the builtin would
        # misreport timeouts as generic failures.
        self._phase_times[phase] = time.monotonic() - phase_start
        logger.warning("Phase '%s' timed out after %ss", phase, timeout_seconds)
        return timeout(partial_content=fallback, timeout_seconds=timeout_seconds)

    except WorkflowCancelledError:
        raise

    except Exception as e:
        elapsed = time.monotonic() - phase_start
        self._phase_times[phase] = elapsed
        logger.error("Phase '%s' failed: %s", phase, e, exc_info=True)
        return failure(str(e), partial_content=fallback, phase=phase, elapsed=elapsed)
safe_call async
safe_call(
    coro: Coroutine[Any, Any, T],
    *,
    fallback: T | None = None,
    error_tag: str = ""
) -> T | None

Execute a coroutine, return fallback on error. Does NOT timeout.

Source code in orchestration/runner.py
async def safe_call(
    self,
    coro: Coroutine[Any, Any, T],
    *,
    fallback: T | None = None,
    error_tag: str = "",
) -> T | None:
    """Await ``coro`` and hand back its value, or ``fallback`` if it raises.

    No time limit is applied. Ordinary exceptions are logged as warnings;
    WorkflowCancelledError is always re-raised.
    """
    try:
        outcome = await coro
    except WorkflowCancelledError:
        raise
    except Exception as err:
        suffix = "" if not error_tag else f" [{error_tag}]"
        logger.warning(f"safe_call failed{suffix}: {err}")
        return fallback
    return outcome
elapsed
elapsed() -> float

Total wall-clock time across all phases.

Source code in orchestration/runner.py
def elapsed(self) -> float:
    """Sum of the durations stored in ``_phase_times``."""
    durations = self._phase_times.values()
    return sum(duration for duration in durations)

orchestration.context

Workflow context and events — decoupled from any UI framework.

WorkflowContext y WorkflowEvents viven en orchestration/ para que tanto la CLI como la GUI PySide6 puedan usarlos sin dependencias mutuas.

Classes

WorkflowContext dataclass

Contexto del workflow — sin objetos UI. Es un dataclass mutable (no congelado): por ejemplo, Session.cancel() modifica el campo cancelled.

Source code in orchestration/context.py
@dataclass
class WorkflowContext:
    """Workflow context, free of UI objects.

    This is a regular (non-frozen) dataclass, so its fields can change at
    runtime; for example ``cancelled`` is set by ``Session.cancel()``. Field
    order defines the generated ``__init__`` signature.
    """

    query: str
    mode: str = "chat"
    # default_factory gives each instance its own list (no shared mutable default).
    conversation_history: list[dict] = field(default_factory=list)
    current_pdf_text: str = ""
    workspace: str = "main"
    project_root: str | None = None
    active_workflow: str = "default"
    force_agent: str | None = None
    allowed_tools: list[str] | None = None
    # NOTE(review): the concrete types of settings / agents_registry / enc /
    # blackboard are not visible here; they are typed as Any.
    settings: Any = None
    agents_registry: Any = None
    enc: Any = None
    conversation_id: int | None = None
    is_follow_up: bool = False
    # Set to True by Session.cancel(); read through Session.is_cancelled.
    cancelled: bool = False
    last_clarification: str = ""
    blackboard: Any = None

WorkflowEvents dataclass

Callbacks que el orchestrator dispara. La UI los implementa.

Todos son async, opcionales (None = se ignora el evento). Ningún objeto UI aquí — solo callbacks tipados.

Source code in orchestration/context.py
@dataclass
class WorkflowEvents:
    """Callbacks fired by the orchestrator and implemented by the UI.

    Every callback is optional (None means the event is ignored) and returns
    an awaitable. No UI objects live here — only typed callbacks.
    """

    on_system_message: Callable[[str], Awaitable[None]] | None = None
    on_assistant_message: Callable[[str], Awaitable[None]] | None = None
    on_user_message: Callable[[str], Awaitable[None]] | None = None
    on_stream_chunk: Callable[[str], Awaitable[None]] | None = None
    # Receives the rendered diagram HTML and the workflow graph.
    on_diagram_update: Callable[[str, Any], Awaitable[None]] | None = None
    on_stats_update: Callable[[dict], Awaitable[None]] | None = None
    on_ui_refresh: Callable[[], Awaitable[None]] | None = None
    # Resolves to a bool: the approval decision.
    on_approval_required: Callable[[str, dict[str, Any]], Awaitable[bool]] | None = None
    on_agent_message: Callable[[str, str, str], Awaitable[None]] | None = None

Session dataclass

Agrupa contexto y eventos para simplificar firmas de funciones.

Reemplaza el patrón (ctx, events) disperso por toda la cadena de orquestación.

Source code in orchestration/context.py
@dataclass
class Session:
    """Bundles the workflow context and its events to simplify signatures.

    Replaces the (ctx, events) pair that was passed around throughout the
    orchestration chain.
    """

    context: WorkflowContext
    events: WorkflowEvents

    def cancel(self) -> None:
        """Mark the workflow as cancelled (sets ``context.cancelled``)."""
        self.context.cancelled = True

    @property
    def is_cancelled(self) -> bool:
        """Whether ``cancel()`` (or anything else) set ``context.cancelled``."""
        return self.context.cancelled
Functions
cancel
cancel() -> None

Mark the workflow as cancelled.

Source code in orchestration/context.py
def cancel(self) -> None:
    """Flag the session's workflow context as cancelled."""
    ctx = self.context
    ctx.cancelled = True

orchestration.events

Capa de eventos — re-exporta desde orchestration/context.py.

Mantenido por backward compat. El código nuevo debe importar desde orchestration.context.

Classes

orchestration.diagram

Diagram Manager — gestión de estado del workflow en vivo. Uses StatusRenderer (HTML) instead of Mermaid. No external dependencies.

Functions

update_live_diagram async

update_live_diagram(G: Any, events: Any) -> str | None

Actualiza el estado en vivo disparando eventos.

Parameters:

Name Type Description Default
G Any

Grafo NetworkX del workflow (None = sin diagrama).

required
events Any

WorkflowEvents con on_diagram_update y on_ui_refresh.

required

Returns:

Type Description
str | None

El HTML generado, o None si no hay diagrama.

Source code in orchestration/diagram.py
async def update_live_diagram(G: Any, events: Any) -> str | None:
    """Render the workflow status as HTML and push it to the UI callbacks.

    Args:
        G: NetworkX workflow graph, or None when there is no diagram
            (simple conversation mode).
        events: Object exposing optional async ``on_diagram_update`` and
            ``on_ui_refresh`` callbacks (e.g. WorkflowEvents).

    Returns:
        The generated HTML, or None when nothing was rendered. Any error is
        logged and swallowed so a diagram failure never breaks the workflow.
    """
    try:
        if G is None:
            logger.debug("Modo conversación simple: diagrama omitido (G=None)")
            return None
        if events is None:
            logger.warning("events es None, no se puede actualizar diagrama")
            return None

        node_view = G.nodes
        logger.debug(
            "Actualizando diagrama - Nodos: %d | Estados: %s",
            len(node_view),
            [node_view[n].get("status", "pending") for n in node_view],
        )

        snapshot_html = render_status(G)
        # Disk write runs in a worker thread so the event loop stays responsive.
        await asyncio.to_thread(save_status_snapshot, snapshot_html)

        diagram_cb = events.on_diagram_update
        if diagram_cb is not None:
            await diagram_cb(snapshot_html, G)

        refresh_cb = events.on_ui_refresh
        if refresh_cb is not None:
            await refresh_cb()

        logger.debug("✅ Diagrama actualizado correctamente")
        return snapshot_html

    except Exception as exc:
        logger.error("Error crítico actualizando diagrama: %s", exc, exc_info=True)
        return None

orchestration.loader

Functions

list_workflows

list_workflows(
    workspace_name: str | None = None,
) -> list[str]

Return workflow names available in the workspace. Workspace-local workflows take priority; global templates are fallback.

Source code in orchestration/loader.py
def list_workflows(workspace_name: str | None = None) -> list[str]:
    """Return the sorted workflow names available for a workspace.

    The workspace's own ``*.yaml`` files are used whenever it has any; the
    global templates are listed only when the workspace provides none.
    """
    names: set[str] = set()

    if workspace_name:
        workspace_dir = paths.workspace_workflows_dir(workspace_name)
        if workspace_dir.exists():
            names.update(p.stem for p in workspace_dir.glob("*.yaml"))

    # Fallback source: global templates, only for an empty result so far.
    if not names and GLOBAL_TEMPLATES_DIR.exists():
        names.update(p.stem for p in GLOBAL_TEMPLATES_DIR.glob("*.yaml"))

    return sorted(names)

load_workflow_template

load_workflow_template(
    workspace_name: str | None = None,
    workflow_name: str = "development",
) -> dict

Carga la plantilla de workflow indicada. Busca primero en el workspace local y luego en global.

Source code in orchestration/loader.py
def load_workflow_template(
    workspace_name: str | None = None, workflow_name: str = "development"
) -> dict:
    """Load the workflow template ``<workflow_name>.yaml``.

    The workspace-local file is tried first, then the global template of the
    same name. A file that cannot be read or parsed, or whose top-level YAML
    value is not a mapping (e.g. an empty file, a list or a scalar), is
    logged and skipped.

    Returns:
        The template mapping, or ``{}`` when neither location yields one.
    """
    template: dict | None = None

    if workspace_name:
        local_path = paths.workspace_workflows_dir(workspace_name) / f"{workflow_name}.yaml"
        if local_path.exists():
            try:
                with open(local_path, encoding="utf-8") as f:
                    loaded = yaml.safe_load(f)
            except Exception as e:
                logger.error(f"Error cargando plantilla local {local_path}: {e}")
            else:
                if isinstance(loaded, dict):
                    template = loaded
                    logger.info(
                        f"✅ Plantilla '{workflow_name}' cargada desde workspace '{workspace_name}'"
                    )
                else:
                    logger.warning(
                        "Plantilla local %s ignorada: se esperaba un mapeo YAML, se obtuvo %s",
                        local_path,
                        type(loaded).__name__,
                    )

    if template is None:
        global_path = GLOBAL_TEMPLATES_DIR / f"{workflow_name}.yaml"
        if global_path.exists():
            try:
                with open(global_path, encoding="utf-8") as f:
                    loaded = yaml.safe_load(f)
            except Exception as e:
                logger.error(f"Error cargando plantilla global {global_path}: {e}")
            else:
                if isinstance(loaded, dict):
                    template = loaded
                    logger.info(f"✅ Plantilla '{workflow_name}' global cargada")
                else:
                    logger.warning(
                        "Plantilla global %s ignorada: se esperaba un mapeo YAML, se obtuvo %s",
                        global_path,
                        type(loaded).__name__,
                    )

    return template or {}

orchestration.result_types

Standardized workflow result types — Success, Failure, Timeout.

Classes

WorkflowResult dataclass

Base result from any workflow execution.

Source code in orchestration/result_types.py
@dataclass
class WorkflowResult:
    """Base result returned by any workflow execution.

    Fields: a success flag, text content, an optional error message, a
    timeout flag, and a free-form metadata dict.
    """

    success: bool
    content: str = ""
    error: str | None = None
    timeout: bool = False
    # default_factory gives each result its own dict (no shared mutable default).
    metadata: dict[str, Any] = field(default_factory=dict)

orchestration.status

Status Renderer — genera HTML progresivo del workflow.

Replaces mermaid_helper.py. No external dependencies, no HTTP, no remote rendering. El HTML se muestra en QTextBrowser (desktop) o como tabla Rich (CLI usa el grafo directamente).

Functions

render

render(G: Any) -> str

Genera HTML con tarjetas de estado para cada subtarea del workflow.

Parameters:

Name Type Description Default
G Any

Grafo NetworkX con nodos que tienen 'task', 'agent', 'status'.

required

Returns:

Type Description
str

String HTML listo para QTextBrowser.setHtml() o consola.

Source code in orchestration/status.py
def render(G: Any) -> str:
    """Build HTML status cards, one per workflow subtask.

    Args:
        G: NetworkX graph whose nodes may carry 'task', 'agent' and 'status'
            attributes (defaults: "Subtarea <node>", "?", "pending").

    Returns:
        HTML ready for QTextBrowser.setHtml() or the console. A missing or
        empty graph yields a single placeholder paragraph.
    """
    from html import escape

    if G is None or G.number_of_nodes() == 0:
        return "<p style='color:#888; text-align:center'>Workflow vacío</p>"

    cards: list[str] = []
    for node in G.nodes():
        data = G.nodes[node]
        # NOTE(review): task text goes through _clean_text; confirm it also
        # HTML-escapes, otherwise it should be escaped like agent/status below.
        task = _clean_text(str(data.get("task", f"Subtarea {node}")), max_len=120)
        # Agent and status come from workflow data and are interpolated into
        # HTML, so escape them to keep stray '<' or '&' from breaking the markup.
        agent = escape(str(data.get("agent", "?")))
        status = str(data.get("status", "pending"))

        icon = STATUS_ICONS.get(status, "⚫")
        color = STATUS_COLORS.get(status, "#888")
        status_label = escape(status.upper())

        cards.append(
            CARD_TEMPLATE.format(
                color=color, icon=icon, status_label=status_label, task=task, agent=agent
            )
        )

    return HTML_TEMPLATE.format(cards="".join(cards))

save_status_snapshot

save_status_snapshot(
    html: str, filename: str | None = None
) -> Path

Guarda el HTML del workflow en charts/ como respaldo.

Source code in orchestration/status.py
def save_status_snapshot(html: str, filename: str | None = None) -> Path:
    """Write the workflow status HTML into the charts directory as a backup.

    Args:
        html: Rendered status HTML.
        filename: File name inside ``paths.charts_dir()``. Defaults to
            ``workflow_<pid>_<unix seconds>.html``.

    Returns:
        The path of the written file.
    """
    name = f"workflow_{os.getpid()}_{int(time.time())}.html" if filename is None else filename
    target = paths.charts_dir() / name
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(html, encoding="utf-8")
    logger.debug("Workflow status guardado: %s", target)
    return target

orchestration.utils

Workflow Utils - Funciones compartidas de limpieza y scorecard

Functions

clean_generated_code

clean_generated_code(raw_code: str) -> str

Ultra-aggressive cleanup for generated code

Source code in orchestration/utils.py
def clean_generated_code(raw_code: str) -> str:
    """Strip LLM chatter from a generated code snippet.

    Removes Markdown code fences, a leading preamble such as
    "Aquí está el código:" / "El código es:" / "... Python simple:", and any
    trailing ``if __name__ == "__main__":`` block.

    The preamble patterns only consume text on the leading line(s). If they
    were allowed to match across newlines, they would delete real code whenever
    a later line happened to contain the trigger words. One example is a
    ``# Aquí ...`` comment followed by a ``.decode()`` call, where "code"
    matches.

    Args:
        raw_code: Raw text returned by the model.

    Returns:
        The cleaned code, stripped of surrounding whitespace.
    """
    code = raw_code.strip()
    # Drop every fence marker (opening ``` / ```python and closing ```) together
    # with the whitespace that follows it. This also covers a trailing fence.
    code = re.sub(r"```(?:python)?\s*", "", code)
    # Preamble removal. Without re.DOTALL, "." cannot cross a newline, so each
    # pattern only consumes the current first line (plus trailing ":"/whitespace).
    code = re.sub(r"^.*?Aquí.*?(?:código|code)[:\s]*", "", code, flags=re.IGNORECASE)
    code = re.sub(r"^.*?El código es[:\s]*", "", code, flags=re.IGNORECASE)
    code = re.sub(r"^.*?Python simple[:\s]*", "", code, flags=re.IGNORECASE)
    # Remove a script entry-point guard and everything after it.
    code = re.sub(
        r'if\s+__name__\s*==\s*["\']__main__["\']\s*:[\s\S]*?$', "", code, flags=re.IGNORECASE
    )
    return code.strip()

generate_scorecard

generate_scorecard(
    results: dict,
    G: Any,
    final_content: str,
    query: str,
    task_analysis: dict,
    start_time: float,
    enc: Any = None,
) -> dict

Genera scorecard con tokens reales de ToolOrchestrator

Source code in orchestration/utils.py
def generate_scorecard(
    results: dict,
    G: Any,
    final_content: str,
    query: str,
    task_analysis: dict,
    start_time: float,
    enc: Any = None,
) -> dict:
    """Build the workflow scorecard summary.

    Args:
        results: Mapping of subtask id -> result dict with optional "status"
            and "result" keys. Non-dict entries count towards "subtasks" but
            are otherwise ignored.
        G: Workflow graph (not used by this function; kept for API compatibility).
        final_content: Final answer text (not used by this function).
        query: Original user query (not used by this function).
        task_analysis: Task analysis dict; "primary_type" and "complexity" are read.
        start_time: Workflow start timestamp taken with ``time.monotonic()``,
            the clock the orchestrator uses for its ``start_time`` values.
        enc: Optional tokenizer (not used by this function).

    Returns:
        Dict with subtask counts, token total, elapsed time and task labels.
    """
    # Use the same clock as the caller's start_time. Subtracting a monotonic
    # timestamp from time.time() yields a duration near the Unix epoch offset.
    duration = round(time.monotonic() - start_time, 2)

    # Only dict results carry status/result information.
    entries = [r for r in results.values() if isinstance(r, dict)]

    total_tokens = 0
    for r in entries:
        if "result" not in r:
            continue
        result_data = r["result"]
        if isinstance(result_data, dict) and "tokens_used" in result_data:
            # Count reported by the tool layer; a missing (None) value counts as 0.
            total_tokens += result_data["tokens_used"] or 0
        else:
            # Rough estimate: about 4 characters per token.
            total_tokens += len(str(result_data)) // 4

    return {
        "subtasks": len(results),
        "completadas": sum(1 for r in entries if r.get("status") == "completed"),
        "recuperadas": 0,
        "fallidas": sum(1 for r in entries if r.get("status") == "failed"),
        "tokens": total_tokens,
        "tiempo": f"{duration}s",
        "calidad": "Alta",
        "tipo_tarea": task_analysis.get("primary_type", "executive"),
        "complejidad": task_analysis.get("complexity", "simple"),
    }

orchestration.executor.plan

Plan Executor — genera y ejecuta planes de acciones desde el LLM.

orchestration.executor.subtask

Subtask Executor — main coordinator for individual task execution. Usa Agent Loop con function-calling nativo para ejecutar subtareas.

Decoupled: receives WorkflowEvents instead of framework-specific objects.

Classes

Functions

execute_subtask_safe async

execute_subtask_safe(
    node: int,
    task: str,
    G: nx.DiGraph,
    conversation_history: list,
    current_pdf_text: str,
    ctx: WorkflowContext,
    events: WorkflowEvents,
    forced_agent: str | None = None,
    task_analysis: dict | None = None,
)

Execute a subtask with safety verification and post-checks.

Receives ctx (WorkflowContext) and events (WorkflowEvents) instead of individual page, mermaid_image, settings, etc. arguments.

Source code in orchestration/executor/subtask.py
async def execute_subtask_safe(
    node: int,
    task: str,
    G: nx.DiGraph,
    conversation_history: list,
    current_pdf_text: str,
    ctx: WorkflowContext,
    events: WorkflowEvents,
    forced_agent: str | None = None,
    task_analysis: dict | None = None,
):
    """Execute one workflow subtask with a safety check and post-execution checks.

    Takes a WorkflowContext (``ctx``) and a WorkflowEvents bridge (``events``)
    instead of individual UI objects (page, mermaid_image, settings, ...).

    Flow:
        1. Security check on the task text.
        2. Agent/task resolution.
        3. Tool allowlist filtering.
        4. Agent loop.
        5. Optional direct file creation ("safety net").
        6. Functional verification and post-checks for agents with
           file_manager/git_manager.
        7. Graph status, live diagram and memory updates.

    Args:
        node: Node id in ``G``. User-facing messages show it 1-based (``node + 1``).
        task: Subtask description; may be rewritten by ``_resolve_agent_and_task``.
        G: Workflow graph; ``G.nodes[node]["status"]`` is updated in place.
        conversation_history: History forwarded to the agent loop.
        current_pdf_text: Not used in this function body.
        ctx: Workflow context (agents registry, tool allowlist, project root,
            workspace, optional blackboard).
        events: UI event bridge.
        forced_agent: Optional agent override forwarded to ``_resolve_agent_and_task``.
        task_analysis: Optional task analysis; "primary_type" and "requirements"
            are forwarded to the agent as extra context.

    Returns:
        Dict with "node" and "status". The status is "completed", "failed" or
        "clarification_needed"; the remaining keys hold result details. Any
        exception is caught and reported as a "failed" result.
    """
    try:
        if not await undercover.check_query(task):
            await emit_system(events, "❌ Solicitud bloqueada por razones de seguridad.")
            G.nodes[node]["status"] = "failed"
            return {
                "node": node,
                "task": task,
                "result": "Bloqueada por seguridad",
                "status": "failed",
            }

        await emit_system(events, f"🚀 Ejecutando subtarea {node + 1}: {task[:80]}...")
        G.nodes[node]["status"] = "running"
        await update_live_diagram(G, events)

        best_agent, task = await _resolve_agent_and_task(
            task, conversation_history, forced_agent, ctx.agents_registry
        )

        agent_profile = ctx.agents_registry.get_profile(best_agent)
        agent_tools = agent_profile.get("tools", []) if agent_profile else []

        # Expand tool groups to real tool names BEFORE filtering.
        from tools.specs import expand_allowed_tools

        expanded_agent_tools = expand_allowed_tools(agent_tools) or []

        # Filter against the workflow allowlist (prefix/component matching).
        if ctx.allowed_tools is None:
            allowed_tools = expanded_agent_tools
        else:
            workflow_tools: list[str] = ctx.allowed_tools  # type: ignore[assignment]
            allowed_tools = [
                tool_name
                for tool_name in expanded_agent_tools
                if tool_matches_allowlist(tool_name, workflow_tools)
            ]

        is_dev_task = "file_manager" in agent_tools or "git_manager" in agent_tools

        # The safety net only fires for non-analysis agents; agents whose
        # profile type is "analysis" must never have files fabricated for them.
        agent_type = agent_profile.get("type", "development") if agent_profile else "development"
        _safety_net_allowed = agent_type != "analysis"

        extra_context = ""
        if task_analysis:
            primary_type = task_analysis.get("primary_type", "")
            if primary_type:
                extra_context += f"Tipo de tarea: {primary_type}\n"
            requirements = task_analysis.get("requirements", "")
            if requirements:
                extra_context += f"Requisitos: {requirements}\n"

        # Inject blackboard context when available (multi-phase support).
        if ctx.blackboard is not None:
            bb_ctx = await ctx.blackboard.get_agent_context()
            if bb_ctx:
                extra_context += (
                    "\n⚠️ BLACKBOARD — Resultados de subtareas anteriores:\n"
                    + bb_ctx
                    + "\n\nUsa esta información para evitar trabajo duplicado.\n"
                )

        from orchestration.context import Session

        result = await execute_agent_loop(
            task=task,
            agent_type=best_agent,
            history=conversation_history,
            allowed_tools=allowed_tools,
            project_root=ctx.project_root,
            workspace=ctx.workspace,
            extra_context=extra_context,
            on_stream_chunk=events.on_stream_chunk if events else None,
            session=Session(context=ctx, events=events) if events else None,
        )

        if result.get("status") == "clarification_needed":
            return {
                "node": node,
                "status": "clarification_needed",
                "clarification_question": result["clarification_question"],
                "clarification_options": result.get("clarification_options", []),
                "paused_loop_state": result["paused_loop_state"],
            }

        final_answer = result["result"]
        # Work on a private copy: the safety net appends to this list, and the
        # agent loop's own result must not be mutated (or be None).
        files_written = list(result.get("files_written") or [])

        # ─── 🛟 Safety Net: if the agent did not create files, try creating them directly ───
        if not files_written and _safety_net_allowed and ctx.project_root:
            logger.info(f"🛟 Safety Net activado para subtarea: {task[:80]}")
            await emit_system(
                events, "🛟 El agente no pudo crear archivos. Intentando creación directa..."
            )
            path, content = await _direct_file_creation(task, ctx.project_root, ctx.workspace)
            if path:
                files_written.append(path)
                final_answer = f"✅ Archivo creado directamente: {path}\n\n{content}"
                await emit_system(events, f"🛟 Archivo creado directamente: {path}")

        # 4.3 — Per-subtask functional verification (best-effort).
        verification_report = None
        if is_dev_task and files_written:
            try:
                from orchestration.executor.verify import _run_functional_verification

                verification_report = await _run_functional_verification(
                    task=task,
                    best_agent=best_agent,
                    allowed_tools=allowed_tools,  # type: ignore[arg-type]
                    project_root=ctx.project_root,
                    intended_files=files_written,
                    add_system_message=lambda msg: emit_system(events, msg),
                    workspace=ctx.workspace,
                )
                if verification_report:
                    await emit_system(events, f"🔍 Verificación: {verification_report[:200]}")  # type: ignore[index]
            except Exception:
                # Verification is optional; failures are logged, not propagated.
                logger.debug("Verificación por subtarea omitida", exc_info=True)

        if is_dev_task:
            post_check = await _post_execution_checks(
                [],  # type: ignore[arg-type]
                [],
                True,
                files_written,
                task,
                ctx.project_root,
                is_dev_task,
                ctx.workspace,
                best_agent,
                allowed_tools,  # type: ignore[arg-type]
                lambda msg: emit_system(events, msg),
            )
            if post_check:
                final_answer += "\n\n" + post_check

        agent_status = result.get("status", "completed")
        # "Stalled" = the agent reported being stuck and produced no files.
        is_stalled = (
            "estancado" in final_answer.lower() or agent_status == "stalled"
        ) and not files_written

        if is_stalled:
            await emit_system(
                events,
                f"⚠️ Subtarea {node + 1}: el agente encontró dificultades, pero se continuará con verificación.",
            )
            await emit_assistant(
                events,
                f"**✅ Subtarea {node + 1} completada**\n\nEl agente no pudo completar esta subtarea automáticamente. Se aplicará verificación global para corregirlo.",
            )
        else:
            await emit_assistant(events, f"**✅ Subtarea {node + 1} completada**\n\n{final_answer}")
        # Stalled subtasks are still marked completed; global verification handles them.
        G.nodes[node]["status"] = "completed"
        await update_live_diagram(G, events)
        await memory_manager.write(f"workflow_subtask_{node}", final_answer, validated=True)
        return {
            "node": node,
            "task": task,
            "result": final_answer,
            "status": "completed",
            "files_written": files_written,
        }

    except Exception as e:
        logger.error("Error en subtarea %d: %s", node, e, exc_info=True)
        G.nodes[node]["status"] = "failed"
        # Number the subtask 1-based, like every other user-facing message here.
        error_msg = f"❌ Error en subtarea {node + 1}: {str(e)[:200]}"
        if events is not None:
            await emit_assistant(events, error_msg)
        return {"node": node, "task": task, "result": error_msg, "status": "failed"}

orchestration.executor.verify

Verification — post-execution functional verification and file checking.

Functions

orchestration.executor.post

Post-Execution — automatic commit, file verification, and test retry.

Functions

orchestration.workflows.orchestrator

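Workflow Orchestrator — dispatches tasks through a direct-tool fast path plus five execution routes (TDD, collaborative, coordinated, development, and a default analysis route).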

Receives a Session (WorkflowContext + WorkflowEvents) and delegates routing to _dispatch_route. All UI interaction flows through the events bridge, keeping the orchestrator framework-agnostic.

Classes

WorkflowOrchestrator

Source code in orchestration/workflows/orchestrator.py
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
class WorkflowOrchestrator:

    @staticmethod
    async def _run_direct_tool(
        direct_tool: dict,
        query: str,
        start_time: float,
        conversation_history: list,
        events: WorkflowEvents,
        conversation_id: int | None = None,
    ) -> str:
        """Run a tool command parsed straight from the query, bypassing agents.

        The parsed ``params`` dict is updated in place with the current
        workspace and the requested action before the tool is called.

        Returns:
            The tool's output text, or an error line when the tool reports failure.
        """
        current_workspace = get_global_workspaces().current
        call_params = direct_tool["params"]
        call_params["workspace"] = current_workspace
        call_params["action"] = direct_tool["action"]

        tool_result = await safe_tool_call(
            tool_name=direct_tool["tool_name"], parameters=call_params, role="agent"
        )

        if not tool_result.get("success"):
            output = f"❌ Error en herramienta: {tool_result.get('output', 'desconocido')}"
        else:
            payload = tool_result.get("output", str(tool_result))
            # Some tools nest their text inside a second "output" key.
            if isinstance(payload, dict) and "output" in payload:
                output = payload["output"]
            else:
                output = payload if isinstance(payload, str) else str(payload)

        await emit_system(
            events,
            f"🛠️ Herramienta ejecutada: {direct_tool['tool_name']} {direct_tool['action']}",
        )

        elapsed = round(time.monotonic() - start_time, 1)
        elapsed_label = f"{elapsed}s"
        stats_payload = {
            "subtasks_total": 0,
            "subtasks_completed": 0,
            "tokens_used": 0,
            "elapsed_time": elapsed_label,
            "current_agent": "—",
            "status": "Completado (tool directa)",
        }
        await emit_stats(events, stats_payload)
        await finalize_workflow(
            query=query,
            final_output=output,
            conversation_history=conversation_history,
            conversation_id=conversation_id,
            scorecard={"subtasks": 0, "tokens": 0, "tiempo": elapsed_label},
            subtasks_list=[f"Tool directa: {direct_tool['tool_name']}"],
            task_analysis={"primary_type": "tool_direct", "requires_full_orchestration": False},
            G=None,
            events=events,
        )
        return output

    @staticmethod
    async def _run_tdd_loop(
        query: str,
        project_root: str | None,
        workspace: str,
        workflow_allowed_tools: list | None,
        conversation_history: list,
        start_time: float,
        events: WorkflowEvents,
        conversation_id: int | None = None,
    ) -> str:
        """Run the autonomous TDD loop for the query and finalize the workflow.

        Args:
            query: User request used as the TDD task.
            project_root: Project root forwarded to the TDD loop.
            workspace: Current workspace name.
            workflow_allowed_tools: Tool allowlist from the workflow template (or None).
            conversation_history: History forwarded to the loop and to finalization.
            start_time: ``time.monotonic()`` timestamp of the workflow start.
            events: UI event bridge.
            conversation_id: Optional conversation id for persistence.

        Returns:
            The final text produced by the TDD loop.
        """
        from orchestration.workflows.tdd import execute_tdd_loop

        logger.info("🧪 Modo TDD activado — ejecutando ciclo TDD autónomo")
        await emit_system(events, "🧪 Modo TDD: ejecutando tests y corrigiendo...")
        await emit_stats(events, {"status": "TDD Loop", "current_agent": "TDD Agent"})

        tdd_result = await execute_tdd_loop(
            task=query,
            workspace=workspace,
            project_root=project_root,
            allowed_tools=workflow_allowed_tools,
            agent_type=settings.default_agent,
            conversation_history=conversation_history,
        )

        final_content = tdd_result["result"]
        elapsed = round(time.monotonic() - start_time, 1)
        succeeded = tdd_result["status"] == "completed"
        files_modified = tdd_result.get("files_modified", [])

        scorecard = {
            "subtasks": tdd_result["iterations"],
            "completadas": 1 if succeeded else 0,
            "recuperadas": 0,
            "fallidas": 0 if succeeded else 1,
            # Rough estimate: about 4 characters per token.
            "tokens": len(final_content) // 4,
            "tiempo": f"{elapsed}s",
            "calidad": "Alta" if succeeded else "Requiere revisión",
            "tipo_tarea": "tdd",
            "complejidad": "media",
        }

        await emit_stats(
            events,
            {
                "subtasks_total": tdd_result["iterations"],
                # Mirror the scorecard: a failed TDD run completed nothing.
                "subtasks_completed": scorecard["completadas"],
                "tokens_used": scorecard["tokens"],
                "elapsed_time": scorecard["tiempo"],
                "current_agent": "—",
                "status": "Completado" if succeeded else "Fallido",
                "files_written": files_modified,
            },
        )

        await finalize_workflow(
            query=query,
            final_output=final_content,
            conversation_history=conversation_history,
            conversation_id=conversation_id,
            scorecard=scorecard,
            subtasks_list=[f"TDD iteración {i}" for i in range(1, tdd_result["iterations"] + 1)],
            task_analysis={"primary_type": "tdd", "requirements": query},
            G=None,
            events=events,
            project_root=project_root,
            workspace=workspace,
            files_written=files_modified,
        )
        return final_content

    @staticmethod
    async def run_full_workflow(
        session: Session,
    ) -> str | None:
        """Run the complete workflow pipeline for one user request.

        Performs the security gate, resets the tool token budget, and installs
        the session's approval callback on ToolOrchestrator for the duration of
        the run. Then it either runs a directly parsed tool command or
        delegates to ``_dispatch_route``.

        Returns:
            The final response text, or None when the request was blocked.
        """
        ctx, events = session.context, session.events
        query = ctx.query
        start_time = time.monotonic()

        # ── Security gate ──
        from core.security.undercover_mode import undercover

        query_allowed = await undercover.check_query(query)
        if not query_allowed:
            await emit_system(events, "❌ Solicitud bloqueada por razones de seguridad.")
            return None

        # ── Tool orchestrator setup (undone in the finally block) ──
        from tools.orchestrator import ToolOrchestrator

        ToolOrchestrator.reset_token_budget()
        ToolOrchestrator.on_approval_required = events.on_approval_required

        try:
            direct_tool = _parse_direct_tool_command(query)
            if not direct_tool:
                # Regular path: route according to the active template/workflow.
                return await WorkflowOrchestrator._dispatch_route(
                    session=session,
                    query=query,
                    ctx=ctx,
                    events=events,
                    start_time=start_time,
                )
            # Fast path: the query is an explicit tool command.
            return await WorkflowOrchestrator._run_direct_tool(
                direct_tool,
                query,
                start_time,
                ctx.conversation_history,
                events,
                ctx.conversation_id,
            )
        finally:
            ToolOrchestrator.on_approval_required = None

    @staticmethod
    async def _dispatch_route(
        session: Session,
        query: str,
        ctx: WorkflowContext,
        events: WorkflowEvents,
        start_time: float,
    ) -> str:
        """Evaluate the template and active workflow, then run the matching route.

        Precedence:
        1. TDD loop (when the active workflow is 'tdd').
        2. Collaborative (template type 'collaborative').
        3. Coordinated (template type 'coordinated').
        4. Development (template type 'development'): task analysis, then always
           full orchestration.
        5. Default: analyze the task and choose between a simple conversation
           and full orchestration.

        Side effects: sets ``ctx.project_root`` (template root, else the current
        value, else ".") and ``ctx.allowed_tools`` when the template restricts tools.
        """
        conversation_history = ctx.conversation_history
        conversation_id = ctx.conversation_id

        # ── Resolve workspace + template ──
        workspaces = get_global_workspaces()
        # Read the active workflow once so that the loaded template and the
        # route decision below always refer to the same workflow.
        active_wf = get_active_workflow()
        template = load_workflow_template(workspaces.current, active_wf)

        # Template sections may exist but be empty (None); treat those as {}.
        project_root = (template.get("project") or {}).get("root")
        if project_root:
            ctx.project_root = project_root
        elif not ctx.project_root:
            ctx.project_root = "."

        logger.info("📌 Workflow activo: %s (workspace: %s)", active_wf, workspaces.current)

        # ── Allowed agents and tools ──
        allowed_agents = (template.get("agents") or {}).get("allowed")
        if allowed_agents is not None and not isinstance(allowed_agents, list):
            allowed_agents = None
        workflow_allowed_tools = (template.get("tools") or {}).get("allowed")
        if workflow_allowed_tools is not None:
            ctx.allowed_tools = workflow_allowed_tools

        _validate_template_consistency(template)

        # ── Route 1: TDD ──
        if active_wf == "tdd":
            return await WorkflowOrchestrator._run_tdd_loop(
                query,
                ctx.project_root or project_root,
                workspaces.current,
                workflow_allowed_tools,
                conversation_history,
                start_time,
                events,
                conversation_id,
            )

        template_type = template.get("type")

        # ── Route 2: Collaborative ──
        if template_type == "collaborative":
            return await CollaborativeOrchestrator.run(
                query=query,
                template=template,
                events=events,
                history=conversation_history,
                project_root=ctx.project_root,
                workspace=workspaces.current,
                force_agent=ctx.force_agent,
                workflow_allowed_tools=workflow_allowed_tools,
                start_time=start_time,
            )

        # ── Route 3: Coordinated ──
        if template_type == "coordinated":
            return await WorkflowOrchestrator._run_coordinated(
                query=query,
                ctx=ctx,
                events=events,
                template=template,
                allowed_agents=allowed_agents,
                workflow_allowed_tools=workflow_allowed_tools,
                conversation_history=conversation_history,
                workspaces=workspaces,
                project_root=ctx.project_root,
                start_time=start_time,
                session=session,
            )

        # ── Route 4: Development — the task is still analyzed (to feed the
        # orchestrator), but the simple-conversation shortcut is never taken ──
        if template_type == "development":
            dev_analysis = await TaskAnalyzer.analyze_task(query, is_follow_up=ctx.is_follow_up)
            return await WorkflowOrchestrator._run_full_orchestration(
                query,
                conversation_history,
                dev_analysis,
                ctx,
                events,
                project_root,
                workspaces.current,
                allowed_agents,
                workflow_allowed_tools,
                start_time,
            )

        # ── Route 5: Default — analyze, then choose simple vs full ──
        await emit_stats(
            events,
            {
                "subtasks_total": 0,
                "subtasks_completed": 0,
                "tokens_used": 0,
                "elapsed_time": "0s",
                "current_agent": "—",
                "status": "Iniciando",
            },
        )
        await emit_system(events, "🔍 Analizando tu solicitud...")
        await emit_stats(events, {"status": "Analizando", "current_agent": "TaskAnalyzer"})

        # Pass follow-up context here too (as route 4 does), so continuation
        # turns are not analyzed as brand-new projects.
        task_analysis = await TaskAnalyzer.analyze_task(query, is_follow_up=ctx.is_follow_up)

        if not task_analysis.get("requires_full_orchestration", True):
            return await WorkflowOrchestrator._run_simple_conversation(
                query,
                conversation_history,
                task_analysis,
                template,
                allowed_agents,
                start_time,
                events,
                conversation_id,
            )

        return await WorkflowOrchestrator._run_full_orchestration(
            query,
            conversation_history,
            task_analysis,
            ctx,
            events,
            project_root,
            workspaces.current,
            allowed_agents,
            workflow_allowed_tools,
            start_time,
        )

    @staticmethod
    async def _run_simple_conversation(
        query: str,
        conversation_history: list,
        task_analysis: dict,
        template: dict,
        allowed_agents: list | None,
        start_time: float,
        events: WorkflowEvents,
        conversation_id: int | None = None,
    ) -> str:
        """Answer with a single agent (fast path), without task decomposition.

        The agent is the template's ``agents.default_simple`` (falling back to
        ``settings.fallback_agent``). If that agent is not in ``allowed_agents``,
        the first allowed agent is used instead. Agents with tools run through
        the agent loop; the others go through ``AgentsService.execute_agent``.

        Returns:
            The response text after anti-distillation filtering. A generic error
            message is returned when the agent call raises.
        """
        logger.info("🚀 Modo conversación simple activado → ruta rápida")
        from agents.service import AgentsService

        # A section or key that is present but empty (None/"") falls back too.
        default_agent = (
            (template.get("agents") or {}).get("default_simple") or settings.fallback_agent
        )
        if allowed_agents and default_agent not in allowed_agents:
            # allowed_agents is non-empty here, so its first entry always exists.
            default_agent = allowed_agents[0]

        await emit_stats(
            events, {"status": "Respondiendo", "current_agent": default_agent.capitalize()}
        )

        # History sent to the model; may be replaced by a compressed copy.
        # The caller's full history is still what finalize_workflow receives,
        # matching the other routes.
        model_history = conversation_history
        try:
            stream_callback = events.on_stream_chunk if events else None

            # Compress context when enabled and history exceeds 70% of the limit.
            if settings.context_compression:
                max_tokens = settings.max_context_tokens
                if ContextManager.estimate_tokens(model_history) > max_tokens * 0.7:
                    model_history = ContextManager.compress_history(
                        model_history, max_tokens=max_tokens
                    )

            # Agents that declare tools run through the function-calling agent loop.
            from agents.registry import agents_registry as _reg

            agent_profile = _reg.get_profile(default_agent)
            agent_tools = agent_profile.get("tools", []) if agent_profile else []

            if agent_tools:
                from tools.specs import expand_allowed_tools

                # Same guard as in subtask execution: never pass None as the allowlist.
                expanded_tools = expand_allowed_tools(agent_tools) or []
                from orchestration.loop import execute_agent_loop

                loop_result = await execute_agent_loop(
                    task=query,
                    agent_type=default_agent,
                    history=model_history,
                    allowed_tools=expanded_tools,
                    workspace=get_global_workspaces().current,
                    on_stream_chunk=stream_callback,
                )
                final_content = clean_llm_response(
                    loop_result.get("result", str(loop_result))
                    if isinstance(loop_result, dict)
                    else str(loop_result)
                )
            else:
                raw_response = await AgentsService.execute_agent(
                    agent_type=default_agent,
                    query=query,
                    history=model_history,
                    on_stream_chunk=stream_callback,
                )
                final_content = clean_llm_response(raw_response)
        except Exception as e:
            logger.error("Error en ruta rápida: %s", e, exc_info=True)
            final_content = (
                "❌ Hubo un problema al procesar tu solicitud. ¿Puedes intentarlo de nuevo?"
            )

        # Apply anti-distillation protection.
        from core.security.undercover_mode import undercover

        final_content = undercover.get_safe_response(final_content)

        scorecard = {
            "subtasks": 1,
            "completadas": 1,
            "recuperadas": 0,
            "fallidas": 0,
            # Rough estimate: about 4 characters per token.
            "tokens": len(final_content) // 4,
            "tiempo": f"{round(time.monotonic() - start_time, 2)}s",
            "calidad": "Alta",
            "tipo_tarea": "simple_conversation",
            "complejidad": "simple",
        }

        await emit_stats(
            events,
            {
                "subtasks_total": 1,
                "subtasks_completed": 1,
                "tokens_used": scorecard["tokens"],
                "elapsed_time": scorecard["tiempo"],
                "current_agent": "Conversacional",
                "status": "Completado",
            },
        )

        await finalize_workflow(
            query=query,
            final_output=final_content,
            conversation_history=conversation_history,
            conversation_id=conversation_id,
            scorecard=scorecard,
            subtasks_list=["Respuesta directa"],
            task_analysis=task_analysis,
            G=None,
            events=events,
        )
        return final_content

    @staticmethod
    async def _run_coordinated(
        query: str,
        ctx,
        events,
        template,
        allowed_agents,
        workflow_allowed_tools,
        conversation_history,
        workspaces,
        project_root,
        start_time,
        session=None,
    ) -> str:
        """Run the multi-agent coordinator with DAG execution and shared blackboard.

        A phase-aware decomposition is tried first. When it yields more than one
        phase, each phase gets its own agent assignment and DAG execution, and
        per-subtask summaries are written to the coordinator blackboard under the
        phase namespace (synced to the DB after every phase when a conversation id
        exists). Otherwise the query is decomposed into a single DAG and executed
        once. Results are aggregated by the coordinator, passed through the
        anti-distillation filter, scored and finalized.

        Args:
            query: User request.
            ctx: Workflow context (``is_follow_up``, ``conversation_id`` and
                ``project_root`` are read).
            events: Event sink for system/stats notifications.
            template: Active template; not read by this method.
            allowed_agents: Agents the coordinator may assign.
            workflow_allowed_tools: Tools subtasks may use.
            conversation_history: Prior conversation messages.
            workspaces: Workspace registry; ``workspaces.current`` is used.
            project_root: Project root for file-producing subtasks.
            start_time: ``time.monotonic()`` value at workflow start.
            session: Optional session forwarded to DAG execution.

        Returns:
            The final aggregated (and filtered) response.
        """
        from orchestration.decomposer import decompose_task_with_phases
        from orchestration.workflows.coordinated import MultiAgentCoordinator

        coordinator = MultiAgentCoordinator()
        all_files_written: list[str] = []

        # Try phase-aware decomposition first
        decomposition = await decompose_task_with_phases(
            query,
            is_follow_up=ctx.is_follow_up,
            conversation_history=conversation_history,
            project_root=project_root,
        )
        phases = decomposition.get("phases", [])

        if phases and len(phases) > 1:
            # Multi-phase execution
            await emit_system(events, f"🧩 Decomposed into {len(phases)} phases...")
            await emit_stats(events, {"status": "Executing phases", "current_agent": "Coordinator"})

            all_phase_results = {}
            subtasks_list = []
            for pi, phase in enumerate(phases):
                phase_name = phase["phase"]
                phase_subtasks = phase["subtasks"]
                subtasks_list.extend(phase_subtasks)
                await emit_system(
                    events,
                    f"📌 Phase {pi+1}/{len(phases)}: {phase_name} ({len(phase_subtasks)} subtasks)",
                )

                # Ids are namespaced by phase so results from different phases
                # do not collide when merged into all_phase_results.
                st_objs = [
                    {"id": f"{phase_name}_{i}", "description": s}
                    for i, s in enumerate(phase_subtasks)
                ]
                assignments = await coordinator.assign_agents(st_objs, allowed_agents)

                phase_results = await coordinator.execute_dag(
                    subtasks=st_objs,
                    assignments=assignments,
                    project_root=project_root,
                    workspace=workspaces.current,
                    allowed_tools=workflow_allowed_tools,
                    events=events,
                    session=session,
                )

                # Write to blackboard with phase namespace
                for sid, r in phase_results.items():
                    await coordinator.blackboard.write(
                        f"{sid}_result",
                        {
                            "agent": r.get("agent", "?"),
                            "task": str(r.get("result", ""))[:300],
                            "status": r.get("status", "completed"),
                            "files_written": r.get("files_written", []),
                        },
                        phase=phase_name,
                    )
                all_phase_results.update(phase_results)
                all_files_written.extend(_collect_files_written(phase_results))

                # Persist blackboard to DB after each phase
                if ctx.conversation_id:
                    await coordinator.blackboard.sync_to_db(f"coord_{ctx.conversation_id}")

            results = all_phase_results
            total_subtasks = sum(len(p["subtasks"]) for p in phases)
        else:
            # Fallback: single-phase DAG (original behavior)
            await emit_system(events, "🧩 Decomposing task into coordinated DAG...")
            await emit_stats(events, {"status": "Decomposing DAG", "current_agent": "Coordinator"})

            dag = await coordinator.decompose_task_dag(query)
            subtasks = dag["subtasks"]
            logger.info(f"Coordinator DAG: {len(subtasks)} subtask(s)")

            await emit_system(events, f"📋 {len(subtasks)} subtask(s) planned, assigning agents...")
            await emit_stats(
                events,
                {
                    "subtasks_total": len(subtasks),
                    "subtasks_completed": 0,
                    "status": "Assigning agents",
                    "current_agent": "Coordinator",
                },
            )

            assignments = await coordinator.assign_agents(subtasks, allowed_agents)

            await emit_system(
                events, f"🚀 Executing {len(subtasks)} subtask(s) with DAG parallelism..."
            )
            await emit_stats(events, {"status": "Executing DAG", "current_agent": "Coordinator"})

            results = await coordinator.execute_dag(
                subtasks=subtasks,
                assignments=assignments,
                project_root=project_root,
                workspace=workspaces.current,
                allowed_tools=workflow_allowed_tools,
                events=events,
                session=session,
            )
            all_files_written.extend(_collect_files_written(results))
            total_subtasks = len(subtasks)
            subtasks_list = [st.get("description", st.get("id", "")) for st in subtasks]

        completed = sum(1 for r in results.values() if r.get("status") == "completed")
        failed = sum(1 for r in results.values() if r.get("status") == "failed")
        await emit_stats(
            events,
            {
                # total_subtasks was previously computed but never reported.
                "subtasks_total": total_subtasks,
                "subtasks_completed": completed,
                "status": f"Aggregating ({completed} done, {failed} failed)",
                "current_agent": "Coordinator",
                "files_written": all_files_written,
            },
        )

        await emit_system(events, "📊 Aggregating results with confidence evaluation...")
        final_content = await coordinator.aggregate_with_confidence(query, results)

        # Apply anti-distillation protection, like the other workflow paths.
        from core.security.undercover_mode import undercover

        final_content = undercover.get_safe_response(final_content)

        # Finalize
        from orchestration.finalizer import finalize_workflow
        from orchestration.utils import generate_scorecard

        scorecard = generate_scorecard(
            results=results,
            G=None,
            final_content=final_content,
            query=query,
            task_analysis={"primary_type": "coordinated", "requires_full_orchestration": True},
            start_time=start_time,
        )

        await finalize_workflow(
            query=query,
            final_output=final_content,
            conversation_history=conversation_history,
            conversation_id=ctx.conversation_id,
            scorecard=scorecard,
            subtasks_list=subtasks_list,
            task_analysis={"primary_type": "coordinated", "requires_full_orchestration": True},
            G=None,
            events=events,
            workspace=workspaces.current,
            project_root=ctx.project_root or project_root,
            files_written=all_files_written,
        )

        return final_content

    @staticmethod
    async def _run_full_orchestration(
        query: str,
        conversation_history: list,
        task_analysis: dict,
        ctx: WorkflowContext,
        events: WorkflowEvents,
        project_root: str | None,
        workspace: str,
        allowed_agents: list | None,
        workflow_allowed_tools: list | None,
        start_time: float,
    ) -> str:
        """Run the sequential multi-agent pipeline for a complex request.

        Pipeline:
        1. Optional history compression.
        2. Task decomposition.
        3. A linear ``networkx`` DAG in which each subtask depends on the
           previous one.
        4. Per-subtask agent routing, reviewed by ``WorkflowSupervisor``.
        5. Sequential subtask execution with shared-blackboard updates.
        6. Optional global project verification.
        7. Result aggregation and anti-distillation filtering.
        8. Scorecard, stats and finalization.

        If a subtask asks for clarification, the workflow state is saved via
        ``_save_paused_session`` and the sentinel ``"[PAUSED:clarification_needed]"``
        is returned so ``resume_workflow`` can continue later.

        Args:
            query: User request.
            conversation_history: Prior conversation messages. Per-subtask
                outputs are NOT appended to it; they are collected separately
                and merged only into the history handed to ``finalize_workflow``.
            task_analysis: Task-analyzer output (``primary_type`` drives routing).
            ctx: Workflow context; ``ctx.blackboard`` is replaced with a fresh
                ``SharedBlackboard``.
            events: Event sink for system/stats/assistant notifications.
            project_root: Project directory relative to the workspace memory dir,
                or None for non-file requests.
            workspace: Workspace passed to the result aggregator.
            allowed_agents: Optional whitelist of agents for routing.
            workflow_allowed_tools: Optional tool whitelist for verification.
            start_time: ``time.monotonic()`` value at workflow start.

        Returns:
            The final response, or ``"[PAUSED:clarification_needed]"`` when the
            workflow paused for user input.
        """
        # Create shared blackboard for cross-subtask context
        from orchestration.workflows.blackboard import SharedBlackboard

        ctx.blackboard = SharedBlackboard()

        # Agent/tool messages collected for export only (never sent to the LLM).
        agent_results: list[dict] = []

        # Apply context compression if enabled
        if settings.context_compression:
            if (
                ContextManager.estimate_tokens(conversation_history)
                > settings.max_context_tokens * 0.8
            ):
                conversation_history = ContextManager.compress_history(
                    conversation_history, max_tokens=settings.max_context_tokens
                )

        subtasks_list = await decompose_task(
            query,
            is_follow_up=ctx.is_follow_up,
            conversation_history=conversation_history,
            project_root=project_root,
        )

        await emit_system(events, f"📊 {len(subtasks_list)} subtareas generadas")
        await emit_stats(
            events,
            {
                "subtasks_total": len(subtasks_list),
                "subtasks_completed": 0,
                "status": "Descomponiendo",
                "subtask_list": [
                    {
                        "name": (t if isinstance(t, str) else t.get("description", str(t)))[:60],
                        "status": "pending",
                    }
                    for t in subtasks_list
                ],
            },
        )

        # Linear DAG: node i depends on node i - 1.
        G = nx.DiGraph()
        for i, task in enumerate(subtasks_list):
            if isinstance(task, dict):
                task_desc = task.get("description", str(task))
                agent = task.get("agent", settings.fallback_agent)
            else:
                task_desc = str(task)
                agent = settings.fallback_agent

            G.add_node(i, task=task_desc, agent=agent, status="pending")
            if i > 0:
                G.add_edge(i - 1, i)

        logger.info("🔍 Supervisor revisando selección de agentes...")
        router_selections = []

        primary_type = task_analysis.get("primary_type", "mixed")

        for task in subtasks_list:
            desc = task if isinstance(task, str) else task.get("description", str(task))
            best_agent = await agent_router.select_best_agent(
                desc,
                primary_type=primary_type,
                allowed_agents=allowed_agents,
            )
            router_selections.append(best_agent)

        corrected_agents = await WorkflowSupervisor.review_and_correct(
            task_analysis,
            router_selections,
            subtasks_list,
            allowed_agents=allowed_agents or [],
        )

        for node in G.nodes():
            if node < len(corrected_agents):
                G.nodes[node]["agent"] = corrected_agents[node]

        await update_live_diagram(G, events)

        results: dict[int, dict[str, Any]] = {}
        for node in nx.topological_sort(G):
            forced_agent = corrected_agents[node] if node < len(corrected_agents) else None
            task_desc = G.nodes[node]["task"]
            agent = G.nodes[node]["agent"]

            await emit_stats(
                events,
                {
                    "current_agent": agent.capitalize(),
                    "status": f"Ejecutando subtarea {node + 1}",
                    "subtask_list": _build_subtask_list(subtasks_list, results, node, "running"),
                },
            )

            try:
                # Filter tool messages without tool_call_id (DeepSeek rejects them)
                clean_history = [
                    m
                    for m in conversation_history
                    if m.get("role") not in ("tool",) or m.get("tool_call_id")
                ]
                result = await execute_subtask_safe(
                    node=node,
                    task=task_desc,
                    G=G,
                    conversation_history=clean_history,
                    current_pdf_text=ctx.current_pdf_text,
                    ctx=ctx,
                    events=events,
                    forced_agent=forced_agent,
                    task_analysis=task_analysis,
                )
            except Exception as e:
                logger.error(f"Subtask {node} failed with exception: {e}")
                result = {
                    "status": "failed",
                    "result": f"Error in subtask {node}: {e}",
                    "files_written": [],
                }

            results[node] = result

            # Write to blackboard for cross-subtask context
            if ctx.blackboard is not None:
                await ctx.blackboard.write(
                    f"subtask_{node}_result",
                    {
                        "task": task_desc[:200],
                        "agent": agent,
                        "status": result.get("status", "completed"),
                        "files_written": result.get("files_written", []),
                    },
                    phase="default",
                )

            # Append agent/tool messages for export only. They are deliberately
            # NOT appended to conversation_history: that would duplicate them in
            # export_history and persist tool messages lacking tool_call_id into
            # the paused state, which resumed LLM calls may reject.
            result_text = str(result.get("result", ""))[:800]
            if result_text.strip():
                agent_results.append(
                    {
                        "role": "agent",
                        "content": f"[{agent.capitalize()} - {str(task_desc)[:60]}]\n{result_text}",
                    }
                )
            files_written = result.get("files_written", [])
            if files_written:
                agent_results.append(
                    {"role": "tool", "content": f"Files written: {', '.join(files_written[:10])}"}
                )

            if result.get("status") == "clarification_needed":
                ctx.last_clarification = result["clarification_question"]
                blackboard_snap = ctx.blackboard.snapshot() if ctx.blackboard is not None else None
                paused_data = {
                    "query": query,
                    "subtask_index": node,
                    "subtasks": subtasks_list,
                    "results": results,
                    "corrected_agents": corrected_agents,
                    "paused_loop_state": result["paused_loop_state"],
                    "conversation_history": conversation_history,
                    "task_analysis": task_analysis,
                    "G_nodes": [G.nodes[i] for i in range(len(G.nodes))],
                    "blackboard_snapshot": blackboard_snap,
                }
                await _save_paused_session(
                    conv_id=ctx.conversation_id,
                    query=query,
                    question=result["clarification_question"],
                    options=result.get("clarification_options", []),
                    paused_state=paused_data,
                )
                logger.info(
                    f"⏸️ Workflow pausado en subtarea {node + 1}: {result['clarification_question'][:80]}"
                )
                return "[PAUSED:clarification_needed]"

            completed = sum(1 for r in results.values() if r.get("status") == "completed")
            await emit_stats(
                events,
                {
                    "subtasks_completed": completed,
                    "subtask_list": _build_subtask_list(subtasks_list, results, node, "completed"),
                },
            )
            await update_live_diagram(G, events)

        # Collect all files modified by subtasks
        all_files_written = _collect_files_written(results)

        # ────────────────────────────────────────────────────────
        # ╔══════════════════════════════════════════════════════╗
        # ║         GLOBAL PROJECT VERIFICATION                  ║
        # ╚══════════════════════════════════════════════════════╝
        if project_root:
            await emit_system(events, "🔍 Realizando verificación global del proyecto...")
            await emit_stats(
                events, {"status": "Verificando proyecto", "current_agent": "Verificador"}
            )

            best_agent = corrected_agents[0] if corrected_agents else settings.default_agent

            global_ok = await _run_global_verification(
                query=query,
                project_root=project_root,
                workspace=ctx.workspace,
                allowed_tools=workflow_allowed_tools or ["file_manager", "git_manager"],
                best_agent=best_agent,
                events=events,
            )
            if global_ok:
                await emit_system(events, "✅ Verificación global superada.")
            else:
                await emit_system(
                    events,
                    "⚠️ Se detectaron incumplimientos globales; se aplicaron correcciones automáticas.",
                )
        # ────────────────────────────────────────────────────────
        # Update all_files_written with what actually exists on disk
        # (verification may have created or fixed files).
        if project_root:
            base = paths.memory_dir(ctx.workspace) / project_root
            if base.exists():
                for fpath in base.rglob("*"):
                    if fpath.is_file() and fpath.suffix in {
                        ".py",
                        ".txt",
                        ".md",
                        ".yml",
                        ".yaml",
                        ".json",
                        ".cfg",
                        ".ini",
                        ".toml",
                    }:
                        rel = str(fpath.relative_to(base))
                        if rel not in all_files_written:
                            all_files_written.append(rel)

        await emit_system(events, "🔄 Preparando la respuesta final...")
        await emit_stats(events, {"status": "Sintetizando", "current_agent": "ResultAggregator"})

        try:
            final_content = await ResultAggregator.aggregate_results(
                query,
                results,
                G,
                task_analysis,
                files_written=all_files_written,
                project_root=project_root,
                workspace=workspace,
            )
        except Exception as e:
            logger.error(f"Result aggregation failed: {e}")
            # Fallback: build a simple summary from raw results
            result_summaries = []
            for node, r in results.items():
                status = r.get("status", "unknown")
                # "result" may be a non-string payload; stringify before slicing.
                output = str(r.get("result", r))[:200]
                result_summaries.append(f"- Subtask {node}: {status} — {output}")
            final_content = (
                "⚠️ Result aggregation encountered an error. Partial results:\n\n"
                + "\n".join(result_summaries)
            )

        # Apply anti-distillation protection
        from core.security.undercover_mode import undercover

        final_content = undercover.get_safe_response(final_content)

        scorecard = generate_scorecard(
            results, G, final_content, query, task_analysis, start_time, ctx.enc
        )
        elapsed = round(time.monotonic() - start_time, 1)

        await emit_stats(
            events,
            {
                "subtasks_total": len(subtasks_list),
                "subtasks_completed": len(results),
                "tokens_used": scorecard.get("tokens", 0),
                "elapsed_time": f"{elapsed}s",
                "current_agent": "—",
                "status": "Completado",
                "files_written": all_files_written,
            },
        )

        export_history = list(conversation_history) + agent_results
        await finalize_workflow(
            query=query,
            final_output=final_content,
            conversation_history=export_history,
            conversation_id=ctx.conversation_id,
            scorecard=scorecard,
            subtasks_list=subtasks_list,
            task_analysis=task_analysis,
            G=G,
            events=events,
            project_root=project_root,
            workspace=ctx.workspace,
            files_written=all_files_written,
        )

        if final_content and final_content.strip():
            from orchestration.events import emit_assistant

            await emit_assistant(events, final_content)

        return final_content

    @staticmethod
    async def resume_workflow(session: Session, answer: str) -> str | None:
        """Resume a paused workflow after the user provides a clarification answer.

        Loads the most recent unresolved ``PausedSession`` for the conversation
        and marks it resolved with ``answer``. It then re-runs the paused agent
        loop with the answer appended and executes the remaining subtasks in
        order. If a later subtask asks for clarification again, the state is
        re-saved and the workflow pauses. Otherwise the results are aggregated
        and finalized.

        Args:
            session: Active session (context, events, cancellation flag).
            answer: The user's clarification answer.

        Returns:
            The final response; ``"[PAUSED:clarification_needed]"`` if a later
            subtask asked for clarification again; or None when no paused session
            exists for the conversation.
        """
        import json
        from datetime import UTC, datetime

        ctx = session.context
        events = session.events
        conv_id = ctx.conversation_id

        async with get_async_session() as db_session:
            from sqlalchemy import select

            stmt = (
                select(PausedSession)
                .where(PausedSession.conversation_id == conv_id)  # type: ignore[arg-type]
                .where(PausedSession.resolved_at == None)  # type: ignore[arg-type]  # noqa: E711
                .order_by(PausedSession.created_at.desc())  # type: ignore[attr-defined]
                .limit(1)
            )
            result = await db_session.execute(stmt)
            paused = result.scalar()
            if paused is None:
                logger.warning(f"No paused session found for conversation {conv_id}")
                return None

            paused.clarification_answer = answer
            # Stored as a naive datetime holding the current UTC time.
            paused.resolved_at = datetime.now(UTC).replace(tzinfo=None)
            db_session.add(paused)

        paused_data = paused.paused_state
        if isinstance(paused_data, str):
            paused_data = json.loads(paused_data)
        question = paused.clarification_question
        # The originating request, if it was stored in the paused state.
        original_query = paused_data.get("query") or ctx.query

        # Restore blackboard if saved
        blackboard_snap = paused_data.get("blackboard_snapshot")
        if blackboard_snap and ctx.blackboard is not None:
            ctx.blackboard.restore(blackboard_snap)
            logger.info("Blackboard restored from pause snapshot")

        await emit_system(events, f"📝 Respuesta de clarificación: {answer}")

        # Rebuild the agent-loop state and inject the answer.
        loop_state = paused_data.get("paused_loop_state", {})
        messages = loop_state.get("messages", [])
        messages.append({"role": "user", "content": f"[Respuesta a: {question}] {answer}"})

        from orchestration.loop import execute_agent_loop

        result = await execute_agent_loop(
            task=loop_state.get("task", ""),
            agent_type=loop_state.get("agent_type", settings.default_agent),
            history=messages,
            allowed_tools=loop_state.get("allowed_tools"),
            project_root=ctx.project_root,
            workspace=ctx.workspace,
            session=session,
        )

        subtasks = paused_data.get("subtasks", [])
        current_idx = paused_data.get("subtask_index", 0)

        # A JSON round-trip turns int dict keys into strings. Normalize back to
        # int node ids so the resumed subtask is not stored twice ("n" and n).
        paused_results = {int(k): v for k, v in (paused_data.get("results") or {}).items()}

        # If the current subtask finished successfully, update results
        if result.get("status") == "completed":
            paused_results[current_idx] = {
                "node": current_idx,
                "result": result["result"],
                "status": "completed",
                "files_written": result.get("files_written", []),
            }
        else:
            logger.warning(
                f"Resumed subtask {current_idx} ended with status {result.get('status')!r}"
            )

        # Run the remaining subtasks (if any)
        import networkx as nx

        corrected_agents = paused_data.get("corrected_agents", ["developer"] * len(subtasks))

        G = nx.DiGraph()
        for i, task in enumerate(subtasks):
            # Subtasks may be plain strings or dicts carrying a "description".
            task_desc = task.get("description", str(task)) if isinstance(task, dict) else task
            node_agent = (
                corrected_agents[i]
                if i < len(corrected_agents) and corrected_agents[i]
                else "developer"
            )
            G.add_node(i, task=task_desc, agent=node_agent, status="pending")

        conversation_history = paused_data.get("conversation_history", [])
        conversation_history.append({"role": "user", "content": f"[Clarificación] {answer}"})

        for node in range(current_idx + 1, len(subtasks)):
            if session.is_cancelled:
                break
            forced_agent = corrected_agents[node] if node < len(corrected_agents) else None
            task_desc = G.nodes[node]["task"]

            await emit_stats(
                events,
                {
                    "current_agent": forced_agent or "developer",
                    "status": f"Ejecutando subtarea {node + 1}",
                },
            )

            try:
                # Filter tool messages without tool_call_id (DeepSeek rejects them)
                clean_history = [
                    m
                    for m in conversation_history
                    if m.get("role") != "tool" or m.get("tool_call_id")
                ]
                subtask_result = await execute_subtask_safe(
                    node=node,
                    task=task_desc,
                    G=G,
                    conversation_history=clean_history,
                    current_pdf_text=ctx.current_pdf_text,
                    ctx=ctx,
                    events=events,
                    forced_agent=forced_agent,
                    task_analysis=paused_data.get("task_analysis"),
                )
                if subtask_result.get("status") == "clarification_needed":
                    # Nested pause: save the up-to-date state and return.
                    ctx.last_clarification = subtask_result["clarification_question"]
                    snapshot = (
                        ctx.blackboard.snapshot()
                        if ctx.blackboard is not None
                        else paused_data.get("blackboard_snapshot")
                    )
                    await _save_paused_session(
                        conv_id=conv_id,
                        query=original_query,
                        question=subtask_result["clarification_question"],
                        options=subtask_result.get("clarification_options", []),
                        paused_state={
                            **paused_data,
                            "query": original_query,
                            "subtask_index": node,
                            "results": paused_results,
                            "conversation_history": conversation_history,
                            "paused_loop_state": subtask_result["paused_loop_state"],
                            "corrected_agents": corrected_agents,
                            "blackboard_snapshot": snapshot,
                        },
                    )
                    return "[PAUSED:clarification_needed]"
                paused_results[node] = subtask_result
            except Exception as e:
                logger.error(f"Subtask {node} failed during resume: {e}")
                paused_results[node] = {"status": "failed", "result": str(e), "files_written": []}

        # Finalize: aggregate and persist
        query = ctx.query
        from orchestration.aggregator import ResultAggregator  # noqa: F811

        all_files = _collect_files_written(paused_results)
        final_content = await ResultAggregator.aggregate_results(
            query=query,
            results=paused_results,
            G=G,
            task_analysis=paused_data.get("task_analysis", {}),
            files_written=all_files,
            project_root=ctx.project_root,
            workspace=ctx.workspace,
        )

        await finalize_workflow(
            query=query,
            final_output=final_content,
            conversation_history=conversation_history,
            conversation_id=conv_id,
            scorecard={},
            subtasks_list=subtasks,
            task_analysis=paused_data.get("task_analysis"),
            G=G,
            events=events,
            project_root=ctx.project_root,
            workspace=ctx.workspace,
            files_written=all_files,
        )

        await emit_stats(
            events,
            {
                "subtasks_total": len(subtasks),
                "subtasks_completed": len(paused_results),
                "status": "Completado",
                "current_agent": "—",
                "files_written": all_files,
            },
        )

        if final_content and final_content.strip():
            # Same import location as the main orchestration path.
            from orchestration.events import emit_assistant

            await emit_assistant(events, final_content)

        return final_content
Functions
run_full_workflow async staticmethod
run_full_workflow(session: Session) -> str | None

Execute the full workflow pipeline.

Receives a Session with unified context and events. Delegates routing to _dispatch_route to keep cyclomatic complexity low.

Returns:

`str | None` — the final response, or `None` if the request was blocked.

Source code in orchestration/workflows/orchestrator.py
@staticmethod
async def run_full_workflow(
    session: Session,
) -> str | None:
    """Drive a single request through the workflow pipeline.

    The query is first screened by the undercover security check. A blocked
    query produces a system notice and ``None``. Otherwise the ToolOrchestrator
    token budget is reset and its approval callback is wired to the session's
    events for the duration of the call. A query that parses as a direct tool
    command takes the ``_run_direct_tool`` fast path; anything else is handed
    to ``_dispatch_route``, which keeps this entry point's branching small.

    Args:
        session: Session bundling the unified workflow context and events.

    Returns:
        str | None: The final response, or None if the request was blocked.
    """
    wf_ctx = session.context
    wf_events = session.events
    user_query = wf_ctx.query
    t0 = time.monotonic()

    # Security gate: nothing else runs for a rejected query.
    from core.security.undercover_mode import undercover

    is_allowed = await undercover.check_query(user_query)
    if not is_allowed:
        await emit_system(wf_events, "❌ Solicitud bloqueada por razones de seguridad.")
        return None

    # Per-request tool orchestrator state.
    from tools.orchestrator import ToolOrchestrator

    ToolOrchestrator.reset_token_budget()
    ToolOrchestrator.on_approval_required = wf_events.on_approval_required

    try:
        tool_command = _parse_direct_tool_command(user_query)
        if not tool_command:
            # General case: route by the active template / workflow.
            return await WorkflowOrchestrator._dispatch_route(
                session=session,
                query=user_query,
                ctx=wf_ctx,
                events=wf_events,
                start_time=t0,
            )
        # Fast path: the query is an explicit tool invocation.
        return await WorkflowOrchestrator._run_direct_tool(
            tool_command,
            user_query,
            t0,
            wf_ctx.conversation_history,
            wf_events,
            wf_ctx.conversation_id,
        )
    finally:
        # Always detach the approval callback, even when routing raises.
        ToolOrchestrator.on_approval_required = None
resume_workflow async staticmethod
resume_workflow(
    session: Session, answer: str
) -> str | None

Resume a paused workflow after the user provides a clarification answer.

Loads the most recent PausedSession, injects the answer back into the agent loop state, and continues execution from where it was paused.

Source code in orchestration/workflows/orchestrator.py
@staticmethod
async def resume_workflow(session: Session, answer: str) -> str | None:
    """Resume a paused workflow after the user answers a clarification question.

    Steps:
      1. Load the most recent unresolved ``PausedSession`` for the
         conversation and mark it resolved with the user's answer.
      2. Restore the blackboard snapshot (if one was saved) and re-enter the
         agent loop for the subtask that paused, with the answer appended to
         its message history.
      3. Run the remaining subtasks. If any of them (or the resumed one) asks
         for clarification again, persist a new paused session and return the
         pause sentinel.
      4. Aggregate all subtask results and finalize the workflow.

    Args:
        session: Active session providing the context and event sinks.
        answer: The user's reply to the pending clarification question.

    Returns:
        The aggregated final response; ``"[PAUSED:clarification_needed]"``
        if execution paused again; or ``None`` when there is no unresolved
        paused session (or its saved state is unusable).
    """
    import datetime
    import json

    from sqlalchemy import select

    ctx = session.context
    events = session.events
    conv_id = ctx.conversation_id

    async with get_async_session() as db_session:
        stmt = (
            select(PausedSession)
            .where(PausedSession.conversation_id == conv_id)  # type: ignore[arg-type]
            .where(PausedSession.resolved_at == None)  # type: ignore[arg-type]  # noqa: E711
            .order_by(PausedSession.created_at.desc())  # type: ignore[attr-defined]
            .limit(1)
        )
        db_result = await db_session.execute(stmt)
        paused = db_result.scalar()
        if paused is None:
            logger.warning(f"No paused session found for conversation {conv_id}")
            return None

        # Read the payload while the ORM instance is still attached to the open
        # session. NOTE(review): if get_async_session commits with
        # expire_on_commit=True, reading these after the block exits would
        # trigger a lazy reload on a closed async session.
        paused_data = paused.paused_state
        question = paused.clarification_question

        paused.clarification_answer = answer
        # Naive UTC timestamp (tzinfo stripped), as stored previously.
        paused.resolved_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
        db_session.add(paused)

    if isinstance(paused_data, str):
        paused_data = json.loads(paused_data)
    if not isinstance(paused_data, dict):
        logger.error(f"Paused session for conversation {conv_id} has no usable state")
        return None

    # Restore the blackboard if a snapshot was saved at pause time.
    blackboard_snap = paused_data.get("blackboard_snapshot")
    if blackboard_snap and ctx.blackboard is not None:
        ctx.blackboard.restore(blackboard_snap)
        logger.info("Blackboard restored from pause snapshot")

    await emit_system(events, f"📝 Respuesta de clarificación: {answer}")

    subtasks = paused_data.get("subtasks", [])
    current_idx = paused_data.get("subtask_index", 0)
    corrected_agents = paused_data.get("corrected_agents", ["developer"] * len(subtasks))
    task_analysis = paused_data.get("task_analysis")
    conversation_history = paused_data.get("conversation_history", [])
    conversation_history.append({"role": "user", "content": f"[Clarificación] {answer}"})

    # Results that went through JSON have string keys ("0", "1", ...). Convert
    # them back to int so entries written below by subtask index overwrite
    # them instead of sitting alongside as duplicates.
    raw_results = paused_data.get("results") or {}
    raw_items = raw_results.items() if isinstance(raw_results, dict) else enumerate(raw_results)
    paused_results: dict = {}
    for key, value in raw_items:
        try:
            paused_results[int(key)] = value
        except (TypeError, ValueError):
            paused_results[key] = value

    async def _pause_again(index: int, pause_result: dict) -> str:
        """Persist a new paused session at subtask *index* and return the pause sentinel."""
        ctx.last_clarification = pause_result["clarification_question"]
        await _save_paused_session(
            conv_id=conv_id,
            query=paused_data.get("query", ""),
            question=pause_result["clarification_question"],
            options=pause_result.get("clarification_options", []),
            paused_state={
                **paused_data,
                "subtask_index": index,
                "paused_loop_state": pause_result["paused_loop_state"],
                "corrected_agents": corrected_agents,
                # Carry forward everything accumulated during this resume.
                "results": paused_results,
                "conversation_history": conversation_history,
            },
        )
        return "[PAUSED:clarification_needed]"

    # Rebuild the agent-loop state of the paused subtask and feed it the answer.
    loop_state = paused_data.get("paused_loop_state", {})
    messages = loop_state.get("messages", [])
    messages.append({"role": "user", "content": f"[Respuesta a: {question}] {answer}"})

    from orchestration.loop import execute_agent_loop

    loop_result = await execute_agent_loop(
        task=loop_state.get("task", ""),
        agent_type=loop_state.get("agent_type", settings.default_agent),
        history=messages,
        allowed_tools=loop_state.get("allowed_tools"),
        project_root=ctx.project_root,
        workspace=ctx.workspace,
        session=session,
    )

    loop_status = loop_result.get("status")
    if loop_status == "completed":
        paused_results[current_idx] = {
            "node": current_idx,
            "result": loop_result["result"],
            "status": "completed",
            "files_written": loop_result.get("files_written", []),
        }
    elif (
        loop_status == "clarification_needed"
        and "clarification_question" in loop_result
        and loop_result.get("paused_loop_state") is not None
    ):
        # The resumed subtask asked another question: pause again at the same index.
        return await _pause_again(current_idx, loop_result)
    else:
        # Keep the outcome visible to the aggregator instead of dropping it.
        paused_results[current_idx] = {
            "node": current_idx,
            "result": loop_result.get("result", ""),
            "status": loop_status or "failed",
            "files_written": loop_result.get("files_written", []),
        }

    # Run the remaining subtasks (if any).
    import networkx as nx

    G = nx.DiGraph()
    for i, task in enumerate(subtasks):
        G.add_node(i, task=task, agent="developer", status="pending")

    for node in range(current_idx + 1, len(subtasks)):
        if session.is_cancelled:
            break
        forced_agent = corrected_agents[node] if node < len(corrected_agents) else None
        task_desc = G.nodes[node]["task"]

        await emit_stats(
            events,
            {
                "current_agent": forced_agent or "developer",
                "status": f"Ejecutando subtarea {node + 1}",
            },
        )

        try:
            subtask_result = await execute_subtask_safe(
                node=node,
                task=task_desc,
                G=G,
                conversation_history=conversation_history,
                current_pdf_text=ctx.current_pdf_text,
                ctx=ctx,
                events=events,
                forced_agent=forced_agent,
                task_analysis=task_analysis,
            )
            if subtask_result.get("status") == "clarification_needed":
                # Nested pause: persist state and hand control back to the user.
                return await _pause_again(node, subtask_result)
            paused_results[node] = subtask_result
        except Exception as e:
            logger.error(f"Subtask {node} failed during resume: {e}")
            paused_results[node] = {"status": "failed", "result": str(e), "files_written": []}

    # Aggregate and finalize.
    query = ctx.query
    from orchestration.aggregator import ResultAggregator  # noqa: F811

    all_files = _collect_files_written(paused_results)
    final_content = await ResultAggregator.aggregate_results(
        query=query,
        results=paused_results,
        G=G,
        task_analysis=paused_data.get("task_analysis", {}),
        files_written=all_files,
        project_root=ctx.project_root,
        workspace=ctx.workspace,
    )

    await finalize_workflow(
        query=query,
        final_output=final_content,
        conversation_history=conversation_history,
        conversation_id=conv_id,
        scorecard={},
        subtasks_list=subtasks,
        task_analysis=task_analysis,
        G=G,
        events=events,
        project_root=ctx.project_root,
        workspace=ctx.workspace,
        files_written=all_files,
    )

    completed_count = sum(
        1
        for entry in paused_results.values()
        if isinstance(entry, dict) and entry.get("status") == "completed"
    )
    await emit_stats(
        events,
        {
            "subtasks_total": len(subtasks),
            "subtasks_completed": completed_count,
            "status": "Completado",
            "current_agent": "—",
            "files_written": all_files,
        },
    )

    if final_content and final_content.strip():
        from orchestration.context import emit_assistant

        await emit_assistant(events, final_content)

    return final_content

Functions

orchestration.workflows.collaborative

Collaborative Orchestrator — multi-agent debate with rounds and consensus.

Implements the 'collaborative' workflow type: multiple agents with different perspectives analyze a question, debate it over several rounds, and reach a consensus through a moderator.

Classes

CollaborativeOrchestrator

Orquestador de debate colaborativo con rondas iterativas.

Source code in orchestration/workflows/collaborative.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class CollaborativeOrchestrator:
    """Collaborative debate orchestrator with iterative rounds.

    Panel agents answer the same question concurrently. From round 2 onward,
    each agent also sees the other panelists' previous-round answers. A
    moderator agent then synthesizes the final consensus, which is passed
    through the shared ``finalize_workflow`` pipeline.
    """

    # Seconds allowed per debate round unless template["round_timeout"] overrides it.
    DEFAULT_ROUND_TIMEOUT = 120

    @staticmethod
    async def run(
        query: str,
        template: dict,
        events: WorkflowEvents,
        history: list | None = None,
        project_root: str | None = None,
        workspace: str = "main",
        force_agent: str | None = None,
        workflow_allowed_tools: list[str] | None = None,
        start_time: float = 0.0,
    ) -> str:
        """Run a multi-round panel debate and return the moderator's verdict.

        Template keys read:
            panel: agent names taking part (at least 2 required).
            rounds: number of debate rounds (default 3; non-numeric or < 1
                falls back to 3).
            round_timeout: seconds allowed per round (default 120).
            moderator: agent that synthesizes the consensus (default "moderador").
            requires_project: True / "optional" / "true" to inject project context.

        Args:
            query: The question under debate.
            template: Workflow template (see keys above).
            events: Event sinks for system messages, stats and streaming.
            history: Prior conversation messages, shared read-only by all agents.
            project_root: Project directory used for context and tool defaults.
            workspace: Workspace name.
            force_agent: Optional leader; added to the panel if missing, and its
                opinion is flagged to the moderator as carrying extra weight.
            workflow_allowed_tools: Optional allowlist that filters every
                agent's tools.
            start_time: Monotonic start time used for the scorecard.

        Returns:
            The final consensus text, or ``"Error: panel insuficiente."`` when
            fewer than two panelists are configured.
        """
        panel = list(template.get("panel") or [])
        rounds = CollaborativeOrchestrator._positive_int(template.get("rounds", 3), 3)
        round_timeout = CollaborativeOrchestrator._positive_int(
            template.get("round_timeout"), CollaborativeOrchestrator.DEFAULT_ROUND_TIMEOUT
        )
        moderator_name = template.get("moderator", "moderador")
        requires_project = template.get("requires_project")

        if len(panel) < 2:
            await emit_system(events, "⚠️ El panel colaborativo necesita al menos 2 agentes.")
            return "Error: panel insuficiente."

        conversation_history = history or []

        # Load project context when the template asks for it.
        project_context = ""
        if project_root and (requires_project in (True, "optional", "true")):
            project_context = CollaborativeOrchestrator._build_project_context(
                project_root, workspace
            )
            if project_context:
                await emit_system(events, f"📁 Contexto del proyecto cargado: `{project_root}`")

        # Forced leader agent: add to the panel if not already present.
        leader = None
        if force_agent:
            if force_agent not in panel:
                panel = panel + [force_agent]
            leader = force_agent
            await emit_system(events, f"🎯 **{force_agent.capitalize()}** lidera el debate")

        await emit_system(
            events, f"🤝 **Debate colaborativo iniciado** — {len(panel)} agentes, {rounds} rondas"
        )
        await emit_stats(events, {"status": f"Ronda 1/{rounds}", "current_agent": "Panel"})

        previous_opinions: dict[str, str] = {}

        for r in range(1, rounds + 1):
            await emit_system(events, f"\n--- **Ronda {r} de {rounds}** ---")

            # Round 1 sees only the question (+ project context); later rounds
            # also see the other panelists' opinions from the previous round.
            others = previous_opinions if r > 1 else None
            tasks = [
                CollaborativeOrchestrator._ask_agent(
                    agent_name=name,
                    query=query,
                    others_opinions=others,
                    events=events,
                    history=conversation_history,
                    project_context=project_context,
                    project_root=project_root,
                    workspace=workspace,
                    workflow_allowed_tools=workflow_allowed_tools,
                    round_label=f"Ronda {r}",
                )
                for name in panel
            ]

            try:
                results_raw = await asyncio.wait_for(
                    asyncio.gather(*tasks, return_exceptions=True), timeout=round_timeout
                )
            except TimeoutError:
                logger.warning(f"Round {r} timed out after {round_timeout}s")
                await emit_system(
                    events, f"⚠️ Ronda {r} excedió el tiempo límite ({round_timeout}s)."
                )
                break

            round_opinions: dict[str, str] = {}
            for name, result in zip(panel, results_raw, strict=True):
                if isinstance(result, Exception):
                    await emit_system(events, f"⚠️ {name.capitalize()}: error — {result}")
                    round_opinions[name] = f"[Error: {result}]"
                else:
                    round_opinions[name] = str(result)

            previous_opinions = round_opinions
            await emit_stats(events, {"status": f"Ronda {r}/{rounds} completada"})

        # ── Moderator: final synthesis ──
        await emit_system(
            events, f"\n⚖️ **{moderator_name.capitalize()}** sintetizando consenso final..."
        )
        await emit_stats(
            events, {"status": "Consenso final", "current_agent": moderator_name.capitalize()}
        )

        debate_summary = CollaborativeOrchestrator._build_debate_summary(
            query, previous_opinions, leader
        )

        try:
            final_answer = await CollaborativeOrchestrator._ask_moderator(
                agent_name=moderator_name,
                debate_summary=debate_summary,
                events=events,
                history=conversation_history,
                workflow_allowed_tools=workflow_allowed_tools,
            )
        except Exception as e:
            logger.error(f"Moderator failed: {e}", exc_info=True)
            final_answer = CollaborativeOrchestrator._fallback_consensus(debate_summary)

        await emit_system(events, "✅ Debate colaborativo finalizado.")
        await emit_stats(events, {"status": "Completado", "current_agent": "—"})

        # ── Unified finalization (shared with development/coordinated) ──
        from orchestration.finalizer import finalize_workflow
        from orchestration.utils import generate_scorecard

        # Results dict in the shape generate_scorecard / finalize_workflow expect.
        panel_results = {
            name: {"status": "completed", "result": opinion, "files_written": []}
            for name, opinion in previous_opinions.items()
        }
        scorecard = generate_scorecard(
            results=panel_results,
            G=None,
            final_content=final_answer,
            query=query,
            task_analysis={"primary_type": "collaborative", "requires_full_orchestration": True},
            start_time=start_time,
        )

        await finalize_workflow(
            query=query,
            final_output=final_answer,
            conversation_history=conversation_history,
            scorecard=scorecard,
            subtasks_list=[f"{name}: {op[:100]}" for name, op in previous_opinions.items()],
            task_analysis={"primary_type": "collaborative", "requires_full_orchestration": True},
            G=None,
            events=events,
            workspace=workspace,
            project_root=project_root,
            files_written=[],
        )

        return final_answer

    @staticmethod
    def _positive_int(value: object, default: int) -> int:
        """Coerce *value* to a positive int, returning *default* when impossible or < 1."""
        try:
            number = int(value)  # type: ignore[call-overload]
        except (TypeError, ValueError):
            return default
        return number if number > 0 else default

    @staticmethod
    def _effective_tools(
        profile: dict | None, workflow_allowed_tools: list[str] | None
    ) -> list[str]:
        """Return the agent profile's expanded tools, filtered by the workflow allowlist.

        An empty list means the agent gets no tools. ``workflow_allowed_tools``
        of ``None`` means no workflow-level restriction.
        """
        agent_tools = profile.get("tools", []) if profile else []
        if not agent_tools:
            return []

        from tools.specs import expand_allowed_tools

        expanded = expand_allowed_tools(agent_tools) or []
        if workflow_allowed_tools is None:
            return list(expanded)
        return [t for t in expanded if tool_matches_allowlist(t, workflow_allowed_tools)]

    @staticmethod
    def _parse_tool_call(tc: object) -> tuple[str, str, dict]:
        """Normalize a tool call (plain dict or SDK object) to ``(id, name, args)``.

        ``args`` is always a fresh dict. Malformed JSON, or JSON that is not an
        object (e.g. ``[]``), yields ``{}``, so callers can safely
        ``setdefault`` on it without mutating the response object.
        """
        import json

        def _field(obj: object, key: str, default: object) -> object:
            if isinstance(obj, dict):
                return obj.get(key, default)
            return getattr(obj, key, default) if obj else default

        call_id = _field(tc, "id", "")
        func = _field(tc, "function", None)
        name = _field(func, "name", "")
        raw_args = _field(func, "arguments", "{}")

        if isinstance(raw_args, str):
            try:
                parsed = json.loads(raw_args)
            except ValueError:
                parsed = {}
        else:
            parsed = raw_args
        args = dict(parsed) if isinstance(parsed, dict) else {}
        return call_id, name, args  # type: ignore[return-value]

    @staticmethod
    async def _ask_agent(
        agent_name: str,
        query: str,
        others_opinions: dict[str, str] | None,
        events: WorkflowEvents,
        history: list,
        project_context: str = "",
        project_root: str | None = None,
        workspace: str = "main",
        workflow_allowed_tools: list[str] | None = None,
        round_label: str = "",
    ) -> str:
        """Ask one panel agent for its opinion, allowing one round of tool use.

        The agent may request up to 3 tool calls. Only tools in its effective
        allowlist are executed, and their outputs feed a second LLM call that
        produces the final answer. Errors are returned as ``"[Error: ...]"``
        text rather than raised.
        """
        import json as _json

        enriched_query = CollaborativeOrchestrator._build_query(
            query, agent_name, others_opinions, project_context
        )

        await emit_system(events, f"💬 **{agent_name.capitalize()}** está pensando...")

        profile = agents_registry.get_profile(agent_name)
        effective_tools = CollaborativeOrchestrator._effective_tools(
            profile, workflow_allowed_tools
        )
        tool_defs = build_tool_definitions(effective_tools) if effective_tools else None
        model_role = profile.get("model_role", "agent") if profile else "agent"
        temperature = profile.get("temperature", 0.4) if profile else 0.4

        # Build messages with token-aware compression.
        from core.config import settings as _settings
        from core.context_manager import ContextManager

        budget = int(_settings.max_context_tokens * 0.6)
        compressed = ContextManager.compress_history(history, max_tokens=budget)
        # Always work on a private copy. Panel agents run concurrently over the
        # same `history` list, and compress_history may hand back its input
        # unchanged (NOTE(review): confirm). Appending to it would leak this
        # agent's prompt into the shared conversation history.
        messages = list(compressed) if compressed else list(history)
        messages.append({"role": "user", "content": enriched_query})

        try:
            # First call: the agent may request tools.
            response = await models.call(
                messages=messages,
                role=model_role,
                temperature=temperature,
                tools=tool_defs,
                tool_choice="auto" if tool_defs else "none",
            )
            text = clean_llm_response(response)

            # reasoning_content (DeepSeek thinking mode) must be echoed back.
            reasoning = None
            try:
                reasoning = getattr(response.choices[0].message, "reasoning_content", None)
            except (AttributeError, IndexError, TypeError):
                pass

            tool_calls = tool_calls_from_response(response)
            if tool_calls and effective_tools:
                parsed_calls = [
                    CollaborativeOrchestrator._parse_tool_call(tc)
                    for tc in tool_calls[:3]  # max 3 tool calls per round
                ]

                # The assistant message must carry tool_calls (API requirement).
                assistant_msg: dict = {
                    "role": "assistant",
                    "content": text or None,
                    "tool_calls": [],
                }
                if reasoning:
                    assistant_msg["reasoning_content"] = reasoning
                for call_id, name, args in parsed_calls:
                    if project_root:
                        args.setdefault("project_root", project_root)
                    args.setdefault("workspace", workspace)
                    assistant_msg["tool_calls"].append(
                        {
                            "id": call_id,
                            "type": "function",
                            "function": {"name": name, "arguments": _json.dumps(args)},
                        }
                    )
                messages.append(assistant_msg)

                # Every tool_call_id needs a matching tool message, including refused calls.
                for call_id, name, args in parsed_calls:
                    if tool_matches_allowlist(name, effective_tools):
                        result = await safe_tool_call(name, args, role="agent")
                        output = (
                            result.get("output", str(result))
                            if isinstance(result, dict)
                            else str(result)
                        )
                    else:
                        # The model asked for a tool it was not granted; refuse it.
                        logger.warning(
                            f"Collaborative agent {agent_name} requested disallowed tool {name!r}"
                        )
                        output = f"Error: tool '{name}' is not allowed for this agent."
                    messages.append(
                        {
                            "role": "tool",
                            "tool_call_id": call_id,
                            "content": f"[{name}]: {output}",
                        }
                    )

                # Second call: tool messages now follow the assistant tool_calls.
                response = await models.call(
                    messages=messages,
                    role=model_role,
                    temperature=temperature,
                )
                text = clean_llm_response(response)

        except Exception as e:
            logger.error(f"Error in collaborative agent {agent_name}: {e}")
            text = f"[Error: {e}]"

        stream_callback = events.on_stream_chunk if events else None
        if stream_callback:
            await stream_callback("\n\n")

        await emit_agent(events, agent_name, round_label, text)
        return text

    @staticmethod
    async def _ask_moderator(
        agent_name: str,
        debate_summary: str,
        events: WorkflowEvents,
        history: list,
        workflow_allowed_tools: list[str] | None = None,
    ) -> str:
        """Ask the moderator to synthesize the final consensus.

        If the moderator has effective tools (profile tools filtered by
        ``workflow_allowed_tools``), it runs through ``execute_agent_loop``.
        Otherwise ``AgentsService.execute_agent`` is used (text only).
        """
        await emit_system(events, f"⚖️ **{agent_name.capitalize()}** deliberando...")

        stream_callback = events.on_stream_chunk if events else None

        profile = agents_registry.get_profile(agent_name)
        effective_tools = CollaborativeOrchestrator._effective_tools(
            profile, workflow_allowed_tools
        )

        if effective_tools:
            from orchestration.loop import execute_agent_loop

            loop_result = await execute_agent_loop(
                task=debate_summary,
                agent_type=agent_name,
                history=history,
                allowed_tools=effective_tools,
                on_stream_chunk=stream_callback,
            )
            text = (
                loop_result.get("result", str(loop_result))
                if isinstance(loop_result, dict)
                else str(loop_result)
            )
            return clean_llm_response(text)

        final = await AgentsService.execute_agent(
            agent_type=agent_name,
            query=debate_summary,
            history=history,
            on_stream_chunk=stream_callback,
        )
        return clean_llm_response(final)

    @staticmethod
    def _fallback_consensus(debate_summary: str) -> str:
        """Fallback consensus when the moderator fails: a fixed preamble plus the summary."""
        return (
            "El panel debatió pero no se pudo alcanzar un consenso formal. "
            "A continuación el resumen de las opiniones:\n\n" + debate_summary
        )

    @staticmethod
    def _build_query(
        query: str,
        agent_name: str,
        others_opinions: dict[str, str] | None,
        project_context: str = "",
    ) -> str:
        """Build the enriched prompt for one panel agent.

        Other panelists' opinions (truncated to 300 chars each, excluding the
        agent's own) are included only when more than one opinion is available.
        """
        parts = []

        if project_context:
            parts.append(f"Contexto del proyecto:\n{project_context}\n")

        parts.append(f"Pregunta: {query}")

        if others_opinions and len(others_opinions) > 1:
            others_text = "\n".join(
                f"  **{n.capitalize()}**: {o[:300]}"
                for n, o in others_opinions.items()
                if n != agent_name
            )
            parts.append(f"\nEsto opinaron los demás en la ronda anterior:\n{others_text}")
            parts.append(
                "Responde desde tu personalidad. Puedes mantener tu postura, "
                "refinarla, o cambiar de opinión si te convencieron. "
                "Sé fiel a tu personaje. Responde en primera persona."
            )
        else:
            parts.append(
                "Responde desde tu personalidad. Sé fiel a tu personaje. Responde en primera persona."
            )

        return "\n\n".join(parts)

    @staticmethod
    def _build_project_context(project_root: str, workspace: str) -> str:
        """Build a short project overview to inject into the debate.

        Lists up to 30 top-level entries (non-hidden directories, and at most
        15 files), then appends the first 800 characters of the first
        manifest found (requirements.txt, pyproject.toml, package.json).
        Returns "" when the directory does not exist or nothing was collected.
        """
        from core.path_resolver import paths

        base = paths.memory_dir(workspace) / project_root
        if not base.exists():
            return ""

        lines = []
        try:
            # Top-level directory structure only.
            items = sorted(base.iterdir())[:30]
            dirs = [d.name + "/" for d in items if d.is_dir() and not d.name.startswith(".")]
            files = [f.name for f in items if f.is_file() and not f.name.startswith(".")]

            if dirs:
                lines.append(f"Directorios: {', '.join(dirs)}")
            if files:
                lines.append(f"Archivos: {', '.join(files[:15])}")

            # First 800 chars of the first manifest found.
            for fname in ("requirements.txt", "pyproject.toml", "package.json"):
                fpath = base / fname
                if fpath.exists():
                    content = fpath.read_text(encoding="utf-8", errors="replace")[:800]
                    lines.append(f"\n{fname}:\n{content}")
                    break
        except OSError:
            # Best-effort: keep whatever was collected, but leave a trace.
            logger.debug(f"Could not fully read project context at {base}", exc_info=True)

        return "\n".join(lines) if lines else ""

    @staticmethod
    def _build_debate_summary(
        query: str, final_opinions: dict[str, str], leader: str | None = None
    ) -> str:
        """Build the moderator prompt from the question and the final-round opinions."""
        opinions_text = "\n\n".join(f"**{n.capitalize()}**: {o}" for n, o in final_opinions.items())
        leader_note = ""
        if leader:
            leader_note = (
                f"\n\n**Nota:** {leader.capitalize()} es el líder designado de este debate. "
                f"Su opinión tiene peso especial en la decisión final."
            )
        return (
            f"Eres el moderador de un debate. Resume el consenso al que llegó el panel "
            f"sobre la siguiente pregunta:\n\n"
            f"**Pregunta:** {query}\n\n"
            f"**Opiniones finales del panel:**\n\n{opinions_text}"
            f"{leader_note}\n\n"
            f"Sintetiza la conclusión final del grupo. Combina las mejores ideas de cada uno. "
            f"Si hay desacuerdo, señálalo con diplomacia pero inclina la balanza con tu criterio neutral. "
            f"Estructura tu respuesta como un veredicto final."
        )

Functions

orchestration.workflows.coordinated

Multi-Agent Coordinator — manager-worker pattern with DAG execution.

Replaces the linear, sequential subtask execution of full orchestration with a proper DAG: independent subtasks run in parallel, agents communicate through a shared blackboard, and results are combined with confidence-weighted aggregation.

Architecture
  1. decompose_task_dag() → LLM produces structured DAG (JSON)
  2. assign_agents() → LLM-based quality routing (replaces keyword supervisor)
  3. execute_dag() → parallel asyncio.gather per level, respecting dependencies
  4. aggregate_with_confidence() → LLM evaluates + synthesizes

Classes

MultiAgentCoordinator

Orchestrates multiple agents with DAG-based execution and shared blackboard.

Source code in orchestration/workflows/coordinated.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class MultiAgentCoordinator:
    """Orchestrates multiple agents with DAG-based execution and shared blackboard."""

    def __init__(self):
        self.blackboard = SharedBlackboard()

    # ── PHASE 1: DECOMPOSE INTO DAG ──
    async def decompose_task_dag(self, query: str) -> dict[str, Any]:
        """Ask the LLM to decompose a task into a structured DAG.

        Returns: {"subtasks": [...], "raw_response": "..."}
        Each subtask: {"id": str, "description": str, "depends_on": [str], "agent_hint": str}
        """
        prompt = DAG_DECOMPOSE_PROMPT.format(query=query)

        try:
            response = await models.call(
                messages=[{"role": "user", "content": prompt}],
                role="reasoning",
                temperature=0.1,
            )
            raw = clean_llm_response(response)
            content = raw.choices[0].message.content if hasattr(raw, "choices") else str(raw)

            # Extract JSON from response
            data = self._parse_dag_json(content)
            if data and isinstance(data.get("subtasks"), list) and len(data["subtasks"]) >= 1:
                # Validate and clean subtasks
                valid = []
                for st in data["subtasks"]:
                    if not isinstance(st, dict):
                        continue
                    sid = st.get("id", "")
                    desc = st.get("description", "")
                    if sid and desc and len(desc) > 5:
                        st.setdefault("depends_on", [])
                        st.setdefault("agent_hint", "developer")
                        valid.append(st)
                if valid:
                    return {"subtasks": valid[:6], "raw_response": content}

        except Exception as e:
            logger.warning(f"DAG decomposition failed: {e}")

        # Fallback: single linear subtask
        return {
            "subtasks": [
                {
                    "id": "main_task",
                    "description": query,
                    "depends_on": [],
                    "agent_hint": "developer",
                }
            ],
            "raw_response": "",
        }

    @staticmethod
    def _parse_dag_json(text: str) -> dict | None:
        """Extract JSON object from LLM response text."""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        # Try to find JSON between braces containing "subtasks"
        idx = text.find('"subtasks"')
        if idx < 0:
            return None
        start = text.rfind("{", 0, idx)
        if start < 0:
            return None
        depth = 0
        end = start
        for i in range(start, len(text)):
            if text[i] == "{":
                depth += 1
            elif text[i] == "}":
                depth -= 1
                if depth == 0:
                    end = i + 1
                    break
        if end > start:
            try:
                return json.loads(text[start:end])
            except json.JSONDecodeError:
                pass
        return None

    # ── PHASE 2: ASSIGN AGENTS ──
    async def assign_agents(
        self,
        subtasks: list[dict],
        allowed_agents: list[str] | None = None,
        force_agent: str | None = None,
    ) -> dict[str, str]:
        """Assign the best agent for each subtask.

        Uses AgentRouter first, then LLM quality review for confidence.
        Falls back to keyword matching if LLM is unavailable.

        Returns: {subtask_id: agent_name}
        """
        assignments: dict[str, str] = {}

        for st in subtasks:
            sid = st["id"]
            if force_agent:
                assignments[sid] = force_agent
                continue

            hint = st.get("agent_hint", "")
            desc = st.get("description", "")

            # Try AgentRouter first
            try:
                picked = await AgentRouter.select_best_agent(desc, "coordinated", allowed_agents)
                if picked and (allowed_agents is None or picked in allowed_agents):
                    assignments[sid] = picked
                    continue
            except Exception:
                pass

            # Fallback: agent_hint or first allowed
            if hint and (allowed_agents is None or hint in allowed_agents):
                assignments[sid] = hint
            elif allowed_agents:
                assignments[sid] = allowed_agents[0]
            else:
                assignments[sid] = "developer"

        return assignments

    # ── PHASE 3: EXECUTE DAG WITH PARALLELISM ──
    async def execute_dag(
        self,
        subtasks: list[dict],
        assignments: dict[str, str],
        project_root: str | None = None,
        workspace: str = "main",
        allowed_tools: list[str] | None = None,
        events=None,
        session=None,
    ) -> dict[str, dict]:
        """Execute subtasks respecting DAG dependencies with parallel branches.

        Topological level execution:
        - Level 0: subtasks with no dependencies → all run in parallel
        - Level 1: subtasks whose dependencies are all in level 0 → parallel
        - etc.

        Returns: {subtask_id: {status, result, agent, files_written, error}}
        """
        results: dict[str, dict] = {}
        completed: set[str] = set()
        remaining = {st["id"]: st for st in subtasks}

        while remaining:
            # Find subtasks whose dependencies are all completed
            ready = []
            for sid, st in list(remaining.items()):
                deps = st.get("depends_on", [])
                if all(d in completed for d in deps):
                    ready.append((sid, st))

            if not ready:
                # Circular dependency or all stuck — execute remaining sequentially
                logger.warning("DAG stuck — executing remaining subtasks sequentially")
                for sid, st in remaining.items():
                    result = await self._execute_one(
                        sid,
                        st,
                        assignments.get(sid, "developer"),
                        project_root,
                        workspace,
                        allowed_tools,
                        events,
                        session,
                    )
                    results[sid] = result
                    completed.add(sid)
                break

            logger.info(
                f"DAG level: {len(ready)} subtask(s) in parallel: " f"{[s[0] for s in ready]}"
            )

            # Execute ready subtasks in parallel
            tasks = [
                self._execute_one(
                    sid,
                    st,
                    assignments.get(sid, "developer"),
                    project_root,
                    workspace,
                    allowed_tools,
                    events,
                    session,
                )
                for sid, st in ready
            ]

            parallel_results = await asyncio.gather(*tasks, return_exceptions=True)

            for (sid, st), raw_result in zip(ready, parallel_results, strict=True):
                if isinstance(raw_result, BaseException):
                    logger.error(f"Subtask '{sid}' failed: {raw_result}")
                    # Retry with different agent
                    fallback = next(
                        (
                            a
                            for a in (assignments.get(sid, "developer"), "developer", "analista")
                            if a != assignments.get(sid)
                        ),
                        "developer",
                    )
                    logger.info(f"Retrying '{sid}' with agent '{fallback}'")
                    try:
                        result = await self._execute_one(
                            sid,
                            st,
                            fallback,
                            project_root,
                            workspace,
                            allowed_tools,
                            events,
                            session,
                        )
                    except Exception as e2:
                        result = {
                            "status": "failed",
                            "result": f"Failed after retry: {e2}",
                            "agent": fallback,
                            "files_written": [],
                            "error": str(e2),
                        }
                else:
                    result = raw_result

                results[sid] = result
                completed.add(sid)
                remaining.pop(sid, None)

        return results

    async def _execute_one(
        self,
        sid: str,
        st: dict,
        agent: str,
        project_root: str | None,
        workspace: str,
        allowed_tools: list[str] | None,
        events,
        session,
    ) -> dict:
        """Execute a single subtask with one agent, injecting blackboard context."""
        desc = st.get("description", st.get("id", ""))

        # Build blackboard context for this agent
        blackboard_ctx = await self.blackboard.get_agent_context()
        if blackboard_ctx:
            blackboard_ctx = (
                "⚠️  SHARED CONTEXT — Resultados de otros agentes en este workflow:\n"
                + blackboard_ctx
                + "\n\nUsa esta información para evitar trabajo duplicado. "
                "Si un archivo ya fue creado por otro agente, NO lo recrees."
            )

        # Filter tools against agent profile
        agent_filtered_tools = allowed_tools
        from agents.registry import agents_registry as _reg

        agent_profile = _reg.get_profile(agent)
        if agent_profile and agent_profile.get("tools"):
            profile_tools = agent_profile.get("tools", [])
            from tools.specs import expand_allowed_tools

            expanded_profile = expand_allowed_tools(profile_tools) or []
            if allowed_tools is not None:
                # Intersect using prefix/component matching (supports MCP tool names)
                agent_filtered_tools = [
                    t for t in expanded_profile if tool_matches_allowlist(t, allowed_tools)
                ]
            else:
                agent_filtered_tools = expanded_profile

        try:
            result = await asyncio.wait_for(
                execute_agent_loop(
                    task=desc,
                    agent_type=agent,
                    allowed_tools=agent_filtered_tools,
                    project_root=project_root,
                    workspace=workspace,
                    extra_context=blackboard_ctx,
                    session=session,
                    config=AgentLoopConfig(max_agent_iterations=settings.max_agent_iterations),
                ),
                timeout=180,
            )
        except TimeoutError:
            return {
                "status": "failed",
                "result": "Subtask timed out after 180s",
                "agent": agent,
                "files_written": [],
                "error": "Timeout",
            }
        except Exception as e:
            return {
                "status": "failed",
                "result": str(e),
                "agent": agent,
                "files_written": [],
                "error": str(e),
            }

        # Emit agent response to UI for real-time visibility
        result_text = result.get("result", "") if isinstance(result, dict) else str(result)
        if session and hasattr(session, "events") and session.events:
            await emit_agent(session.events, agent, desc[:50], str(result_text)[:500])

        # Write result to blackboard for other agents
        result_text = result.get("result", "") if isinstance(result, dict) else str(result)
        await self.blackboard.write(
            f"subtask_{sid}_result",
            {
                "agent": agent,
                "task": desc[:200],
                "result": str(result_text)[:500],
                "status": (
                    result.get("status", "completed") if isinstance(result, dict) else "completed"
                ),
                "files_written": (
                    result.get("files_written", []) if isinstance(result, dict) else []
                ),
            },
        )

        return {
            "status": result.get("status", "done") if isinstance(result, dict) else "done",
            "result": result_text,
            "agent": agent,
            "files_written": result.get("files_written", []) if isinstance(result, dict) else [],
            "error": None,
        }

    # ── PHASE 4: AGGREGATE WITH CONFIDENCE ──
    async def aggregate_with_confidence(
        self,
        query: str,
        results: dict[str, dict],
    ) -> str:
        """Synthesize subtask results into a final response with confidence evaluation.

        Asks the LLM to score each result's quality, then synthesize the best
        information into one coherent answer.
        """
        if not results:
            return "No results produced."

        # Build results summary for the LLM
        parts = []
        for sid, r in results.items():
            agent = r.get("agent", "?")
            status = r.get("status", "?")
            result_text = str(r.get("result", ""))[:600]
            parts.append(f"Agent: {agent} | Task: {sid} | Status: {status}\n{result_text}")

        results_text = "\n\n---\n\n".join(parts)

        prompt = (
            "You are a result synthesizer. Review the following agent outputs "
            "and produce ONE final response to the user.\n\n"
            f"Original request: {query}\n\n"
            f"Agent results:\n{results_text}\n\n"
            "Instructions:\n"
            "- If results conflict, pick the most reliable one and explain why.\n"
            "- Combine complementary information into a coherent answer.\n"
            "- If all results are poor, state what's missing honestly.\n"
            "- Keep the response concise but complete.\n"
        )

        try:
            response = await models.call(
                messages=[{"role": "user", "content": prompt}],
                role="fast",
                temperature=0.3,
            )
            return clean_llm_response(response)
        except Exception as e:
            logger.warning(f"Aggregation failed: {e}")
            # Fallback: simple concatenation
            return "\n\n".join(
                f"[{r.get('agent', '?')}] {r.get('result', '')}" for r in results.values()
            )
Functions
decompose_task_dag async
decompose_task_dag(query: str) -> dict[str, Any]

Ask the LLM to decompose a task into a structured DAG.

Returns: {"subtasks": [...], "raw_response": "..."}

Each subtask: {"id": str, "description": str, "depends_on": [str], "agent_hint": str}

Source code in orchestration/workflows/coordinated.py
async def decompose_task_dag(self, query: str) -> dict[str, Any]:
    """Ask the LLM to decompose a task into a structured DAG.

    Returns: {"subtasks": [...], "raw_response": "..."}
    Each subtask: {"id": str, "description": str, "depends_on": [str], "agent_hint": str}

    At most 6 subtasks are kept. Duplicate ids are skipped. ``depends_on`` is
    normalized to a list of ids of other kept subtasks: ``null``, a bare id,
    self-references and unknown ids are fixed or dropped, so the DAG executor
    neither crashes nor stalls. Any failure falls back to a single
    ``main_task`` subtask.
    """
    prompt = DAG_DECOMPOSE_PROMPT.format(query=query)

    def as_id(value: Any) -> str:
        # Subtask ids are strings; integer ids from the LLM are coerced, anything else rejected.
        if isinstance(value, bool):
            return ""
        if isinstance(value, int):
            return str(value)
        return value if isinstance(value, str) else ""

    try:
        response = await models.call(
            messages=[{"role": "user", "content": prompt}],
            role="reasoning",
            temperature=0.1,
        )
        raw = clean_llm_response(response)
        content = raw.choices[0].message.content if hasattr(raw, "choices") else str(raw)

        # Extract JSON from response
        data = self._parse_dag_json(content)
        if data and isinstance(data.get("subtasks"), list):
            # Validate and clean subtasks
            valid: list[dict] = []
            seen: set[str] = set()
            for st in data["subtasks"]:
                if len(valid) >= 6:
                    break
                if not isinstance(st, dict):
                    continue
                sid = as_id(st.get("id"))
                desc = st.get("description")
                if not sid or sid in seen or not isinstance(desc, str) or len(desc) <= 5:
                    continue
                seen.add(sid)
                st["id"] = sid
                st.setdefault("agent_hint", "developer")
                valid.append(st)

            # Dependencies can only be checked once every kept id is known.
            for st in valid:
                deps = st.get("depends_on")
                if deps is None:
                    deps = []
                elif not isinstance(deps, (list, tuple)):
                    deps = [deps]
                cleaned: list[str] = []
                for dep in deps:
                    dep_id = as_id(dep)
                    if dep_id in seen and dep_id != st["id"] and dep_id not in cleaned:
                        cleaned.append(dep_id)
                st["depends_on"] = cleaned

            if valid:
                return {"subtasks": valid, "raw_response": content}

    except Exception as e:
        logger.warning(f"DAG decomposition failed: {e}")

    # Fallback: single linear subtask
    return {
        "subtasks": [
            {
                "id": "main_task",
                "description": query,
                "depends_on": [],
                "agent_hint": "developer",
            }
        ],
        "raw_response": "",
    }
assign_agents async
assign_agents(
    subtasks: list[dict],
    allowed_agents: list[str] | None = None,
    force_agent: str | None = None,
) -> dict[str, str]

Assign the best agent for each subtask.

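Uses AgentRouter first. If the router fails or picks an agent that is not allowed, it falls back to the subtask's agent_hint, then to the first allowed agent, then to "developer". A force_agent, when given, overrides all of this.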

Returns: {subtask_id: agent_name}

Source code in orchestration/workflows/coordinated.py
async def assign_agents(
    self,
    subtasks: list[dict],
    allowed_agents: list[str] | None = None,
    force_agent: str | None = None,
) -> dict[str, str]:
    """Assign the best agent for each subtask.

    Uses AgentRouter first, then LLM quality review for confidence.
    Falls back to keyword matching if LLM is unavailable.

    Returns: {subtask_id: agent_name}
    """
    assignments: dict[str, str] = {}

    for st in subtasks:
        sid = st["id"]
        if force_agent:
            assignments[sid] = force_agent
            continue

        hint = st.get("agent_hint", "")
        desc = st.get("description", "")

        # Try AgentRouter first
        try:
            picked = await AgentRouter.select_best_agent(desc, "coordinated", allowed_agents)
            if picked and (allowed_agents is None or picked in allowed_agents):
                assignments[sid] = picked
                continue
        except Exception:
            pass

        # Fallback: agent_hint or first allowed
        if hint and (allowed_agents is None or hint in allowed_agents):
            assignments[sid] = hint
        elif allowed_agents:
            assignments[sid] = allowed_agents[0]
        else:
            assignments[sid] = "developer"

    return assignments
execute_dag async
execute_dag(
    subtasks: list[dict],
    assignments: dict[str, str],
    project_root: str | None = None,
    workspace: str = "main",
    allowed_tools: list[str] | None = None,
    events=None,
    session=None,
) -> dict[str, dict]

Execute subtasks respecting DAG dependencies with parallel branches.

Topological level execution:

- Level 0: subtasks with no dependencies → all run in parallel
- Level 1: subtasks whose dependencies are all in level 0 → parallel
- etc.

Returns: {subtask_id: {status, result, agent, files_written, error}}

Source code in orchestration/workflows/coordinated.py
async def execute_dag(
    self,
    subtasks: list[dict],
    assignments: dict[str, str],
    project_root: str | None = None,
    workspace: str = "main",
    allowed_tools: list[str] | None = None,
    events=None,
    session=None,
) -> dict[str, dict]:
    """Execute subtasks respecting DAG dependencies with parallel branches.

    Topological level execution:
    - Level 0: subtasks with no dependencies → all run in parallel
    - Level 1: subtasks whose dependencies are all in level 0 → parallel
    - etc.

    Returns: {subtask_id: {status, result, agent, files_written, error}}
    """
    results: dict[str, dict] = {}
    completed: set[str] = set()
    remaining = {st["id"]: st for st in subtasks}

    while remaining:
        # Find subtasks whose dependencies are all completed
        ready = []
        for sid, st in list(remaining.items()):
            deps = st.get("depends_on", [])
            if all(d in completed for d in deps):
                ready.append((sid, st))

        if not ready:
            # Circular dependency or all stuck — execute remaining sequentially
            logger.warning("DAG stuck — executing remaining subtasks sequentially")
            for sid, st in remaining.items():
                result = await self._execute_one(
                    sid,
                    st,
                    assignments.get(sid, "developer"),
                    project_root,
                    workspace,
                    allowed_tools,
                    events,
                    session,
                )
                results[sid] = result
                completed.add(sid)
            break

        logger.info(
            f"DAG level: {len(ready)} subtask(s) in parallel: " f"{[s[0] for s in ready]}"
        )

        # Execute ready subtasks in parallel
        tasks = [
            self._execute_one(
                sid,
                st,
                assignments.get(sid, "developer"),
                project_root,
                workspace,
                allowed_tools,
                events,
                session,
            )
            for sid, st in ready
        ]

        parallel_results = await asyncio.gather(*tasks, return_exceptions=True)

        for (sid, st), raw_result in zip(ready, parallel_results, strict=True):
            if isinstance(raw_result, BaseException):
                logger.error(f"Subtask '{sid}' failed: {raw_result}")
                # Retry with different agent
                fallback = next(
                    (
                        a
                        for a in (assignments.get(sid, "developer"), "developer", "analista")
                        if a != assignments.get(sid)
                    ),
                    "developer",
                )
                logger.info(f"Retrying '{sid}' with agent '{fallback}'")
                try:
                    result = await self._execute_one(
                        sid,
                        st,
                        fallback,
                        project_root,
                        workspace,
                        allowed_tools,
                        events,
                        session,
                    )
                except Exception as e2:
                    result = {
                        "status": "failed",
                        "result": f"Failed after retry: {e2}",
                        "agent": fallback,
                        "files_written": [],
                        "error": str(e2),
                    }
            else:
                result = raw_result

            results[sid] = result
            completed.add(sid)
            remaining.pop(sid, None)

    return results
aggregate_with_confidence async
aggregate_with_confidence(
    query: str, results: dict[str, dict]
) -> str

Synthesize subtask results into a final response with confidence evaluation.

Sends each subtask's (truncated) result to the LLM and asks it to reconcile conflicts and combine the information into one coherent answer. If the LLM call fails, it falls back to plain concatenation.

Source code in orchestration/workflows/coordinated.py
async def aggregate_with_confidence(
    self,
    query: str,
    results: dict[str, dict],
) -> str:
    """Synthesize subtask results into one final response.

    Sends every subtask's agent, status and result (truncated to 600
    characters) to the ``fast`` LLM role. The prompt asks the LLM to reconcile
    conflicts and combine complementary information.

    If the LLM call fails, returns the ``[agent] result`` lines joined by blank
    lines. Returns "No results produced." when *results* is empty.
    """
    if not results:
        return "No results produced."

    # Build results summary for the LLM
    results_text = "\n\n---\n\n".join(
        f"Agent: {r.get('agent', '?')} | Task: {sid} | Status: {r.get('status', '?')}\n"
        f"{str(r.get('result', ''))[:600]}"
        for sid, r in results.items()
    )

    prompt = (
        "You are a result synthesizer. Review the following agent outputs "
        "and produce ONE final response to the user.\n\n"
        f"Original request: {query}\n\n"
        f"Agent results:\n{results_text}\n\n"
        "Instructions:\n"
        "- If results conflict, pick the most reliable one and explain why.\n"
        "- Combine complementary information into a coherent answer.\n"
        "- If all results are poor, state what's missing honestly.\n"
        "- Keep the response concise but complete.\n"
    )

    try:
        response = await models.call(
            messages=[{"role": "user", "content": prompt}],
            role="fast",
            temperature=0.3,
        )
        return clean_llm_response(response)
    except Exception as e:
        logger.warning(f"Aggregation failed: {e}")
        # Fallback: simple concatenation
        return "\n\n".join(
            f"[{r.get('agent', '?')}] {r.get('result', '')}" for r in results.values()
        )

Functions

orchestration.workflows.tdd

TDD Loop — ciclo de Test-Driven Development automatizado.

Complete TDD loop
  • Ejecuta tests existentes
  • Si fallan, invoca al agente para corregir el código
  • Repite hasta que todos los tests pasen o se alcance el límite

Functions

execute_tdd_loop async

execute_tdd_loop(
    task: str,
    workspace: str = "main",
    project_root: str | None = None,
    allowed_tools: list | None = None,
    agent_type: str | None = None,
    conversation_history: list | None = None,
    max_iterations: int = MAX_TDD_ITERATIONS,
) -> dict

Ciclo TDD automatizado completo.

Flujo real:

1. Ejecutar tests existentes → ver fallos
2. Si hay fallos, invocar al agente con el contexto de fallos
3. El agente corrige el código vía file_manager / diff_editor
4. Volver a ejecutar tests
5. Repetir hasta que todos pasen o se alcance max_iterations

Parameters:

Name Type Description Default
task str

Descripción de la tarea a implementar.

required
workspace str

Workspace activo.

'main'
project_root str | None

Directorio del proyecto.

None
allowed_tools list | None

Herramientas permitidas para el agente.

None
agent_type str | None

Tipo de agente a usar para las correcciones.

None
conversation_history list | None

Historial de conversación previo.

None
max_iterations int

Máximo de iteraciones del ciclo.

MAX_TDD_ITERATIONS

Returns:

Type Description
dict

{"status": "completed"|"failed", "result": str, "iterations": int}

Source code in orchestration/workflows/tdd.py
async def _run_tdd_tests(workspace: str, project_root: str | None) -> tuple[str, bool, int, int]:
    """Run the project's tests via the ``test_runner`` tool and summarize the outcome.

    Returns ``(output_text, passed, failed_count, error_count)``. ``passed`` is
    True only when the tool reports success with zero failures and zero errors.
    Missing fields count as an unsuccessful run with zero failures and errors.
    """
    test_result = await safe_tool_call(
        tool_name="test_runner",
        parameters={
            "file_path": ".",
            "workspace": workspace,
            "project_root": project_root,
        },
        role="agent",
    )

    envelope = test_result if isinstance(test_result, dict) else {}
    inner = envelope.get("output") if isinstance(envelope.get("output"), dict) else {}
    output_text = str(inner.get("output", str(test_result)))
    failed_count = inner.get("failed_count", 0)
    error_count = inner.get("error_count", 0)
    passed = bool(inner.get("success", False)) and failed_count == 0 and error_count == 0
    return output_text, passed, failed_count, error_count


async def execute_tdd_loop(
    task: str,
    workspace: str = "main",
    project_root: str | None = None,
    allowed_tools: list | None = None,
    agent_type: str | None = None,
    conversation_history: list | None = None,
    max_iterations: int = MAX_TDD_ITERATIONS,
) -> dict:
    """Run an automated Test-Driven Development cycle.

    Flow:
    1. Run the existing tests and collect failures.
    2. If anything fails, invoke the agent with the failure context. If the
       project has no tests yet, the agent is asked to write tests first and
       then the implementation.
    3. The agent edits code via file_manager / diff_editor.
    4. Re-run the tests.
    5. Repeat until all tests pass or ``max_iterations`` agent runs are used up.
       The last agent fix is verified with one final test run before reporting
       failure.

    Args:
        task: Description of the task to implement.
        workspace: Active workspace.
        project_root: Project directory.
        allowed_tools: Tools the agent may use (defaults to the TDD tool set).
        agent_type: Agent type used for the fixes.
        conversation_history: Previous conversation history.
        max_iterations: Maximum number of agent runs.

    Returns:
        {"status": "completed"|"failed", "result": str, "iterations": int,
         "files_modified": list[str]}
    """
    from orchestration.loop import execute_agent_loop

    tdd_tools = allowed_tools or ["file_manager", "diff_editor", "test_runner", "git_manager"]

    iterations = 0
    test_output = ""
    files_modified: list[str] = []

    while iterations < max_iterations:
        iterations += 1
        logger.info("🔄 TDD Loop iteración %d/%d", iterations, max_iterations)

        # 1. Run the tests
        test_output, passed, failed_count, error_count = await _run_tdd_tests(
            workspace, project_root
        )

        if passed:
            logger.info("✅ TDD Loop: todos los tests pasan en iteración %d", iterations)
            return {
                "status": "completed",
                "result": f"✅ Todos los tests pasan en iteración {iterations}.\n\n{test_output[:2000]}",
                "iterations": iterations,
                "files_modified": files_modified,
            }

        # 2. Invoke the agent: write tests (green-field) or fix failures
        no_tests = _project_has_no_tests(workspace, project_root, files_modified)
        if no_tests:
            impl_context = (
                f"Tarea: {task}\n\n"
                "Aún NO existen tests en el proyecto. Implementa con TDD:\n"
                "1. PRIMERO escribe los tests con pytest usando file_manager (action=write); "
                "nómbralos 'test_*.py' (p.ej. 'test_es_primo.py').\n"
                "2. LUEGO escribe la implementación para que los tests pasen.\n"
                "Usa SIEMPRE rutas RELATIVAS al proyecto (ej: 'es_primo.py'); "
                "NUNCA rutas absolutas ni '/'."
            )
        else:
            impl_context = (
                f"Tarea original: {task}\n\n"
                f"Resultados de tests (iteración {iterations}):\n{test_output[:3000]}\n\n"
                f"Fallos: {failed_count}, Errores: {error_count}.\n\n"
                "Analiza los fallos y CORRIGE el código para que los tests pasen. "
                "Usa file_manager (action=read) para leer archivos existentes, "
                "y file_manager (action=write) o diff_editor (action=apply) para modificarlos. "
                "Después de cada cambio, explica qué corregiste y por qué."
            )

        logger.info(
            "TDD Loop: %s (fallos=%d, errores=%d)",
            "sin tests aún → escribiendo tests + implementación" if no_tests else "corrigiendo",
            failed_count,
            error_count,
        )

        try:
            agent_result = await asyncio.wait_for(
                execute_agent_loop(
                    task=impl_context,
                    agent_type=agent_type,
                    history=conversation_history,
                    allowed_tools=tdd_tools,
                    project_root=project_root,
                    workspace=workspace,
                    extra_context=f"Modo TDD. Iteración {iterations}/{max_iterations}.",
                ),
                timeout=300,
            )
        # asyncio.TimeoutError is only an alias of TimeoutError from Python 3.11 on.
        except (TimeoutError, asyncio.TimeoutError):
            logger.warning("TDD Loop: iteración %d agotó timeout (300s)", iterations)
            return {
                "status": "failed",
                "result": f"TDD iteration {iterations} timed out after 300s.",
                "iterations": iterations,
                "files_modified": files_modified,
            }

        if not isinstance(agent_result, dict):
            agent_result = {"result": str(agent_result)}

        # `or []` tolerates an explicit None for files_written.
        for f in agent_result.get("files_written") or []:
            if f not in files_modified:
                files_modified.append(f)

        if agent_result.get("status") == "stalled":
            logger.warning("TDD Loop: agente estancado en iteración %d", iterations)
            return {
                "status": "failed",
                "result": (
                    f"⚠️ Agente estancado en iteración {iterations}.\n\n"
                    f"Último resultado de tests:\n{test_output[:2000]}\n\n"
                    f"{agent_result.get('result', '')}"
                ),
                "iterations": iterations,
                "files_modified": files_modified,
            }

    # Every loop iteration that does not return ends with an agent fix that has
    # not been tested yet; verify the last one before reporting failure.
    if iterations > 0:
        test_output, passed, _, _ = await _run_tdd_tests(workspace, project_root)
        if passed:
            logger.info("✅ TDD Loop: todos los tests pasan tras la iteración %d", iterations)
            return {
                "status": "completed",
                "result": f"✅ Todos los tests pasan tras la iteración {iterations}.\n\n{test_output[:2000]}",
                "iterations": iterations,
                "files_modified": files_modified,
            }

    return {
        "status": "failed",
        "result": (
            f"⚠️ Límite de {max_iterations} iteraciones alcanzado.\n\n"
            f"Últimos resultados:\n{test_output[:2000]}"
        ),
        "iterations": iterations,
        "files_modified": files_modified,
    }

orchestration.workflows.blackboard

Shared Blackboard — multi-phase inter-agent communication channel.

Provides an async-safe, phase-scoped key-value store that agents in coordinated and development workflows can read from and write to. Supports cross-phase context sharing, snapshot/restore for pause/resume, and PostgreSQL persistence.

Usage

    blackboard = SharedBlackboard()
    await blackboard.write("schema", data, phase="design")
    ctx = await blackboard.get_cross_phase_context(exclude_phase="implement")
    snapshot = blackboard.snapshot()
    blackboard.restore(snapshot)
    await blackboard.sync_to_db("session_123")

Classes

SharedBlackboard

Async-safe multi-phase shared workspace for agent coordination.

Source code in orchestration/workflows/blackboard.py
class SharedBlackboard:
    """Async-safe multi-phase shared workspace for agent coordination."""

    def __init__(self):
        self._phases: dict[str, dict[str, Any]] = {}
        self._lock = asyncio.Lock()
        self._write_count = 0

    # ── Core write/read ────────────────────────────────────────────

    async def write(self, key: str, value: Any, phase: str = "default") -> None:
        """Write a value to a phase-scoped namespace."""
        async with self._lock:
            if phase not in self._phases:
                self._phases[phase] = {}
            self._phases[phase][key] = value
            self._write_count += 1
            logger.debug(f"Blackboard write: [{phase}] '{key}' (#{self._write_count})")

    async def read(self, key: str, phase: str | None = None) -> Any:
        """Read a value. If phase is None, searches all phases."""
        async with self._lock:
            if phase:
                return self._phases.get(phase, {}).get(key)
            for pdata in self._phases.values():
                if key in pdata:
                    return pdata[key]
            return None

    async def read_phase(self, phase: str) -> dict[str, Any]:
        """Return all entries from a specific phase."""
        async with self._lock:
            return dict(self._phases.get(phase, {}))

    async def list_phases(self) -> list[str]:
        """Return all phase names in insertion order."""
        async with self._lock:
            return list(self._phases.keys())

    async def list_keys(self, phase: str | None = None) -> list[str]:
        """Return keys. If phase is None, returns keys from all phases."""
        async with self._lock:
            if phase:
                return list(self._phases.get(phase, {}).keys())
            all_keys: list[str] = []
            for pdata in self._phases.values():
                all_keys.extend(pdata.keys())
            return all_keys

    async def delete(self, key: str, phase: str | None = None) -> bool:
        """Remove a key. If phase is None, removes from all phases."""
        async with self._lock:
            if phase:
                data = self._phases.get(phase, {})
                if key in data:
                    del data[key]
                    return True
                return False
            found = False
            for pdata in self._phases.values():
                if key in pdata:
                    del pdata[key]
                    found = True
            return found

    async def clear_phase(self, phase: str) -> None:
        """Clear all entries from a specific phase."""
        async with self._lock:
            self._phases.pop(phase, None)

    async def clear(self) -> None:
        """Clear all phases and entries."""
        async with self._lock:
            self._phases.clear()
            self._write_count = 0

    # ── Context generators ─────────────────────────────────────────

    async def get_cross_phase_context(self, exclude_phase: str) -> str:
        """Build context text from all phases EXCEPT the given one.

        Used to inject "what other agents already did" into the current phase.
        """
        async with self._lock:
            lines: list[str] = []
            for pname, pdata in self._phases.items():
                if pname == exclude_phase or not pdata:
                    continue
                lines.append(f"\n[Shared Context — Phase: {pname}]")
                for key, value in list(pdata.items())[:8]:
                    value_str = str(value)[:300]
                    lines.append(f"  - {key}: {value_str}")
            return "\n".join(lines)

    async def get_phase_context(self, phase: str, max_keys: int = 10) -> str:
        """Build context text from a specific phase."""
        async with self._lock:
            pdata = self._phases.get(phase, {})
            if not pdata:
                return ""
            lines = [f"\n[Phase Context — {phase}]"]
            for key, value in list(pdata.items())[:max_keys]:
                value_str = str(value)[:300]
                lines.append(f"  - {key}: {value_str}")
            return "\n".join(lines)

    def get_context_snapshot(self, max_keys: int = 10) -> str:
        """Best-effort non-locked snapshot (backward compat)."""
        return _build_snapshot_text(self._phases, max_keys)

    async def get_agent_context(self, relevant_keys: list[str] | None = None) -> str:
        """Async-locked context snapshot (backward compat)."""
        async with self._lock:
            all_entries: list[tuple[str, Any]] = []
            for pdata in self._phases.values():
                for key, value in pdata.items():
                    if relevant_keys is None or key in relevant_keys:
                        all_entries.append((key, value))

            if not all_entries:
                return ""

            lines = ["[Shared Context from other agents]"]
            for key, value in all_entries[:10]:
                lines.append(f"  - {key}: {str(value)[:200]}")
            return "\n".join(lines)

    # ── Snapshot / Restore ──────────────────────────────────────────

    def snapshot(self) -> dict:
        """Serialize the entire blackboard to a JSON-safe dict."""
        snapshot_data: dict[str, dict[str, Any]] = {}
        for phase, pdata in self._phases.items():
            snapshot_data[phase] = dict(pdata)
        return {"phases": snapshot_data, "write_count": self._write_count}

    def restore(self, data: dict) -> None:
        """Restore blackboard state from a snapshot dict."""
        if not data:
            return
        self._phases = data.get("phases", {})
        self._write_count = data.get("write_count", 0)
        logger.info(
            f"Blackboard restored: {self._write_count} writes across " f"{len(self._phases)} phases"
        )

    # ── Database persistence ───────────────────────────────────────

    async def sync_to_db(self, session_id: str) -> None:
        """Persist all blackboard entries to PostgreSQL for the given session."""
        try:
            from core.database import get_async_session
            from core.models import BlackboardEntry

            async with get_async_session() as db_session:
                from sqlalchemy import delete as sa_delete

                samples = BlackboardEntry  # avoid re-import
                await db_session.execute(
                    sa_delete(samples).where(samples.session_id == session_id)  # type: ignore[arg-type]
                )
                for phase, pdata in self._phases.items():
                    for key, value in pdata.items():
                        entry = BlackboardEntry(
                            session_id=session_id,
                            phase=phase,
                            key=key,
                            value=json.dumps(value, ensure_ascii=False, default=str),
                            created_at=datetime.now(UTC).replace(tzinfo=None),
                        )
                        db_session.add(entry)
                await db_session.commit()
            logger.debug(f"Blackboard synced to DB: {session_id} ({self._write_count} writes)")
        except Exception as e:
            logger.warning(f"Blackboard sync_to_db failed (non-fatal): {e}")

    async def sync_from_db(self, session_id: str) -> bool:
        """Load blackboard entries from PostgreSQL. Returns True if entries found."""
        try:
            from sqlalchemy import select as sa_select

            from core.database import get_async_session
            from core.models import BlackboardEntry

            async with get_async_session() as db_session:
                result = await db_session.execute(
                    sa_select(BlackboardEntry).where(
                        BlackboardEntry.session_id == session_id  # type: ignore[arg-type]
                    )
                )
                entries = result.scalars().all()
                if not entries:
                    return False

                for entry in entries:
                    phase_data = self._phases.setdefault(entry.phase, {})
                    try:
                        phase_data[entry.key] = json.loads(entry.value)
                    except (json.JSONDecodeError, TypeError):
                        phase_data[entry.key] = entry.value
                    self._write_count += 1

            logger.info(
                f"Blackboard loaded from DB: {session_id} "
                f"({len(entries)} entries across {len(self._phases)} phases)"
            )
            return True
        except Exception as e:
            logger.warning(f"Blackboard sync_from_db failed (non-fatal): {e}")
            return False

    # ── Properties ─────────────────────────────────────────────────

    @property
    def entry_count(self) -> int:
        return self._write_count

    @property
    def phase_count(self) -> int:
        return len(self._phases)
Functions
write async
write(key: str, value: Any, phase: str = 'default') -> None

Write a value to a phase-scoped namespace.

Source code in orchestration/workflows/blackboard.py
async def write(self, key: str, value: Any, phase: str = "default") -> None:
    """Store ``value`` under ``key`` inside the ``phase`` namespace.

    The namespace is created on first use and an existing key is overwritten.
    Every call bumps the write counter, overwrites included.
    """
    async with self._lock:
        bucket = self._phases.setdefault(phase, {})
        bucket[key] = value
        self._write_count = self._write_count + 1
        logger.debug(f"Blackboard write: [{phase}] '{key}' (#{self._write_count})")
read async
read(key: str, phase: str | None = None) -> Any

Read a value. If phase is None, searches all phases.

Source code in orchestration/workflows/blackboard.py
async def read(self, key: str, phase: str | None = None) -> Any:
    """Read a value. If phase is None, searches all phases."""
    async with self._lock:
        if phase:
            return self._phases.get(phase, {}).get(key)
        for pdata in self._phases.values():
            if key in pdata:
                return pdata[key]
        return None
read_phase async
read_phase(phase: str) -> dict[str, Any]

Return all entries from a specific phase.

Source code in orchestration/workflows/blackboard.py
async def read_phase(self, phase: str) -> dict[str, Any]:
    """Return a shallow copy of every entry stored under ``phase``.

    Unknown phases yield an empty dict.
    """
    async with self._lock:
        source = self._phases[phase] if phase in self._phases else {}
        return dict(source)
list_phases async
list_phases() -> list[str]

Return all phase names in insertion order.

Source code in orchestration/workflows/blackboard.py
async def list_phases(self) -> list[str]:
    """List every phase name, in the order the phases were first created."""
    async with self._lock:
        return [name for name in self._phases]
list_keys async
list_keys(phase: str | None = None) -> list[str]

Return keys. If phase is None, returns keys from all phases.

Source code in orchestration/workflows/blackboard.py
async def list_keys(self, phase: str | None = None) -> list[str]:
    """Return keys. If phase is None, returns keys from all phases."""
    async with self._lock:
        if phase:
            return list(self._phases.get(phase, {}).keys())
        all_keys: list[str] = []
        for pdata in self._phases.values():
            all_keys.extend(pdata.keys())
        return all_keys
delete async
delete(key: str, phase: str | None = None) -> bool

Remove a key. If phase is None, removes from all phases.

Source code in orchestration/workflows/blackboard.py
async def delete(self, key: str, phase: str | None = None) -> bool:
    """Remove a key. If phase is None, removes from all phases."""
    async with self._lock:
        if phase:
            data = self._phases.get(phase, {})
            if key in data:
                del data[key]
                return True
            return False
        found = False
        for pdata in self._phases.values():
            if key in pdata:
                del pdata[key]
                found = True
        return found
clear_phase async
clear_phase(phase: str) -> None

Clear all entries from a specific phase.

Source code in orchestration/workflows/blackboard.py
async def clear_phase(self, phase: str) -> None:
    """Drop the ``phase`` namespace with everything in it.

    Unknown phases are ignored. The write counter is left unchanged.
    """
    async with self._lock:
        if phase in self._phases:
            del self._phases[phase]
clear async
clear() -> None

Clear all phases and entries.

Source code in orchestration/workflows/blackboard.py
async def clear(self) -> None:
    """Empty the blackboard in place and reset the write counter to zero."""
    async with self._lock:
        self._write_count = 0
        self._phases.clear()
get_cross_phase_context async
get_cross_phase_context(exclude_phase: str) -> str

Build context text from all phases EXCEPT the given one.

Used to inject "what other agents already did" into the current phase.

Source code in orchestration/workflows/blackboard.py
async def get_cross_phase_context(self, exclude_phase: str, max_keys: int = 8) -> str:
    """Build context text from all non-empty phases EXCEPT ``exclude_phase``.

    Used to inject "what other agents already did" into the current phase.

    Args:
        exclude_phase: Phase to skip (typically the caller's own phase).
        max_keys: Maximum entries rendered per phase. The default of 8 keeps
            the previous hard-coded limit; ``get_phase_context`` exposes the
            same knob.

    Returns:
        A header line per phase followed by ``  - key: value`` lines. Values
        are stringified and truncated to 300 characters. Returns ``""`` when
        nothing qualifies.
    """
    async with self._lock:
        lines: list[str] = []
        for pname, pdata in self._phases.items():
            if pname == exclude_phase or not pdata:
                continue
            lines.append(f"\n[Shared Context — Phase: {pname}]")
            for key, value in list(pdata.items())[:max_keys]:
                value_str = str(value)[:300]
                lines.append(f"  - {key}: {value_str}")
        return "\n".join(lines)
get_phase_context async
get_phase_context(phase: str, max_keys: int = 10) -> str

Build context text from a specific phase.

Source code in orchestration/workflows/blackboard.py
async def get_phase_context(self, phase: str, max_keys: int = 10) -> str:
    """Render up to ``max_keys`` entries of a single phase as context text.

    Each value is stringified and cut to 300 characters. A missing or empty
    phase yields ``""``.
    """
    async with self._lock:
        pdata = self._phases.get(phase, {})
        if not pdata:
            return ""
        header = f"\n[Phase Context — {phase}]"
        body = [
            f"  - {key}: {str(value)[:300]}"
            for key, value in list(pdata.items())[:max_keys]
        ]
        return "\n".join([header, *body])
get_context_snapshot
get_context_snapshot(max_keys: int = 10) -> str

Best-effort non-locked snapshot (backward compat).

Source code in orchestration/workflows/blackboard.py
def get_context_snapshot(self, max_keys: int = 10) -> str:
    """Render context text WITHOUT acquiring the lock (kept for backward compat).

    This is synchronous so non-async callers can use it. The text itself
    comes from the module helper ``_build_snapshot_text``.
    """
    phases = self._phases
    return _build_snapshot_text(phases, max_keys)
get_agent_context async
get_agent_context(
    relevant_keys: list[str] | None = None,
) -> str

Async-locked context snapshot (backward compat).

Source code in orchestration/workflows/blackboard.py
async def get_agent_context(self, relevant_keys: list[str] | None = None) -> str:
    """Async-locked context snapshot (backward compat)."""
    async with self._lock:
        all_entries: list[tuple[str, Any]] = []
        for pdata in self._phases.values():
            for key, value in pdata.items():
                if relevant_keys is None or key in relevant_keys:
                    all_entries.append((key, value))

        if not all_entries:
            return ""

        lines = ["[Shared Context from other agents]"]
        for key, value in all_entries[:10]:
            lines.append(f"  - {key}: {str(value)[:200]}")
        return "\n".join(lines)
snapshot
snapshot() -> dict

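Return the blackboard state as a dict with shallow-copied phase dicts. Stored values are not converted, so the result is JSON-safe only if the stored values are.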

Source code in orchestration/workflows/blackboard.py
def snapshot(self) -> dict:
    """Capture the blackboard state as ``{"phases": ..., "write_count": ...}``.

    Each phase dict is shallow-copied. Stored values are passed through
    unchanged, so the result is JSON-serializable only if those values are.
    """
    phases_copy = {phase: dict(pdata) for phase, pdata in self._phases.items()}
    return {"phases": phases_copy, "write_count": self._write_count}
restore
restore(data: dict) -> None

Restore blackboard state from a snapshot dict.

Source code in orchestration/workflows/blackboard.py
def restore(self, data: dict) -> None:
    """Replace the blackboard state with a snapshot produced by ``snapshot()``.

    Phase dicts are copied rather than adopted. Later writes therefore do not
    mutate the caller's snapshot, which would otherwise be corrupt if restored
    again. An empty or ``None`` snapshot is ignored.
    """
    if not data:
        return
    phases = data.get("phases") or {}
    self._phases = {phase: dict(pdata) for phase, pdata in phases.items()}
    self._write_count = data.get("write_count", 0)
    logger.info(
        "Blackboard restored: %s writes across %s phases",
        self._write_count,
        len(self._phases),
    )
sync_to_db async
sync_to_db(session_id: str) -> None

Persist all blackboard entries to PostgreSQL for the given session.

Source code in orchestration/workflows/blackboard.py
async def sync_to_db(self, session_id: str) -> None:
    """Persist all blackboard entries to PostgreSQL for the given session."""
    try:
        from core.database import get_async_session
        from core.models import BlackboardEntry

        async with get_async_session() as db_session:
            from sqlalchemy import delete as sa_delete

            samples = BlackboardEntry  # avoid re-import
            await db_session.execute(
                sa_delete(samples).where(samples.session_id == session_id)  # type: ignore[arg-type]
            )
            for phase, pdata in self._phases.items():
                for key, value in pdata.items():
                    entry = BlackboardEntry(
                        session_id=session_id,
                        phase=phase,
                        key=key,
                        value=json.dumps(value, ensure_ascii=False, default=str),
                        created_at=datetime.now(UTC).replace(tzinfo=None),
                    )
                    db_session.add(entry)
            await db_session.commit()
        logger.debug(f"Blackboard synced to DB: {session_id} ({self._write_count} writes)")
    except Exception as e:
        logger.warning(f"Blackboard sync_to_db failed (non-fatal): {e}")
sync_from_db async
sync_from_db(session_id: str) -> bool

Load blackboard entries from PostgreSQL, merging them into the current state. Returns True if any entries were found; returns False when none exist or loading fails.

Source code in orchestration/workflows/blackboard.py
async def sync_from_db(self, session_id: str) -> bool:
    """Merge the rows stored for ``session_id`` in PostgreSQL into this blackboard.

    Rows are layered on top of the current state: existing phases are kept
    and keys that already exist are overwritten. Each row bumps the write
    counter. Values are JSON-decoded when possible; otherwise the raw stored
    value is kept.

    Returns:
        True if at least one row was loaded. False if no rows exist or the
        load failed; failures are logged as warnings, never raised.
    """
    try:
        from sqlalchemy import select as sa_select

        from core.database import get_async_session
        from core.models import BlackboardEntry

        async with get_async_session() as db_session:
            outcome = await db_session.execute(
                sa_select(BlackboardEntry).where(
                    BlackboardEntry.session_id == session_id  # type: ignore[arg-type]
                )
            )
            rows = outcome.scalars().all()
            if not rows:
                return False

            for row in rows:
                bucket = self._phases.setdefault(row.phase, {})
                try:
                    bucket[row.key] = json.loads(row.value)
                except (json.JSONDecodeError, TypeError):
                    # Not valid JSON (or not a string): keep the stored value verbatim.
                    bucket[row.key] = row.value
                self._write_count += 1

        logger.info(
            f"Blackboard loaded from DB: {session_id} "
            f"({len(rows)} entries across {len(self._phases)} phases)"
        )
        return True
    except Exception as e:
        logger.warning(f"Blackboard sync_from_db failed (non-fatal): {e}")
        return False