Source code for moderatelyai_sdk.resources.agent_executions

"""Agent executions resource for the Moderately AI API."""

from typing import Any, Dict, Optional

from ..types import AgentExecution, PaginatedResponse
from ._base import BaseResource


[docs] class AgentExecutions(BaseResource): """Manage agent executions and monitor their progress. Examples: ```python # List all executions executions = client.agent_executions.list() # Get a specific execution execution = client.agent_executions.retrieve("execution_123") # Create a new execution execution = client.agent_executions.create( agent_id="agent_123", input_data={"message": "Hello, world!"} ) # Cancel a running execution client.agent_executions.cancel("execution_123") ``` """
[docs] def list( self, *, agent_id: Optional[str] = None, status: Optional[str] = None, page: int = 1, page_size: int = 10, order_by: str = "created_at", order_direction: str = "desc", ) -> PaginatedResponse: """List all agent executions with pagination. Args: agent_id: Filter executions by agent ID. status: Filter executions by status (e.g., "running", "completed", "failed"). page: Page number (1-based). Defaults to 1. page_size: Number of items per page. Defaults to 10. order_by: Field to sort by. Defaults to "created_at". order_direction: Sort direction ("asc" or "desc"). Defaults to "desc". Returns: Paginated list of agent executions. """ query = { "page": page, "page_size": page_size, "order_by": order_by, "order_direction": order_direction, } if agent_id is not None: query["agent_id"] = agent_id if status is not None: query["status"] = status return self._get( "/agent-executions", options={"query": query}, )
[docs] def retrieve(self, execution_id: str) -> AgentExecution: """Retrieve a specific agent execution by ID. Args: execution_id: The ID of the execution to retrieve. Returns: The agent execution data. Raises: NotFoundError: If the execution doesn't exist. """ return self._get(f"/agent-executions/{execution_id}")
[docs] def create( self, *, agent_id: str, input_data: Optional[Dict[str, Any]] = None, **kwargs, ) -> AgentExecution: """Create a new agent execution. Args: agent_id: The ID of the agent to execute. input_data: Input data to pass to the agent. **kwargs: Additional execution properties. Returns: The created agent execution data. Raises: ValidationError: If the request data is invalid. NotFoundError: If the agent doesn't exist. """ body = { "agent_id": agent_id, **kwargs, } if input_data is not None: body["input_data"] = input_data return self._post("/agent-executions", body=body)
[docs] def cancel(self, execution_id: str) -> AgentExecution: """Cancel a running agent execution. Args: execution_id: The ID of the execution to cancel. Returns: The updated execution data. Raises: NotFoundError: If the execution doesn't exist. ConflictError: If the execution cannot be cancelled (e.g., already completed). """ return self._post(f"/agent-executions/{execution_id}/cancel")
[docs] def wait_for_completion( self, execution_id: str, *, timeout: Optional[float] = None, poll_interval: float = 2.0, ) -> AgentExecution: """Wait for an agent execution to complete. This is a convenience method that polls the execution status until it reaches a terminal state (completed, failed, or cancelled). Args: execution_id: The ID of the execution to wait for. timeout: Maximum time to wait in seconds. If None, waits indefinitely. poll_interval: Time between status checks in seconds. Defaults to 2.0. Returns: The final execution data. Raises: TimeoutError: If the timeout is reached before completion. NotFoundError: If the execution doesn't exist. """ import time start_time = time.time() while True: execution = self.retrieve(execution_id) status = execution.get("status", "") # Check if execution is in a terminal state if status in ["completed", "failed", "cancelled"]: return execution # Check timeout if timeout is not None and (time.time() - start_time) > timeout: raise TimeoutError( f"Execution {execution_id} did not complete within {timeout} seconds" ) # Wait before next poll time.sleep(poll_interval)