Source code for moderatelyai_sdk.models.dataset_schema_version

"""Dataset schema version models with user-friendly functionality.

This module provides DatasetSchemaVersionModel and SchemaBuilder classes for
managing dataset schemas with a user-friendly API that simplifies the complex
underlying schema structure.

Example:
    ```python
    from moderatelyai_sdk import ModeratelyAI

    client = ModeratelyAI(api_key="your_key", team_id="your_team")
    dataset = client.datasets.retrieve("dataset_123")

    # Simple schema creation
    schema = dataset.create_schema([
        {"name": "id", "type": "int", "required": True},
        {"name": "name", "type": "string"},
        {"name": "created_at", "type": "datetime"}
    ])

    # Auto-inference from CSV
    schema = dataset.create_schema_from_sample("data.csv")

    # Advanced fluent API
    schema = (dataset.schema_builder()
        .add_column("user_id", "int", required=True)
        .add_column("email", "string")
        .with_parsing(delimiter=",", header_row=1)
        .as_current()
        .create())
    ```
"""

from typing import Any, Dict, List, Optional, Union

from ._base import BaseModel


[docs] class DatasetSchemaVersionModel(BaseModel): """Model representing a dataset schema version with rich functionality. A schema version defines the structure and data types for a dataset's columns. This model provides methods for managing schema versions, including activation, column manipulation, and updates. Key Features: - Activate/archive schema versions - Add/remove columns dynamically - Update schema properties - Rich column access and validation Attributes: dataset_schema_version_id: Unique identifier for this schema version dataset_id: ID of the parent dataset version_no: Version number (incremental) columns: List of column definitions status: Schema status (draft, current, archived) parsing_options: Data parsing configuration created_at: Creation timestamp updated_at: Last update timestamp Example: ```python # Get current schema schema = dataset.get_current_schema() # Add a new column schema.add_column("new_field", "string", required=False) # Activate the schema schema.activate() # Access column information user_id_col = schema.get_column("user_id") print(f"Column: {user_id_col['name']} ({user_id_col['type']})") ``` """ @property def dataset_schema_version_id(self) -> str: """The unique identifier for this schema version.""" return self._data["datasetSchemaVersionId"] @property def dataset_id(self) -> str: """The dataset this schema version belongs to.""" return self._data["datasetId"] @property def version_no(self) -> int: """The version number.""" return self._data["versionNo"] @property def columns(self) -> List[Dict[str, Any]]: """The column definitions for this schema.""" return self._data["columnsJson"] @property def status(self) -> str: """The status of this schema version (draft, current, archived).""" return self._data["status"] @property def parsing_options(self) -> Optional[Dict[str, Any]]: """Parsing options for data processing.""" return self._data.get("parsingOptions") @property def created_at(self) -> str: """When this schema version was created.""" return self._data["createdAt"] @property def updated_at(self) -> str: """When this schema version was last updated.""" return self._data["updatedAt"]
[docs] def activate(self) -> "DatasetSchemaVersionModel": """Make this schema version the current/active one. Returns: Updated schema version model. """ return self.update(status="current")
[docs] def archive(self) -> "DatasetSchemaVersionModel": """Archive this schema version. Returns: Updated schema version model. """ return self.update(status="archived")
[docs] def update( self, *, status: Optional[str] = None, columns: Optional[List[Dict[str, Any]]] = None, parsing_options: Optional[Dict[str, Any]] = None, ) -> "DatasetSchemaVersionModel": """Update this schema version. Args: status: New status ('draft', 'current', 'archived'). columns: Updated column definitions. parsing_options: Updated parsing options. Returns: Updated schema version model. """ body = {} if status is not None: body["status"] = status if columns is not None: body["columns"] = columns # API expects 'columns', not 'columnsJson' if parsing_options is not None: body["parsingOptions"] = parsing_options updated_data = self._client._request( method="PATCH", path=f"/dataset-schema-versions/{self.dataset_schema_version_id}", body=body, cast_type=dict, ) # Update our internal data and return self self._data = updated_data return self
[docs] def add_column( self, name: str, column_type: str, *, required: bool = True, description: Optional[str] = None, position: Optional[int] = None, **kwargs: Any, ) -> "DatasetSchemaVersionModel": """Add a new column to this schema version. Args: name: Column name. column_type: Column type ('string', 'integer', 'float', 'datetime', 'boolean'). required: Whether the column is required (not nullable). description: Optional column description. position: Position in the schema (1-based). If None, adds to end. **kwargs: Additional column properties. Returns: Updated schema version model. """ # Convert user-friendly types to API types type_mapping = { "str": "string", "int": "integer", "float": "float", "datetime": "datetime", "bool": "boolean", "date": "datetime", } api_type = type_mapping.get(column_type, column_type) # Create new column definition new_column = { "name": name, "type": api_type, "nullable": not required, **kwargs, } if description: new_column["description"] = description # Add position if specified columns = self.columns.copy() if position is not None: new_column["pos"] = position # Insert at specified position (convert from 1-based to 0-based) columns.insert(position - 1, new_column) else: # Add to end with next position max_pos = max((col.get("pos", 0) for col in columns), default=0) new_column["pos"] = max_pos + 1 columns.append(new_column) return self.update(columns=columns)
[docs] def remove_column(self, name: str) -> "DatasetSchemaVersionModel": """Remove a column from this schema version. Args: name: Name of the column to remove. Returns: Updated schema version model. Raises: ValueError: If column doesn't exist. """ columns = self.columns.copy() original_length = len(columns) # Remove column by name columns = [col for col in columns if col.get("name") != name] if len(columns) == original_length: raise ValueError(f"Column '{name}' not found in schema") return self.update(columns=columns)
[docs] def get_column(self, name: str) -> Optional[Dict[str, Any]]: """Get a column definition by name. Args: name: Column name to find. Returns: Column definition dictionary or None if not found. """ for column in self.columns: if column.get("name") == name: return column return None
def _refresh(self) -> None: """Refresh this schema version from the API.""" response = self._client._request( method="GET", path=f"/dataset-schema-versions/{self.dataset_schema_version_id}", cast_type=dict, ) self._data = response
[docs] class SchemaBuilder: """Fluent API builder for creating dataset schema versions. SchemaBuilder provides a chainable interface for constructing complex dataset schemas with parsing options and validation. It offers the most advanced level of schema creation with full control over column properties and data processing. The builder pattern allows you to: - Add columns with detailed specifications - Configure parsing options (delimiters, headers, encoding) - Set schema status (draft/current) - Create the final schema version Example: ```python # Build a comprehensive schema schema = (dataset.schema_builder() .add_column("id", "int", required=True, description="Primary key") .add_column("email", "string", required=True) .add_column("signup_date", "datetime", required=False) .add_column("is_active", "bool", required=True) .with_parsing( delimiter=",", header_row=1, encoding="utf-8" ) .as_current() # Mark as active schema .create()) # Execute the creation print(f"Created schema version {schema.version_no}") ``` """
[docs] def __init__(self, dataset_id: str, client): """Initialize schema builder. Args: dataset_id: The dataset to create schema for. client: The API client instance. """ self._dataset_id = dataset_id self._client = client self._columns: List[Dict[str, Any]] = [] self._parsing_options: Dict[str, Any] = {} self._status = "draft"
[docs] def add_column( self, name: str, column_type: str, *, required: bool = True, description: Optional[str] = None, operations: Optional[List[Dict[str, Any]]] = None, **kwargs: Any, ) -> "SchemaBuilder": """Add a column to the schema being built. Args: name: Column name. column_type: Column type ('string', 'integer', 'float', 'datetime', 'boolean'). required: Whether the column is required (not nullable). description: Optional column description. operations: List of column operations (advanced). **kwargs: Additional column properties. Returns: This builder for method chaining. """ # Convert user-friendly types to API types type_mapping = { "str": "string", "int": "integer", "float": "float", "datetime": "datetime", "bool": "boolean", "date": "datetime", } api_type = type_mapping.get(column_type, column_type) column = { "pos": len(self._columns) + 1, "name": name, "type": api_type, "nullable": not required, **kwargs, } if description: column["description"] = description if operations: column["operations"] = operations self._columns.append(column) return self
[docs] def with_parsing( self, *, skip_rows: Optional[Union[int, List[int]]] = None, header_row: Optional[int] = None, delimiter: Optional[str] = None, encoding: Optional[str] = None, ) -> "SchemaBuilder": """Set parsing options for the schema. Args: skip_rows: Row indices to skip (0-based), or single row index. header_row: Row containing column headers (1-based). delimiter: Field delimiter (e.g., ',', '\t', ';'). encoding: File encoding (e.g., 'utf-8', 'latin1'). Returns: This builder for method chaining. """ if skip_rows is not None: self._parsing_options["skipRows"] = skip_rows if header_row is not None: self._parsing_options["headerRow"] = header_row if delimiter is not None: self._parsing_options["delimiter"] = delimiter if encoding is not None: self._parsing_options["encoding"] = encoding return self
[docs] def as_draft(self) -> "SchemaBuilder": """Set the schema status to draft. Returns: This builder for method chaining. """ self._status = "draft" return self
[docs] def as_current(self) -> "SchemaBuilder": """Set the schema status to current (active). Returns: This builder for method chaining. """ self._status = "current" return self
[docs] def create(self) -> DatasetSchemaVersionModel: """Create the schema version with the configured settings. Returns: The created schema version model. Raises: ValueError: If no columns have been added. APIError: If creation fails. """ if not self._columns: raise ValueError("Cannot create schema with no columns. Use add_column() first.") # Build request body body = { "datasetId": self._dataset_id, "columns": self._columns, # Note: API expects 'columns', not 'columnsJson' "status": self._status, } if self._parsing_options: body["parsingOptions"] = self._parsing_options # Create the schema version response = self._client._request( method="POST", path="/dataset-schema-versions", body=body, cast_type=dict, ) return DatasetSchemaVersionModel(response, self._client)