Skip to content

Tools API Reference

tools.specs

Tool Specifications — JSON Schema definitions para function-calling nativo. Reemplaza el antiguo sistema text-based de build_tool_instructions().

Functions

expand_allowed_tools

expand_allowed_tools(
    allowed_tools: list[str] | None,
) -> list[str] | None

Expand tool group names into individual tool names.

If an allowed_tools entry is NOT an exact match for any TOOL_DEFINITIONS key, treat it as a prefix and expand to all matching keys. This bridges the gap between agent profiles (e.g., tools: ["browser"]) and MCP-registered tool names (e.g., "mcp_browser_browser_navigate" sanitized for DeepSeek strict mode).

Example: ["browser", "file_manager"] → ["mcp_browser_browser_navigate", ..., "file_manager"]

Source code in tools/specs.py
def expand_allowed_tools(allowed_tools: list[str] | None) -> list[str] | None:
    """Expand tool group names into individual tool names.

    If an allowed_tools entry is NOT an exact match for any TOOL_DEFINITIONS key,
    treat it as a prefix and expand to all matching keys. This bridges the gap
    between agent profiles (e.g., tools: ["browser"]) and MCP-registered tool names
    (e.g., "mcp_browser_browser_navigate" sanitized for DeepSeek strict mode).

    Example: ["browser", "file_manager"] → ["mcp_browser_browser_navigate", ..., "file_manager"]
    """
    if allowed_tools is None:
        return None

    expanded: list[str] = []
    for entry in allowed_tools:
        if entry in TOOL_DEFINITIONS:
            expanded.append(entry)
            continue
        # Exact key prefix
        matched = [key for key in TOOL_DEFINITIONS if key.startswith(entry)]
        if not matched:
            # Original MCP prefix: "mcp:entry" in key or ".entry" in key
            matched = [
                key for key in TOOL_DEFINITIONS if f"mcp:{entry}" in key or f".{entry}" in key
            ]
        if not matched:
            # Sanitized MCP prefix: "mcp_entry" as prefix
            matched = [key for key in TOOL_DEFINITIONS if key.startswith(f"mcp_{entry}_")]
        if matched:
            expanded.extend(matched)
        else:
            expanded.append(entry)
    return expanded

build_tool_definitions

build_tool_definitions(
    allowed_tools: list[str] | None = None,
) -> list[dict]

Devuelve las definiciones de herramientas en formato OpenAI function-calling. Si allowed_tools es None, devuelve todas. Si es una lista, filtra por nombres.

Source code in tools/specs.py
def build_tool_definitions(allowed_tools: list[str] | None = None) -> list[dict]:
    """Return the tool definitions in OpenAI function-calling format.

    When ``allowed_tools`` is ``None`` every entry of ``TOOL_DEFINITIONS`` is
    returned. Otherwise the list is expanded with ``expand_allowed_tools`` and
    used as a name filter. Strict mode follows ``settings.deepseek_strict_mode``.
    """
    from core.config import settings

    strict_mode = settings.deepseek_strict_mode
    wanted = expand_allowed_tools(allowed_tools)
    return [
        definition.to_openai_spec(strict=strict_mode)
        for tool_name, definition in TOOL_DEFINITIONS.items()
        if wanted is None or tool_name in wanted
    ]

build_tool_instructions

build_tool_instructions(
    allowed_tools: list[str] | None = None,
    project_root: str | None = None,
    plan_mode: bool = True,
) -> str

Instrucciones textuales para el LLM (fallback cuando no hay function-calling nativo). Mantenido por compatibilidad con el modo Ollama y el sistema legacy.

Source code in tools/specs.py
def build_tool_instructions(
    allowed_tools: list[str] | None = None,
    project_root: str | None = None,
    plan_mode: bool = True,
) -> str:
    """Instrucciones textuales para el LLM (fallback cuando no hay function-calling nativo).
    Mantenido por compatibilidad con el modo Ollama y el sistema legacy."""
    if not allowed_tools:
        return "No hay herramientas disponibles para esta tarea."

    lines = [
        "## 🛠️ Herramientas disponibles",
        "",
        "Debes responder con un JSON que contenga las acciones a ejecutar.",
    ]
    if plan_mode:
        lines.append('Formato: {"actions": [{"tool": "...", "action": "...", "params": {...}}]}')
    else:
        lines.append("Formato: objetos JSON concatenados: ")
        lines.append('{"tool": "...", "action": "...", "params": {...}}')

    lines.append("")

    for name in expand_allowed_tools(allowed_tools) or []:
        tool_def = TOOL_DEFINITIONS.get(name)
        if tool_def:
            lines.append(f"### {tool_def.name}")
            lines.append(f"{tool_def.description}")
            lines.append(f"Parámetros: {', '.join(tool_def.parameters.keys())}")
            lines.append("")

    if project_root:
        project_path = paths.code_projects_dir("main", project_root)
        lines.append(f"El directorio del proyecto es: {project_path}")
        lines.append("")

    lines.append("⚠️ Reglas obligatorias:")
    lines.append("- Usa 'file_manager' con action='write' para crear o modificar archivos.")
    lines.append("- Después de escribir archivos, usa 'git_manager' para init/add/commit.")
    lines.append("- Si hay test, escribe el archivo de test con pytest.")
    lines.append("- No uses herramientas que no estén en la lista de disponibles.")

    return "\n".join(lines)

tool_matches_allowlist

tool_matches_allowlist(
    tool_name: str, allowlist: list[str]
) -> bool

Check if a tool name matches the workflow allowlist.

Supports exact match, prefix match (`mcp:browser.*` vs `'browser'`), component match (`'mcp:browser'` in tool_name), and sanitized prefix match (`mcp_browser_*` for DeepSeek strict mode compat).

