Source code for moderatelyai_sdk.resources_async.pipelines

"""Async pipelines resource for the Moderately AI API."""

from typing import List, Optional

from ..models.pipeline_async import PipelineAsyncModel
from ..types import PaginatedResponse
from ._base import AsyncBaseResource


[docs] class AsyncPipelines(AsyncBaseResource): """Manage pipelines in your teams (async version). Pipelines are basic metadata containers with name, description, and team ownership. The actual pipeline logic is stored in PipelineConfigurationVersions. Execution instances are managed through PipelineExecutions. Examples: ```python # List all pipelines pipelines = await client.pipelines.list() # Get a specific pipeline pipeline = await client.pipelines.retrieve("pipeline_123") # Create a new pipeline pipeline = await client.pipelines.create( name="Document Analysis Pipeline", description="Processes legal documents" ) # Update a pipeline pipeline = await client.pipelines.update( "pipeline_123", name="Updated Pipeline Name" ) # Delete a pipeline await client.pipelines.delete("pipeline_123") ``` """
[docs] async def list( self, *, pipeline_ids: Optional[List[str]] = None, name_like: Optional[str] = None, page: int = 1, page_size: int = 10, order_by: Optional[str] = None, order_direction: str = "asc", ) -> PaginatedResponse: """List all pipelines with pagination (async). Note: Results are automatically filtered to the team specified in the client. Args: pipeline_ids: Filter by specific pipeline IDs. name_like: Filter pipelines by name (case-insensitive partial match). page: Page number (1-based). Defaults to 1. page_size: Number of items per page (1-1000). Defaults to 10. order_by: Field to sort by ("name", "createdAt", "updatedAt"). order_direction: Sort direction ("asc" or "desc"). Defaults to "asc". Returns: Paginated list of pipelines for the client's team. """ query = { "page": page, "pageSize": page_size, "orderDirection": order_direction, } if pipeline_ids is not None: query["pipelineIds"] = ",".join(pipeline_ids) if name_like is not None: query["nameLike"] = name_like if order_by is not None: query["orderBy"] = order_by response = await self._get("/pipelines", options={"query": query}) # Convert pipeline items to fat models if "items" in response: response["items"] = [ PipelineAsyncModel(item, self._client) for item in response["items"] ] return response
[docs] async def retrieve(self, pipeline_id: str) -> PipelineAsyncModel: """Retrieve a specific pipeline by ID (async). Args: pipeline_id: The ID of the pipeline to retrieve. Returns: The pipeline model instance. Raises: NotFoundError: If the pipeline doesn't exist. """ pipeline_data = await self._get(f"/pipelines/{pipeline_id}") return PipelineAsyncModel(pipeline_data, self._client)
[docs] async def create( self, *, name: str, description: Optional[str] = None, **kwargs, ) -> PipelineAsyncModel: """Create a new pipeline (async). Note: The pipeline will be created in the team specified in the client. Args: name: The pipeline's name (1-255 characters). Must be unique within the team. description: The pipeline's description (max 1000 characters). **kwargs: Additional pipeline properties. Returns: The created pipeline model instance. Raises: ValidationError: If the request data is invalid. ConflictError: If a pipeline with the same name already exists in the team. """ body = { "teamId": self._client.team_id, "name": name, **kwargs, } if description is not None: body["description"] = description pipeline_data = await self._post("/pipelines", body=body) return PipelineAsyncModel(pipeline_data, self._client)
[docs] async def update( self, pipeline_id: str, *, name: Optional[str] = None, description: Optional[str] = None, **kwargs, ) -> PipelineAsyncModel: """Update an existing pipeline (async). Args: pipeline_id: The ID of the pipeline to update. name: New pipeline name (1-255 characters). Must be unique within the team. description: New pipeline description (max 1000 characters). **kwargs: Additional properties to update. Returns: The updated pipeline model instance. Raises: NotFoundError: If the pipeline doesn't exist. ValidationError: If the request data is invalid. ConflictError: If a pipeline with the same name already exists in the team. """ body = {**kwargs} if name is not None: body["name"] = name if description is not None: body["description"] = description pipeline_data = await self._patch(f"/pipelines/{pipeline_id}", body=body) return PipelineAsyncModel(pipeline_data, self._client)
[docs] async def delete(self, pipeline_id: str) -> None: """Delete a pipeline (async). Args: pipeline_id: The ID of the pipeline to delete. Raises: NotFoundError: If the pipeline doesn't exist. """ await self._delete(f"/pipelines/{pipeline_id}")