Source code in tools/specs.py
def tool_matches_allowlist(tool_name: str, allowlist: list[str]) -> bool:
    """Check if a tool name matches the workflow allowlist.

    Supports exact match, prefix match (mcp:browser.* vs 'browser'),
    component match ('mcp:browser' in tool_name), and sanitized
    prefix match (mcp_browser_* for DeepSeek strict mode compat).

    Args:
        tool_name: Name of the tool being requested.
        allowlist: Tool or group names the workflow may use.

    Returns:
        True if any allowlist entry matches ``tool_name``. Blank entries are
        ignored for the prefix/component rules.
    """
    if tool_name in allowlist:
        return True
    for entry in allowlist:
        if not entry.strip():
            # "anything".startswith("") is True, so a blank entry would
            # otherwise allow every tool.
            continue
        # Plain prefix, or sanitized MCP name (mcp_browser_browser_navigate).
        if tool_name.startswith((entry, f"mcp_{entry}_")):
            return True
        if f"mcp:{entry}" in tool_name or f".{entry}" in tool_name:
            return True
    return False

tools.registry

Classes

ToolsRegistry

Registro de herramientas — instanciable, no singleton.

La instancia global legacy 'tools_registry' se mantiene para backward compat. Para nuevos entry points o tests, crear ToolsRegistry() directamente.

Source code in tools/registry.py
class ToolsRegistry:
    """Registro de herramientas — instanciable, no singleton.

    La instancia global legacy 'tools_registry' se mantiene para backward compat.
    Para nuevos entry points o tests, crear ToolsRegistry() directamente.
    """

    def __init__(self):
        self._tools: dict[str, Callable] = {}

    def register(self, name: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            self._tools[name] = func
            logger.info(f"Tool registrada: {name}")
            return func

        return decorator

    def get_tool(self, name: str) -> Callable | None:
        return self._tools.get(name)

    def list_tools(self) -> dict[str, Callable]:
        return self._tools.copy()

    def unregister(self, name: str) -> bool:
        if name in self._tools:
            del self._tools[name]
            return True
        return False

    def clear(self):
        self._tools.clear()

tools.orchestrator

Tool Orchestrator Avanzado - Versión Final Robusta (Prioridad 3)

- Retries y backoff alineados con models_controller
- Mejor manejo de errores y mensajes amigables
- Token budget aislado por workflow vía contextvars (thread/async-safe)

Classes

ToolOrchestrator

Source code in tools/orchestrator.py
class ToolOrchestrator:
    """Execute registered tools behind permission, approval, budget and retry gates.

    All methods are static. ``execute_tool`` is the entry point; it always
    returns a result dict with at least ``success`` and ``output`` and does not
    propagate exceptions raised by the tool itself.
    """

    # Snapshots of the settings taken when the class body is executed.
    # reset_token_budget() refreshes the two token-budget values; execute_tool
    # reads the retry/backoff values from settings on every call.
    MAX_RETRIES = settings.tool_max_retries
    BACKOFF_BASE = settings.tool_backoff_base
    MAX_TOKENS_PER_WORKFLOW = settings.tool_max_tokens_per_workflow
    ENABLE_TOKEN_BUDGET = settings.tool_enable_token_budget

    # Tools/actions that require explicit user approval.
    # Entries are either a bare tool name or "<tool>.<action>".
    DANGEROUS_ACTIONS: set[str] = {
        "bash_manager",
        "code_exec",
        "file_manager.delete",
        "git_manager.commit",
        "git_manager.push",
    }

    # Global approval callback — set by UI layer (CLI or GUI).
    # While it is None, dangerous actions run without asking.
    on_approval_required: Callable[[str, dict[str, Any]], Awaitable[bool]] | None = None

    @staticmethod
    async def execute_tool(
        tool_name: str,
        parameters: dict[str, Any],
        role: str = "agent",
        max_tokens: int | None = None,
        workspace: str = "main",
        session_id: str | None = None,
    ) -> dict[str, Any]:
        """Run one tool call through all gates, retrying transient failures.

        Order of checks: tools enabled → tool registered → permissions →
        interactive approval (dangerous actions only) → estimated token budget.
        The tool is then invoked up to ``settings.tool_max_retries`` times with
        exponential backoff (``backoff_base**attempt`` plus up to 0.5 s jitter).

        Args:
            tool_name: Name under which the tool is registered.
            parameters: Keyword arguments passed to the tool (``tool(**parameters)``).
            role: Role used for the permission lookup and passed to hooks.
            max_tokens: Accepted for interface compatibility; not read by this
                method.
            workspace: Passed through to hook contexts.
            session_id: Passed through to hook contexts.

        Returns:
            On success: ``success=True`` plus ``tool``, ``output`` (the raw tool
            result), ``tokens_used``, ``duration`` and ``attempt``.
            On failure: ``success=False`` plus an ``error`` code
            (``tools_disabled``, ``tool_not_found``, ``permission_denied``,
            ``approval_denied``, ``token_budget_exceeded``,
            ``tool_reported_failure``, ``tests_failed`` or
            ``max_retries_exceeded``) and a human-readable ``output``.
        """
        hooks_on = settings.hooks_enabled
        max_retries = settings.tool_max_retries
        backoff_base = settings.tool_backoff_base
        if not settings.tools_enabled:
            if hooks_on:
                await hooks_registry.dispatch(
                    "on_tools_disabled",
                    HookContext(
                        hook_point="on_tools_disabled",
                        tool_name=tool_name,
                        parameters=parameters,
                        role=role,
                        workspace=workspace,
                        session_id=session_id,
                    ),
                )
            return {
                "success": False,
                "error": "tools_disabled",
                "output": "❌ Herramientas desactivadas por configuración del sistema",
            }

        tool = tools_registry.get_tool(tool_name)
        if not tool:
            return {
                "success": False,
                "error": "tool_not_found",
                "output": f"❌ La herramienta '{tool_name}' no existe",
            }

        if not ToolOrchestrator._check_permissions(tool_name, role):
            if hooks_on:
                await hooks_registry.dispatch(
                    "on_permission_denied",
                    HookContext(
                        hook_point="on_permission_denied",
                        tool_name=tool_name,
                        parameters=parameters,
                        role=role,
                        workspace=workspace,
                        session_id=session_id,
                    ),
                )
            return {
                "success": False,
                "error": "permission_denied",
                "output": f"❌ Permiso denegado para usar '{tool_name}'",
            }

        # Interactive approval for dangerous operations.
        # The key is "<tool>.<action>" when an action is given, else the tool name.
        action_key = (
            f"{tool_name}.{parameters.get('action', '')}" if parameters.get("action") else tool_name
        )
        if ToolOrchestrator.on_approval_required is not None and (
            tool_name in ToolOrchestrator.DANGEROUS_ACTIONS
            or action_key in ToolOrchestrator.DANGEROUS_ACTIONS
        ):
            approved = await ToolOrchestrator.on_approval_required(tool_name, parameters)
            if not approved:
                return {
                    "success": False,
                    "error": "approval_denied",
                    "output": f"❌ Operation '{tool_name}' was denied by user.",
                }

        # Token budget — isolated per async context (contextvars).
        # Pre-check with an estimate derived from the parameters.
        estimated = ToolOrchestrator._estimate_tokens(parameters)
        current_budget = _token_budget_ctx.get()
        max_budget = ToolOrchestrator.MAX_TOKENS_PER_WORKFLOW
        if ToolOrchestrator.ENABLE_TOKEN_BUDGET and current_budget + estimated > max_budget:
            if hooks_on:
                await hooks_registry.dispatch(
                    "on_token_budget_exceeded",
                    HookContext(
                        hook_point="on_token_budget_exceeded",
                        tool_name=tool_name,
                        parameters=parameters,
                        role=role,
                        workspace=workspace,
                        session_id=session_id,
                    ),
                )
            return {
                "success": False,
                "error": "token_budget_exceeded",
                "output": (
                    f"❌ Presupuesto de tokens excedido "
                    f"({current_budget + estimated}/{max_budget})"
                ),
            }

        start_time = time.time()
        last_error = None
        # Number of attempts actually started; reported in the final error so
        # the message stays accurate when the loop is left early.
        attempts_made = 0

        for attempt in range(1, max_retries + 1):
            attempts_made = attempt
            try:
                if hooks_on:
                    await hooks_registry.dispatch(
                        "on_before_tool",
                        HookContext(
                            hook_point="on_before_tool",
                            tool_name=tool_name,
                            parameters=parameters,
                            role=role,
                            attempt=attempt,
                            workspace=workspace,
                            session_id=session_id,
                        ),
                    )

                # NOTE(review): `undercover` is a context manager defined
                # outside this block; its effect is not visible here.
                with undercover:
                    result = (
                        await tool(**parameters)
                        if asyncio.iscoroutinefunction(tool)
                        else tool(**parameters)
                    )

                # Duration is measured from before the first attempt, so it
                # includes earlier failed attempts and their backoff sleeps.
                duration = time.time() - start_time
                actual_tokens = (
                    result.get("tokens_used", estimated) if isinstance(result, dict) else estimated
                )

                # Check internal tool success before logging.
                # A dict result without a "success" key counts as success.
                internal_ok = result.get("success", True) if isinstance(result, dict) else True
                if not internal_ok:
                    error_msg = (
                        result.get("output", result.get("text", "unknown error"))
                        if isinstance(result, dict)
                        else str(result)
                    )
                    # Fast-fail: skip retry for file-not-found or path errors
                    fast_fail_keywords = [
                        "no se encontró",
                        "no encontrado",
                        "not found",
                        "file not found",
                        "fuera del workspace",
                    ]
                    if any(kw in str(error_msg).lower() for kw in fast_fail_keywords):
                        logger.warning(
                            f"Tool {tool_name} failed (fast-fail: file/path error) — no retry"
                        )
                        return {
                            "success": False,
                            "error": "tool_reported_failure",
                            "output": f"❌ La herramienta '{tool_name}' reportó fallo: {str(error_msg)[:300]}",
                            "tool": tool_name,
                        }
                    # Fast-fail: skip retry for deterministic test failures
                    if tool_name == "test_runner" and isinstance(result, dict):
                        # Read both counters with .get(): a result carrying only
                        # one of them must not raise KeyError here, which would
                        # be caught below and turn this into a retried error.
                        failed_count = result.get("failed_count", 0)
                        error_count = result.get("error_count", 0)
                        if failed_count > 0 or error_count > 0:
                            logger.warning(
                                f"test_runner: {failed_count} failures, "
                                f"{error_count} errors — no retry (tests are deterministic)"
                            )
                            return {
                                "success": False,
                                "error": "tests_failed",
                                "output": str(error_msg)[:300],
                                "tool": tool_name,
                            }
                    logger.warning(
                        f"Tool {tool_name} reported failure (attempt {attempt}): {str(error_msg)[:200]}"
                    )
                    if attempt < max_retries:
                        delay = backoff_base**attempt + random.uniform(0, 0.5)
                        await asyncio.sleep(delay)
                        continue
                    return {
                        "success": False,
                        "error": "tool_reported_failure",
                        "output": f"❌ La herramienta '{tool_name}' reportó fallo: {str(error_msg)[:300]}",
                        "tool": tool_name,
                    }

                # Re-check the budget with the real token count before
                # accounting for it. The tool has already run at this point;
                # only its result is discarded.
                new_total = current_budget + actual_tokens
                if ToolOrchestrator.ENABLE_TOKEN_BUDGET and new_total > max_budget:
                    logger.warning(
                        f"Tool {tool_name} excede presupuesto con tokens reales "
                        f"({new_total}/{max_budget}). Resultado descartado."
                    )
                    return {
                        "success": False,
                        "error": "token_budget_exceeded",
                        "output": f"❌ Presupuesto excedido ({new_total}/{max_budget})",
                    }
                _token_budget_ctx.set(new_total)

                logger.info(
                    f"✅ Tool {tool_name} OK (attempt {attempt}) | tokens={actual_tokens} | duration={duration:.2f}s"
                )

                if hooks_on:
                    await hooks_registry.dispatch(
                        "on_after_tool",
                        HookContext(
                            hook_point="on_after_tool",
                            tool_name=tool_name,
                            parameters=parameters,
                            role=role,
                            result=result,
                            duration=duration,
                            attempt=attempt,
                            workspace=workspace,
                            session_id=session_id,
                        ),
                    )

                return {
                    "success": True,
                    "tool": tool_name,
                    "output": result,
                    "tokens_used": actual_tokens,
                    "duration": duration,
                    "attempt": attempt,
                }

            except Exception as e:
                last_error = str(e)
                # If file-not-found error, skip retry. Compared
                # case-insensitively, like the fast-fail keywords above.
                lowered_error = last_error.lower()
                if "no encontrado" in lowered_error or "not found" in lowered_error:
                    logger.warning(f"Tool {tool_name} failed (file not found) — no retry")
                    break
                logger.warning(f"Tool {tool_name} falló (attempt {attempt}/{max_retries}): {e}")

                if hooks_on:
                    await hooks_registry.dispatch(
                        "on_tool_error",
                        HookContext(
                            hook_point="on_tool_error",
                            tool_name=tool_name,
                            parameters=parameters,
                            role=role,
                            error=last_error,
                            attempt=attempt,
                            workspace=workspace,
                            session_id=session_id,
                        ),
                    )

                if attempt < max_retries:
                    delay = backoff_base**attempt + random.uniform(0, 0.5)
                    await asyncio.sleep(delay)

        # Friendly fallback error: retries exhausted, or the loop was left
        # early on a not-found error.
        return {
            "success": False,
            "error": "max_retries_exceeded",
            "output": (
                f"❌ La herramienta '{tool_name}' falló después de "
                f"{attempts_made} attempts.\nLast error: {last_error}"
            ),
            "tool": tool_name,
        }

    @staticmethod
    def _check_permissions(tool_name: str, role: str) -> bool:
        """Return whether ``role`` may use ``tool_name``.

        ``code_exec`` and ``bash_manager`` are always denied unless
        ``settings.allow_code_execution`` is set. Otherwise the role-specific
        flag ``allow_<tool>_<role>`` is looked up, falling back to
        ``allow_<tool>`` and finally to ``True``.
        """
        # ALLOW_CODE_EXECUTION flag gating
        if tool_name in ("code_exec", "bash_manager") and not settings.allow_code_execution:
            return False
        key = f"allow_{tool_name}_{role}"
        return kairos.get(key, kairos.get(f"allow_{tool_name}", True))  # dynamic permission flags

    @staticmethod
    def _estimate_tokens(parameters: dict[str, Any]) -> int:
        """Estimate the token count of ``parameters``.

        The parameters are JSON-serialized and encoded with ``get_encoding()``.
        If that fails, the estimate is ``len(str(parameters)) // 4``.
        """
        try:
            text = json.dumps(parameters, ensure_ascii=False)
            return len(get_encoding().encode(text))
        except (TypeError, ValueError, AttributeError):
            return len(str(parameters)) // 4

    @staticmethod
    def reset_token_budget():
        """Reset the token budget at the start of each workflow.

        Sets the budget context variable to 0 and re-reads the token-budget
        limit and enable flag from ``settings``. The context variable is meant
        to keep concurrent workflows isolated from each other.
        """
        _token_budget_ctx.set(0)
        ToolOrchestrator.MAX_TOKENS_PER_WORKFLOW = settings.tool_max_tokens_per_workflow
        ToolOrchestrator.ENABLE_TOKEN_BUDGET = settings.tool_enable_token_budget
Functions
reset_token_budget staticmethod
reset_token_budget()

Reinicia el presupuesto de tokens al inicio de cada workflow. El contextvar garantiza aislamiento entre workflows concurrentes.

Source code in tools/orchestrator.py
@staticmethod
def reset_token_budget():
    """Reset the token budget at the start of each workflow.

    Sets the budget context variable to 0 and re-reads the token-budget limit
    and enable flag from ``settings``. The context variable is meant to keep
    concurrent workflows isolated from each other.
    """
    _token_budget_ctx.set(0)
    ToolOrchestrator.MAX_TOKENS_PER_WORKFLOW = settings.tool_max_tokens_per_workflow
    ToolOrchestrator.ENABLE_TOKEN_BUDGET = settings.tool_enable_token_budget

Functions

tools.wrapper

Wrapper de alto nivel para tool calls (usado por workflow_orchestrator). Incluye instrumentación de métricas por herramienta (latencia + éxito/fallo).

Functions

safe_tool_call async

safe_tool_call(
    tool_name: str,
    parameters: dict,
    role: str = "agent",
    workspace: str = "main",
    session_id: str | None = None,
)

Wrapper simple y seguro para usar en workflow_orchestrator. Routes MCP-prefixed tools to the appropriate MCP client. Records per-tool metrics (latency, success/failure).

Source code in tools/wrapper.py
async def safe_tool_call(
    tool_name: str,
    parameters: dict,
    role: str = "agent",
    workspace: str = "main",
    session_id: str | None = None,
):
    """Wrapper simple y seguro para usar en workflow_orchestrator.
    Routes MCP-prefixed tools to the appropriate MCP client.
    Records per-tool metrics (latency, success/failure).
    """
    if not tool_name or not tool_name.strip():
        return {
            "success": False,
            "error": "empty_tool_name",
            "output": "\u274c Tool name cannot be empty",
        }

    # Fast-fail: bash_manager requires 'command' parameter
    if tool_name == "bash_manager" and not str(parameters.get("command", "")).strip():
        return {
            "success": False,
            "error": "missing_required_param",
            "output": "❌ bash_manager requires 'command' parameter",
        }

    start = time.monotonic()
    try:
        if tool_name.startswith("mcp:"):
            from core.mcp.client import get_mcp_client_for_tool

            client = get_mcp_client_for_tool(tool_name)
            if client is None:
                result = {
                    "success": False,
                    "error": "mcp_client_not_found",
                    "output": f"MCP client not found for tool: {tool_name}",
                }
            else:
                result = await client.call_tool(tool_name, parameters)
        else:
            result = await tool_orchestrator.execute_tool(
                tool_name, parameters, role=role, workspace=workspace, session_id=session_id
            )
    except Exception:
        elapsed = (time.monotonic() - start) * 1000
        tool_metrics.record_call(tool_name, False, elapsed)
        raise

    elapsed = (time.monotonic() - start) * 1000
    success: bool = bool(result.get("success", False)) if isinstance(result, dict) else False
    tool_metrics.record_call(tool_name, success, elapsed)
    metrics.tool_calls += 1
    return result

tools.loader

Functions

load_global_tools

load_global_tools()

Carga las herramientas globales desde la carpeta 'tools/'.

Source code in tools/loader.py
def load_global_tools():
    """Load the global tools from the 'tools/' folder.

    The folder is ``Path(__file__).parent.parent / "tools"``. Every ``*.py``
    file whose name does not start with an underscore is imported as
    ``tools.<stem>``. If the folder does not exist, nothing is loaded.
    """
    tools_dir = Path(__file__).parent.parent / "tools"
    if not tools_dir.exists():
        logger.info("No se encontró el directorio de herramientas globales.")
        return
    public_modules = (p for p in tools_dir.glob("*.py") if not p.name.startswith("_"))
    for module_path in public_modules:
        logger.info(f"Cargando herramienta global: {module_path.name}")
        _import_module_from_file(f"tools.{module_path.stem}", module_path)

load_workspace_tools

load_workspace_tools(workspace: str)

Carga herramientas locales desde `workspaces/<workspace>/tools/`.

Source code in tools/loader.py
def load_workspace_tools(workspace: str):
    """Load local tools from workspaces/<workspace>/tools/.

    Every ``*.py`` file whose name does not start with an underscore is
    imported as ``workspaces.<workspace>.tools.<stem>``. The names of the
    modules whose import reported success are stored in ``_workspace_modules``
    under ``workspace``.
    """
    tools_dir = paths.workspace_tools_dir(workspace)
    if not tools_dir.exists():
        logger.info(f"No hay herramientas locales en workspace '{workspace}'")
        return
    loaded: list[str] = []
    for module_path in tools_dir.glob("*.py"):
        if not module_path.name.startswith("_"):
            qualified = f"workspaces.{workspace}.tools.{module_path.stem}"
            if _import_module_from_file(qualified, module_path):
                loaded.append(qualified)
    _workspace_modules[workspace] = loaded

unload_workspace_tools

unload_workspace_tools()

Elimina del registro las herramientas del workspace anterior.

Source code in tools/loader.py
def unload_workspace_tools():
    """Remove the tools of the previous workspace from the registry.

    For every module tracked in ``_workspace_modules`` the last component of
    its dotted name is unregistered from ``tools_registry``, and the module is
    cleaned up if it is still imported. The tracking map is emptied afterwards.
    """
    from tools.registry import tools_registry

    tracked = list(_workspace_modules.items())
    for workspace_name, module_names in tracked:
        for qualified in module_names:
            # The tool is unregistered by the module's own (last) name.
            _, _, tool_name = qualified.rpartition(".")
            tools_registry.unregister(tool_name)
            logger.debug(f"Herramienta descargada: {tool_name} (workspace: {workspace_name})")
            if qualified in _imported_module_names():
                _cleanup_module(qualified)
    _workspace_modules.clear()
    logger.info("Herramientas de workspace descargadas del registro")

tools.file_manager

File Manager - Versión Profesional y Robusta

- Alias file_path / path
- Normalización inteligente de rutas:
  1. Elimina el prefijo completo project_root si ya está presente.
  2. Elimina el nombre del proyecto como primer componente si coincide con el último segmento de project_root.
- Validación sintáctica de archivos .py antes de escribir
- I/O vía asyncio.to_thread() para no bloquear el event loop

tools.bash_manager

Bash Manager — safe shell command execution.

Core CLI tool. Executes commands in the project workspace with timeout, sanitization, and dangerous pattern blocking.

Functions

tools.git_manager

Functions

tools.test_runner

Test Runner — ejecuta tests con pytest en el sandbox (Fase 4).

Functions

tools.lsp_manager

LSP Manager — Análisis de código con Jedi + Ruff diagnostics.

Classes

LspManager

Source code in tools/lsp_manager.py
class LspManager:
    def __init__(self, root_path: str):
        self.root_path = Path(root_path).resolve()
        self.project = jedi.Project(str(self.root_path))

    async def _get_script(self, file: str, line: int, character: int):
        file_path = (self.root_path / file).resolve()
        try:
            file_path.relative_to(self.root_path)
        except ValueError:
            raise ValueError(f"Archivo fuera del workspace: {file}")
        if not file_path.exists():
            raise FileNotFoundError(f"Archivo no encontrado: {file_path}")
        code = await asyncio.to_thread(file_path.read_text, encoding="utf-8")
        return jedi.Script(code, path=str(file_path), project=self.project)

    async def definition(self, file: str, line: int, character: int) -> str:
        try:
            script = await self._get_script(file, line, character)
            defs = script.goto(line + 1, character)
            if not defs:
                # Fallback: search the project for functions with that name near the requested line
                code = await asyncio.to_thread(
                    Path(self.root_path / file).read_text, encoding="utf-8"
                )
                lines = code.splitlines()
                if line < len(lines) and lines[line].strip():
                    possible_name = lines[line].strip().split("def ")[-1].split("(")[0].strip(":")
                    full_code = await asyncio.to_thread(Path(self.root_path / file).read_text)
                    script_all = jedi.Script(
                        full_code, path=str(self.root_path / file), project=self.project
                    )
                    names = script_all.get_names(all_scopes=True)
                    for name in names:
                        if name.name == possible_name and name.type == "function":
                            return f"📍 {name.module_path or 'built-in'} L{name.line} C{name.column}: {name.description}"
                return "No se encontró definición."
            return "\n".join(
                f"📍 {d.module_path or 'built-in'} L{d.line} C{d.column}: {d.description}"
                for d in defs
            )
        except Exception as e:
            logger.error(f"LSP definition error: {e}")
            return f"Error en LSP: {str(e)[:150]}"

    async def hover(self, file: str, line: int, character: int) -> str:
        try:
            script = await self._get_script(file, line, character)
            signatures = script.get_signatures()
            if signatures:
                sig = signatures[0]
                return (
                    f"```python\n{sig.to_string()}\n```\n\n{sig.docstring() or 'Sin documentación'}"
                )
            return "Sin información disponible."
        except Exception as e:
            logger.error(f"LSP hover error: {e}")
            return f"Error en LSP: {str(e)[:150]}"

    async def diagnostics(self, file: str | None = None) -> str:
        """Analiza archivos Python en busca de problemas (vacíos, sintaxis, etc.)."""

        def _check_all(root_path: str, file: str | None) -> list[str]:
            reports: list[str] = []
            root = Path(root_path)

            def _check_file(filepath: Path, relpath: str) -> None:
                if filepath.stat().st_size == 0:
                    reports.append(f"Archivo vacío: {relpath}")
                    return
                try:
                    code = filepath.read_text(encoding="utf-8")
                    script = jedi.Script(code, path=str(filepath), project=self.project)
                    diag = script.get_names(all_scopes=True, references=False, definitions=False)
                    if not diag:
                        reports.append(f"Sin símbolos detectados: {relpath}")
                except SyntaxError as e:
                    reports.append(f"Error de sintaxis en {relpath}: {e}")
                except Exception as e:
                    logger.debug(f"Error analizando {relpath}: {e}")

            if file:
                full = root / file
                if full.is_file():
                    _check_file(full, file)
                else:
                    reports.append(f"Archivo no encontrado: {file}")
            else:
                for py_file in root.rglob("*.py"):
                    parts = py_file.parts
                    if any(p.startswith(".") or p == "__pycache__" for p in parts):
                        continue
                    rel = str(py_file.relative_to(root))
                    _check_file(py_file, rel)

            return reports

        reports = await asyncio.to_thread(_check_all, str(self.root_path), file)
        if not reports:
            reports.append("No se detectaron problemas.")
        return "\n".join(reports)

    async def ruff_check(self, file: str | None = None, fix: bool = False) -> str:
        """Ejecuta ruff como linter real con salida estructurada.

        Args:
            file: Archivo específico a analizar (None = todo el proyecto).
            fix: Si True, aplica auto-correcciones (ruff --fix).

        Returns:
            Reporte estructurado de issues encontrados.
        """
        cmd = ["ruff", "check", "--output-format=json"]
        if fix:
            cmd.append("--fix")
        target = str(self.root_path / file) if file else str(self.root_path)
        cmd.append(target)

        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self.root_path),
            )
            stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=60)
            result_stdout = stdout.decode()

            if not result_stdout.strip():
                return "✅ Ningún problema detectado por ruff."

            issues = _json.loads(result_stdout)
            if not issues or not isinstance(issues, list):
                return "✅ Ningún problema detectado por ruff."
            issues = [i for i in issues if i is not None and isinstance(i, dict)]
            if not issues:
                return "✅ Ningún problema detectado por ruff."

            lines = [f"🔍 Ruff encontró {len(issues)} problema(s):\n"]
            for issue in issues[:50]:  # cap at 50 issues
                location = issue.get("filename", "?")
                loc_data = issue.get("location") or {}
                line_no = loc_data.get("row", "?")
                col = loc_data.get("column", "?")
                code = issue.get("code", "?")
                message = issue.get("message", "")
                fixable = "🔧" if (issue.get("fix") or {}).get("applicability") else "  "
                rel_path = os.path.relpath(location, str(self.root_path))
                lines.append(f"{fixable} {rel_path}:{line_no}:{col}  [{code}] {message}")

            return "\n".join(lines)

        except TimeoutError:
            return "⏱️ Ruff excedió el tiempo límite (60s)."
        except FileNotFoundError:
            return "❌ Ruff no está instalado en el entorno."
        except Exception as e:
            logger.error(f"Ruff check error: {e}")
            return f"❌ Error ejecutando ruff: {e!s}"

    async def references(self, file: str, line: int, character: int) -> str:
        """Busca todas las referencias a un símbolo en el proyecto."""
        try:
            script = await self._get_script(file, line, character)
            refs = script.get_references()
            if not refs:
                return "No se encontraron referencias."
            return "\n".join(
                f"📍 {r.module_path or 'built-in'} L{r.line} C{r.column}: {r.description}"
                for r in refs
            )
        except Exception as e:
            logger.error(f"LSP references error: {e}")
            return f"Error en LSP references: {str(e)[:150]}"
Functions
diagnostics async
diagnostics(file: str | None = None) -> str

Analiza archivos Python en busca de problemas (vacíos, sintaxis, etc.).

Source code in tools/lsp_manager.py
async def diagnostics(self, file: str | None = None) -> str:
    """Analiza archivos Python en busca de problemas (vacíos, sintaxis, etc.)."""

    def _check_all(root_path: str, file: str | None) -> list[str]:
        reports: list[str] = []
        root = Path(root_path)

        def _check_file(filepath: Path, relpath: str) -> None:
            if filepath.stat().st_size == 0:
                reports.append(f"Archivo vacío: {relpath}")
                return
            try:
                code = filepath.read_text(encoding="utf-8")
                script = jedi.Script(code, path=str(filepath), project=self.project)
                diag = script.get_names(all_scopes=True, references=False, definitions=False)
                if not diag:
                    reports.append(f"Sin símbolos detectados: {relpath}")
            except SyntaxError as e:
                reports.append(f"Error de sintaxis en {relpath}: {e}")
            except Exception as e:
                logger.debug(f"Error analizando {relpath}: {e}")

        if file:
            full = root / file
            if full.is_file():
                _check_file(full, file)
            else:
                reports.append(f"Archivo no encontrado: {file}")
        else:
            for py_file in root.rglob("*.py"):
                parts = py_file.parts
                if any(p.startswith(".") or p == "__pycache__" for p in parts):
                    continue
                rel = str(py_file.relative_to(root))
                _check_file(py_file, rel)

        return reports

    reports = await asyncio.to_thread(_check_all, str(self.root_path), file)
    if not reports:
        reports.append("No se detectaron problemas.")
    return "\n".join(reports)
ruff_check async
ruff_check(
    file: str | None = None, fix: bool = False
) -> str

Ejecuta ruff como linter real con salida estructurada.

Parameters:

Name Type Description Default
file str | None

Archivo específico a analizar (None = todo el proyecto).

None
fix bool

Si True, aplica auto-correcciones (ruff --fix).

False

Returns:

Type Description
str

Reporte estructurado de issues encontrados.

Source code in tools/lsp_manager.py
async def ruff_check(self, file: str | None = None, fix: bool = False) -> str:
    """Ejecuta ruff como linter real con salida estructurada.

    Args:
        file: Archivo específico a analizar (None = todo el proyecto).
        fix: Si True, aplica auto-correcciones (ruff --fix).

    Returns:
        Reporte estructurado de issues encontrados.
    """
    cmd = ["ruff", "check", "--output-format=json"]
    if fix:
        cmd.append("--fix")
    target = str(self.root_path / file) if file else str(self.root_path)
    cmd.append(target)

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self.root_path),
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=60)
        result_stdout = stdout.decode()

        if not result_stdout.strip():
            return "✅ Ningún problema detectado por ruff."

        issues = _json.loads(result_stdout)
        if not issues or not isinstance(issues, list):
            return "✅ Ningún problema detectado por ruff."
        issues = [i for i in issues if i is not None and isinstance(i, dict)]
        if not issues:
            return "✅ Ningún problema detectado por ruff."

        lines = [f"🔍 Ruff encontró {len(issues)} problema(s):\n"]
        for issue in issues[:50]:  # cap at 50 issues
            location = issue.get("filename", "?")
            loc_data = issue.get("location") or {}
            line_no = loc_data.get("row", "?")
            col = loc_data.get("column", "?")
            code = issue.get("code", "?")
            message = issue.get("message", "")
            fixable = "🔧" if (issue.get("fix") or {}).get("applicability") else "  "
            rel_path = os.path.relpath(location, str(self.root_path))
            lines.append(f"{fixable} {rel_path}:{line_no}:{col}  [{code}] {message}")

        return "\n".join(lines)

    except TimeoutError:
        return "⏱️ Ruff excedió el tiempo límite (60s)."
    except FileNotFoundError:
        return "❌ Ruff no está instalado en el entorno."
    except Exception as e:
        logger.error(f"Ruff check error: {e}")
        return f"❌ Error ejecutando ruff: {e!s}"
references async
references(file: str, line: int, character: int) -> str

Busca todas las referencias a un símbolo en el proyecto.

Source code in tools/lsp_manager.py
async def references(self, file: str, line: int, character: int) -> str:
    """List every reference to the symbol at the given position.

    Obtains a script object from ``self._get_script(file, line, character)``
    and formats each entry returned by its ``get_references()`` call.

    Args:
        file: File containing the symbol; forwarded to ``_get_script``.
        line: Line of the symbol; forwarded to ``_get_script``.
        character: Column of the symbol; forwarded to ``_get_script``.

    Returns:
        One line per reference, a fixed message when there are none, or an
        error message (exception text cut to 150 characters) if anything
        raises along the way.
    """
    try:
        script = await self._get_script(file, line, character)
        found = script.get_references()
        if found:
            formatted = []
            for ref in found:
                # References without a module path are labelled as built-ins.
                origin = ref.module_path or "built-in"
                formatted.append(f"📍 {origin} L{ref.line} C{ref.column}: {ref.description}")
            return "\n".join(formatted)
        return "No se encontraron referencias."
    except Exception as e:
        logger.error(f"LSP references error: {e}")
        return f"Error en LSP references: {str(e)[:150]}"

Functions

lsp_manager_tool async

lsp_manager_tool(
    action: str = "diagnostics",
    file: str = "",
    line: int = 0,
    character: int = 0,
    project_root: str | None = None,
    workspace: str = "main",
    **kwargs
) -> str

Herramienta LSP profesional: requiere project_root para localizar el proyecto.

Source code in tools/lsp_manager.py
@tools_registry.register("lsp_manager")
async def lsp_manager_tool(
    action: str = "diagnostics",
    file: str = "",
    line: int = 0,
    character: int = 0,
    project_root: str | None = None,
    workspace: str = "main",
    **kwargs,
) -> str:
    """Dispatch an LSP-style action to an ``LspManager`` rooted at the project.

    Args:
        action: One of ``diagnostics``, ``ruff_check``, ``definition``,
            ``hover`` or ``references``.
        file: Target file forwarded to the manager. An empty value is passed
            as ``None`` to ``diagnostics`` and ``ruff_check``.
        line: Line forwarded to ``definition``, ``hover`` and ``references``.
        character: Column forwarded to the same three actions.
        project_root: Project directory; required, resolved through
            ``paths.code_projects_dir(workspace, project_root)``.
        workspace: Workspace name passed to ``paths.code_projects_dir``.
        **kwargs: Only ``fix`` is read (for ``ruff_check``); any other key
            is ignored.

    Returns:
        The manager's textual result, or an error message when a required
        argument is missing, the project does not exist, the action is not
        supported, or the manager raises.
    """
    if not action:
        # The list must mention every action dispatched below, ruff_check included.
        return (
            "❌ lsp_manager requiere un parámetro 'action' "
            "(diagnostics, ruff_check, definition, hover, references)"
        )
    if not project_root:
        return (
            "❌ Se requiere 'project_root' para usar LSP. " "Especifica el directorio del proyecto."
        )

    from core.path_resolver import paths

    full_root = paths.code_projects_dir(workspace, project_root).resolve()
    if not full_root.exists():
        return f"❌ El proyecto {project_root} no existe aún."

    manager = LspManager(str(full_root))

    try:
        if action == "definition":
            result = await manager.definition(file, line, character)
            log_operation("lsp_definition", str(file)[:200], success=True)
            return result
        elif action == "hover":
            result = await manager.hover(file, line, character)
            log_operation("lsp_hover", str(file)[:200], success=True)
            return result
        elif action == "diagnostics":
            return await manager.diagnostics(file if file else None)
        elif action == "ruff_check":
            fix = kwargs.get("fix", False)
            return await manager.ruff_check(file if file else None, fix=fix)
        elif action == "references":
            return await manager.references(file, line, character)
        else:
            return f"Acción '{action}' no soportada."
    except Exception as e:
        logger.error(f"LSP error: {e}")
        return f"Error en LSP: {str(e)[:150]}"

tools.code_execution

CodeExecutor - Wrapper simple y seguro al sandbox hardened. La herramienta se autorregistra y devuelve texto limpio.

Classes

CodeExecutor

Source code in tools/code_execution.py
class CodeExecutor:
    """Static entry point that forwards code to ``restricted_executor``."""

    @staticmethod
    async def execute(code: str) -> dict:
        """Run ``code`` through ``restricted_executor.execute`` and return its result.

        NOTE(review): the executor is described elsewhere as a hardened
        sandbox; that guarantee is not visible from this block.
        """
        outcome = await restricted_executor.execute(code)
        return outcome
Functions
execute async staticmethod
execute(code: str) -> dict

Ejecución segura con sandbox hardened (retorna dict interno).

Source code in tools/code_execution.py
@staticmethod
async def execute(code: str) -> dict:
    """Run ``code`` through ``restricted_executor.execute`` and return its result.

    NOTE(review): the executor is described elsewhere as a hardened sandbox;
    that guarantee is not visible from this block.
    """
    outcome = await restricted_executor.execute(code)
    return outcome

Functions

tools.diff_editor

Diff Editor — surgical code editing via unified diffs.

Functions

Web Search — búsqueda web vía Google Custom Search API.

Functions

tools.web_fetch

Web Fetch — obtiene y convierte páginas web a texto.

Functions

Code Search — búsqueda de patrones regex en archivos del proyecto.

Functions

tools.pdf_reader

Functions

pdf_read_tool async

pdf_read_tool(
    path: str,
    workspace: str = "main",
    project_root: str | None = None,
    **kwargs
) -> str

Extrae texto de archivos PDF con validación de path traversal.

Source code in tools/pdf_reader.py
@tools_registry.register("pdf_read")
async def pdf_read_tool(
    path: str,
    workspace: str = "main",
    project_root: str | None = None,
    **kwargs,
) -> str:
    """Extract text from a PDF file, rejecting paths outside the workspace.

    Args:
        path: PDF path, resolved relative to the directory returned by
            ``paths.code_projects_dir(workspace, project_root)``.
        workspace: Workspace name passed to ``paths.code_projects_dir``.
        project_root: Project directory passed to ``paths.code_projects_dir``.
        **kwargs: Accepted and ignored.

    Returns:
        The text produced by ``PDFReader.read_pdf``, or an error message when
        the path escapes the base directory or is not an existing file.
    """
    base = paths.code_projects_dir(workspace, project_root).resolve()

    # Resolve first so ".." segments and symlinks are collapsed before the
    # containment check below.
    resolved = (base / path).resolve()
    try:
        resolved.relative_to(base)
    except ValueError:
        return f"❌ Acceso denegado: {path} está fuera del workspace."

    # is_file() rather than exists(): a directory inside the workspace exists
    # but is not something the PDF reader can open.
    if not resolved.is_file():
        return f"❌ Archivo no encontrado: {resolved}"

    # The read runs in a worker thread via asyncio.to_thread.
    result = await asyncio.to_thread(PDFReader.read_pdf, str(resolved))
    log_operation("pdf_read", str(resolved)[:200], success=not result.startswith("Error"))
    return result

tools.ask_clarification

Clarification tool — allows agents to ask the user questions mid-workflow.