"""Dataset models with rich functionality for data operations.
This module provides the DatasetModel and DatasetDataVersionModel classes for
working with datasets and their data versions. These models offer rich functionality
for uploading data, creating schemas, and managing dataset versions.
Example:
```python
from moderatelyai_sdk import ModeratelyAI
client = ModeratelyAI(api_key="your_key", team_id="your_team")
# Create a dataset
dataset = client.datasets.create(
name="Sales Data",
description="Monthly sales records"
)
# Upload data
data_version = dataset.upload_data("sales.csv")
# Create a schema from sample data
schema = dataset.create_schema_from_sample("sales.csv")
# Download processed data
content = data_version.download()
```
"""
import csv
import hashlib
import io
import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import httpx

from ..exceptions import APIError
from ._base import BaseModel
from ._shared.dataset_operations import DatasetOperations

if TYPE_CHECKING:
    from .dataset_schema_version import DatasetSchemaVersionModel, SchemaBuilder
class DatasetDataVersionModel(BaseModel):
    """A single uploaded data version of a dataset.

    Every upload to a dataset produces a new data version. This model is a
    thin read-only view over the API payload held in ``self._data`` and adds
    the ability to download the stored file.

    Attributes:
        dataset_data_version_id: Unique identifier for this data version
        dataset_id: ID of the parent dataset
        version_no: Version number (incremental)
        file_type: Type of uploaded file (csv, xlsx)
        file_hash: SHA256 hash of the uploaded file
        row_count: Number of data rows
        file_size_bytes: Size of the uploaded file
        status: Processing status (draft, current, archived)
        created_at: Creation timestamp
        updated_at: Last update timestamp

    Example:
        ```python
        # Get the current data version
        data_version = dataset.get_data_version("version_id")

        print(f"Version {data_version.version_no}: {data_version.file_type}")
        print(f"Rows: {data_version.row_count}, Size: {data_version.file_size_bytes}")

        # Download the data
        content = data_version.download()
        data_version.download(path="./local_data.csv")
        ```
    """

    # --- Required payload fields (KeyError if the API omitted them) ---------

    @property
    def dataset_data_version_id(self) -> str:
        """Identifier of this data version (``datasetDataVersionId``)."""
        return self._data["datasetDataVersionId"]

    @property
    def dataset_id(self) -> str:
        """Identifier of the owning dataset (``datasetId``)."""
        return self._data["datasetId"]

    @property
    def version_no(self) -> int:
        """Version number of this upload (``versionNo``)."""
        return self._data["versionNo"]

    @property
    def file_type(self) -> str:
        """Type of the uploaded file, e.g. csv or xlsx (``fileType``)."""
        return self._data["fileType"]

    # --- Optional payload fields (None when absent) -------------------------

    @property
    def file_hash(self) -> Optional[str]:
        """SHA256 hash of the file, if present (``fileHash``)."""
        return self._data.get("fileHash")

    @property
    def row_count(self) -> Optional[int]:
        """Row count of the data, if present (``rowCount``)."""
        return self._data.get("rowCount")

    @property
    def file_size_bytes(self) -> Optional[int]:
        """File size in bytes, if present (``fileSizeBytes``)."""
        return self._data.get("fileSizeBytes")

    @property
    def status(self) -> str:
        """Version status: draft, current or archived (``status``)."""
        return self._data["status"]

    @property
    def created_at(self) -> str:
        """Creation timestamp as returned by the API (``createdAt``)."""
        return self._data["createdAt"]

    @property
    def updated_at(self) -> str:
        """Last-update timestamp as returned by the API (``updatedAt``)."""
        return self._data["updatedAt"]

    def download(self, *, path: Optional[Union[str, Path]] = None) -> Optional[bytes]:
        """Fetch the file stored for this data version.

        The API is first asked for a download URL; the file itself is then
        retrieved from that URL with a separate HTTP client.

        Args:
            path: Where to write the file. Missing parent directories are
                created. When omitted, the content is returned instead.

        Returns:
            ``None`` when ``path`` is given (the file is written to disk),
            otherwise the file content as bytes.

        Raises:
            APIError: If fetching the file from the download URL fails.
        """
        endpoint = f"/dataset-data-versions/{self.dataset_data_version_id}/download"
        url_payload = self._client._request(
            method="GET",
            path=endpoint,
            cast_type=dict,
        )
        presigned_url = url_payload["downloadUrl"]

        try:
            with httpx.Client() as http:
                fetched = http.get(presigned_url)
                fetched.raise_for_status()
                payload = fetched.content
        except httpx.HTTPError as e:
            raise APIError(f"Failed to download data version from URL: {e}") from e

        # No destination requested: hand the raw bytes back to the caller.
        if path is None:
            return payload

        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(payload)
        return None

    def _refresh(self) -> None:
        """Reload this data version's payload from the API in place."""
        endpoint = f"/dataset-data-versions/{self.dataset_data_version_id}"
        self._data = self._client._request(
            method="GET",
            path=endpoint,
            cast_type=dict,
        )
class DatasetModel(BaseModel):
    """Model representing a dataset with rich data operations.

    DatasetModel provides a high-level interface for working with datasets in
    the Moderately AI platform. It offers rich functionality for data upload,
    schema management, and version control.

    Key Features:
        - Upload data in various formats (CSV, Excel)
        - Automatic schema inference from sample data
        - Schema builder with fluent API
        - Data version management and downloads
        - Rich metadata access

    This class is returned by dataset operations like:
        - `client.datasets.create()`
        - `client.datasets.retrieve()`
        - `client.datasets.list()` (returns list of DatasetModel instances)

    Attributes:
        dataset_id: Unique identifier for the dataset
        name: Dataset display name
        description: Optional dataset description
        team_id: Team that owns this dataset
        record_count: Number of records in current data version
        total_size_bytes: Total size of dataset data
        current_schema_version_id: ID of current schema version
        current_data_version_id: ID of current data version
        processing_status: Data processing status
        created_at: Creation timestamp
        updated_at: Last update timestamp

    Example:
        ```python
        # Create a dataset
        dataset = client.datasets.create(
            name="Customer Data",
            description="Customer information and metrics"
        )

        # Upload data with automatic schema inference
        data_version = dataset.upload_data("customers.csv")
        schema = dataset.create_schema_from_sample("customers.csv")

        # Use schema builder for complex schemas
        schema = (dataset.schema_builder()
            .add_column("id", "int", required=True)
            .add_column("name", "string")
            .add_column("signup_date", "datetime")
            .as_current()
            .create())

        # Access dataset information
        print(f"Dataset: {dataset.name} ({dataset.record_count} records)")

        # Download current data
        content = dataset.download_data()
        ```
    """

    @property
    def dataset_id(self) -> str:
        """The unique identifier for this dataset."""
        return self._data["datasetId"]

    @property
    def name(self) -> str:
        """The dataset name."""
        return self._data["name"]

    @property
    def description(self) -> Optional[str]:
        """The dataset description, or None if not set."""
        return self._data.get("description")

    @property
    def team_id(self) -> str:
        """The team this dataset belongs to."""
        return self._data["teamId"]

    @property
    def record_count(self) -> Optional[int]:
        """Number of records in current data version, if reported."""
        return self._data.get("recordCount")

    @property
    def total_size_bytes(self) -> Optional[int]:
        """Total size in bytes, if reported."""
        return self._data.get("totalSizeBytes")

    @property
    def current_schema_version_id(self) -> Optional[str]:
        """Current schema version ID, or None if there is none."""
        return self._data.get("currentSchemaVersionId")

    @property
    def current_data_version_id(self) -> Optional[str]:
        """Current data version ID, or None if there is none."""
        return self._data.get("currentDataVersionId")

    @property
    def processing_status(self) -> Optional[str]:
        """Processing status: completed, failed, in_progress, needs-processing."""
        return self._data.get("processingStatus")

    @property
    def created_at(self) -> str:
        """When this dataset was created."""
        return self._data["createdAt"]

    @property
    def updated_at(self) -> str:
        """When this dataset was last updated."""
        return self._data["updatedAt"]

    @staticmethod
    def _count_csv_rows(file_data: bytes) -> Optional[int]:
        """Count the data rows (excluding the header) in UTF-8 CSV bytes.

        Uses the ``csv`` module so that a missing trailing newline, quoted
        multi-line fields and blank lines do not distort the count.

        Args:
            file_data: Raw CSV content.

        Returns:
            The number of non-empty records minus one for the header (never
            negative), or None when the bytes are not valid UTF-8.
        """
        try:
            text = file_data.decode("utf-8")
        except UnicodeDecodeError:
            # Unknown encoding: skip row counting rather than guess.
            return None

        try:
            # newline="" lets the csv module handle line endings itself.
            reader = csv.reader(io.StringIO(text, newline=""))
            record_count = sum(1 for record in reader if record)
        except csv.Error:
            # Malformed CSV: fall back to the rough newline-based estimate.
            return max(0, text.count("\n") - 1)
        return max(0, record_count - 1)

    def upload_data(
        self,
        file: Union[str, Path, bytes, Any],
        *,
        file_type: Optional[str] = None,
        status: str = "current",
        **kwargs: Any,
    ) -> DatasetDataVersionModel:
        """Upload data to this dataset, creating a new data version.

        The upload is a three-step exchange: a data version is created (which
        yields an upload URL), the bytes are PUT to that URL, and the version
        is then marked complete with the file hash, size and row count.

        Args:
            file: The file to upload - can be a path, bytes, or file-like object
            file_type: File type ('csv' or 'xlsx'). Auto-detected from the file
                name's extension if not provided.
            status: Version status ('draft' or 'current'). Defaults to 'current'.
            **kwargs: Additional upload options. ``filename`` supplies the file
                name when ``file`` is bytes or an unnamed file-like object.

        Returns:
            The created data version model.

        Raises:
            ValueError: If file is invalid or not found, or the file type
                cannot be auto-detected.
            APIError: If upload process fails.
        """
        # Step 1: Process the file input to get bytes and metadata
        file_data: bytes
        file_name: str

        if isinstance(file, (str, Path)):
            file_path = Path(file)
            if not file_path.exists():
                raise ValueError(f"File not found: {file_path}")
            file_data = file_path.read_bytes()
            file_name = file_path.name
        elif isinstance(file, bytes):
            file_data = file
            file_name = kwargs.get("filename", "uploaded_data")
        elif hasattr(file, "read"):
            file_data = file.read()
            if isinstance(file_data, str):
                # Text-mode buffers yield str; the upload needs bytes.
                file_data = file_data.encode("utf-8")

            # A buffer's ``name`` may be a non-path value (e.g. an integer
            # file descriptor); only use it when it is a usable path.
            buffer_name = getattr(file, "name", None)
            if buffer_name and isinstance(buffer_name, (str, Path)):
                file_name = Path(buffer_name).name
            else:
                file_name = kwargs.get("filename", "uploaded_data")
        else:
            raise ValueError(
                f"Unsupported file type: {type(file)}. Must be str, Path, bytes, or file-like object."
            )

        # Step 2: Auto-detect file type if not provided
        if file_type is None:
            file_extension = Path(file_name).suffix.lower()
            if file_extension == ".csv":
                file_type = "csv"
            elif file_extension in [".xlsx", ".xls"]:
                file_type = "xlsx"
            else:
                raise ValueError(
                    f"Could not auto-detect file type from '{file_name}'. "
                    "Please specify file_type as 'csv' or 'xlsx'."
                )

        # Step 3: Calculate file properties
        file_size = len(file_data)
        file_hash = hashlib.sha256(file_data).hexdigest()
        # Row count is only computed for CSV; other formats report None.
        row_count = self._count_csv_rows(file_data) if file_type == "csv" else None

        # Step 4: Create data version with upload URL
        create_response = self._client._request(
            method="POST",
            path="/dataset-data-versions",
            body={
                "datasetId": self.dataset_id,
                "fileName": file_name,
                "fileType": file_type,
                "status": status,
            },
            cast_type=dict,
        )
        data_version_data = create_response["dataVersion"]
        upload_url = create_response["uploadUrl"]
        data_version_id = data_version_data["datasetDataVersionId"]

        # Step 5: Upload file to presigned URL
        if file_type == "csv":
            content_type = "text/csv"
        else:
            content_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        try:
            with httpx.Client() as client:
                upload_result = client.put(
                    upload_url,
                    content=file_data,
                    headers={"Content-Type": content_type},
                )
                upload_result.raise_for_status()
        except Exception as e:
            raise APIError(f"Failed to upload data to presigned URL: {e}") from e

        # Step 6: Mark upload as complete
        try:
            complete_response = self._client._request(
                method="POST",
                path=f"/dataset-data-versions/{data_version_id}/complete",
                body={
                    "fileHash": file_hash,
                    "fileSizeBytes": file_size,
                    "rowCount": row_count,
                },
                cast_type=dict,
            )
            return DatasetDataVersionModel(complete_response, self._client)
        except APIError:
            # Already an API error: propagate it untouched instead of
            # burying it inside a second, generic APIError.
            raise
        except Exception as e:
            raise APIError(f"Failed to complete data version upload: {e}") from e

    def download_data(
        self,
        *,
        version_id: Optional[str] = None,
        path: Optional[Union[str, Path]] = None,
    ) -> Optional[bytes]:
        """Download data from this dataset.

        Args:
            version_id: Specific version to download. If not provided, downloads
                current version.
            path: Optional path to save the file. If provided, saves to this
                location. If not provided, returns the file content as bytes.

        Returns:
            If path is provided: None (file is saved to disk)
            If path is not provided: The file content as bytes

        Raises:
            ValueError: If no data version exists.
            APIError: If download fails.
        """
        target_version_id = version_id or self.current_data_version_id
        if not target_version_id:
            raise ValueError("No data version specified and dataset has no current version")

        # Only the ID is needed to download, so a minimal model is built
        # locally rather than fetching the full version record first.
        version_model = DatasetDataVersionModel(
            {"datasetDataVersionId": target_version_id}, self._client
        )
        return version_model.download(path=path)

    def list_data_versions(
        self,
        *,
        page: int = 1,
        page_size: int = 10,
        status: Optional[str] = None,
    ) -> List[DatasetDataVersionModel]:
        """List data versions for this dataset.

        Args:
            page: Page number (1-based). Defaults to 1.
            page_size: Number of items per page. Defaults to 10.
            status: Filter by status ('draft', 'current', 'archived').

        Returns:
            List of data version models (empty if the response has no items).
        """
        query: Dict[str, Any] = {
            "datasetIds": [self.dataset_id],
            "page": page,
            "pageSize": page_size,
        }
        if status is not None:
            query["status"] = status

        response = self._client._request(
            method="GET",
            path="/dataset-data-versions",
            cast_type=dict,
            options={"query": query},
        )
        return [
            DatasetDataVersionModel(item, self._client)
            for item in response.get("items", [])
        ]

    def get_data_version(self, version_id: str) -> DatasetDataVersionModel:
        """Get a specific data version by ID.

        Args:
            version_id: The data version ID to retrieve.

        Returns:
            The data version model.
        """
        response = self._client._request(
            method="GET",
            path=f"/dataset-data-versions/{version_id}",
            cast_type=dict,
        )
        return DatasetDataVersionModel(response, self._client)

    def update(
        self,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        should_process: Optional[bool] = None,
        **kwargs: Any,
    ) -> "DatasetModel":
        """Update this dataset.

        Only arguments that are not None are sent; the named arguments
        override same-named keys passed through ``kwargs``.

        Args:
            name: New dataset name.
            description: New dataset description.
            should_process: Whether to trigger dataset processing workflow.
            **kwargs: Additional properties to update.

        Returns:
            This same model instance, refreshed with the API's response.
        """
        body: Dict[str, Any] = {**kwargs}
        if name is not None:
            body["name"] = name
        if description is not None:
            body["description"] = description
        if should_process is not None:
            body["shouldProcess"] = should_process

        updated_data = self._client._request(
            method="PATCH",
            path=f"/datasets/{self.dataset_id}",
            body=body,
            cast_type=dict,
        )

        # Update our internal data and return self
        self._data = updated_data
        return self

    def delete(self) -> None:
        """Delete this dataset."""
        self._client._request(
            method="DELETE",
            path=f"/datasets/{self.dataset_id}",
            cast_type=dict,
        )

    def _refresh(self) -> None:
        """Refresh this dataset from the API."""
        response = self._client._request(
            method="GET",
            path=f"/datasets/{self.dataset_id}",
            cast_type=dict,
        )
        self._data = response

    # Schema Version Methods

    def create_schema(
        self,
        columns: List[Dict[str, Any]],
        *,
        status: str = "draft",
        parsing_options: Optional[Dict[str, Any]] = None,
    ) -> "DatasetSchemaVersionModel":
        """Create a schema version with simple column definitions.

        Args:
            columns: List of column definitions. Each dict should have:
                - name: Column name (required)
                - type: Column type (required) - 'string', 'int', 'float',
                  'datetime', 'bool'. The aliases 'str' and 'date' are also
                  translated; any other value is passed through unchanged.
                - required: Whether column is required (optional, defaults to True)
                - description: Column description (optional)
            status: Initial status ('draft' or 'current'). Defaults to 'draft'.
            parsing_options: Optional parsing configuration.

        Returns:
            The created schema version model.

        Raises:
            ValueError: If a column definition lacks 'name' or 'type'.

        Example:
            ```python
            schema = dataset.create_schema([
                {"name": "user_id", "type": "int", "required": True},
                {"name": "email", "type": "string", "description": "User email address"},
                {"name": "signup_date", "type": "datetime"},
            ])
            ```
        """
        # User-friendly type names -> API type names. Built once, outside
        # the loop, since it does not depend on the column.
        type_mapping = {
            "str": "string",
            "int": "integer",
            "float": "float",
            "datetime": "datetime",
            "bool": "boolean",
            "date": "datetime",
        }

        # Convert simple column definitions to full API format
        api_columns: List[Dict[str, Any]] = []
        for position, col in enumerate(columns, 1):
            if "name" not in col or "type" not in col:
                raise ValueError("Each column must have 'name' and 'type' fields")

            api_column: Dict[str, Any] = {
                "pos": position,  # 1-based column position
                "name": col["name"],
                "type": type_mapping.get(col["type"], col["type"]),
                "nullable": not col.get("required", True),
            }
            if "description" in col:
                api_column["description"] = col["description"]
            api_columns.append(api_column)

        # Import here to avoid circular imports
        from ..resources.dataset_schema_versions import DatasetSchemaVersions

        schema_versions = DatasetSchemaVersions(self._client)
        return schema_versions.create(
            dataset_id=self.dataset_id,
            columns=api_columns,
            status=status,
            parsing_options=parsing_options,
        )

    def create_schema_from_sample(
        self,
        sample_file: Union[str, Path, bytes],
        *,
        status: str = "draft",
        header_row: int = 1,
        sample_size: int = 100,
    ) -> "DatasetSchemaVersionModel":
        """Create a schema by analyzing a sample data file.

        Args:
            sample_file: Sample data to analyze. Can be:
                - str/Path: Path to CSV file
                - bytes: Raw CSV data as bytes
            status: Initial status ('draft' or 'current'). Defaults to 'draft'.
            header_row: Row containing column headers (1-based). Defaults to 1.
                NOTE(review): this value is accepted but not currently
                forwarded to schema inference or to the created schema.
            sample_size: Number of rows to sample for type inference.
                Defaults to 100.

        Returns:
            The created schema version model.

        Raises:
            ValueError: If file format is not supported or file is invalid.
        """
        # Only the bytes and detected type are needed from the prepared tuple.
        file_data, _, file_type, _, _, _ = DatasetOperations.validate_and_prepare_file(
            sample_file
        )

        inferred_columns = DatasetOperations.infer_schema_from_sample(
            file_data, file_type, sample_size
        )

        return self.create_schema(
            columns=inferred_columns,
            status=status,
        )

    def _infer_column_type(self, values: List[str]) -> str:
        """Infer the data type of a column from sample values.

        Checks are applied in order: integer, float, boolean, datetime. The
        first three require every value to qualify; datetime requires at
        least 80% of the values to start with a recognised pattern.

        Args:
            values: Sample values for one column, as strings.

        Returns:
            One of 'integer', 'float', 'boolean', 'datetime' or 'string'.
            An empty sample yields 'string'.
        """
        if not values:
            return "string"

        def parses_as(caster: Any, value: str) -> bool:
            # True when ``caster(value)`` succeeds without ValueError.
            try:
                caster(value)
            except ValueError:
                return False
            return True

        if all(parses_as(int, value) for value in values):
            return "integer"

        if all(parses_as(float, value) for value in values):
            return "float"

        # Values made up only of "1"/"0" never reach this point: they are
        # classified as integers by the check above.
        bool_values = {"true", "false", "1", "0", "yes", "no", "y", "n"}
        if all(value.lower() in bool_values for value in values):
            return "boolean"

        # Simple datetime patterns, compiled once per call. ``match`` anchors
        # at the start only, so trailing text after the date is tolerated.
        datetime_patterns = [
            re.compile(r'\d{4}-\d{2}-\d{2}'),  # YYYY-MM-DD
            re.compile(r'\d{2}/\d{2}/\d{4}'),  # MM/DD/YYYY
            re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}'),  # YYYY-MM-DD HH:MM:SS
        ]
        datetime_count = sum(
            1
            for value in values
            if any(pattern.match(value) for pattern in datetime_patterns)
        )
        if datetime_count >= len(values) * 0.8:  # 80% match threshold
            return "datetime"

        return "string"

    def schema_builder(self) -> "SchemaBuilder":
        """Get a schema builder for creating complex schemas with fluent API.

        Returns:
            Schema builder instance for this dataset.

        Example:
            ```python
            schema = (dataset.schema_builder()
                .add_column("id", "int", required=True)
                .add_column("name", "string")
                .with_parsing(delimiter=",", header_row=1)
                .as_current()
                .create())
            ```
        """
        # Imported at call time; at module level the name is only available
        # under TYPE_CHECKING.
        from .dataset_schema_version import SchemaBuilder

        return SchemaBuilder(self.dataset_id, self._client)

    def list_schema_versions(
        self,
        *,
        status: Optional[str] = None,
        page: int = 1,
        page_size: int = 10,
    ) -> List["DatasetSchemaVersionModel"]:
        """List schema versions for this dataset.

        Args:
            status: Filter by status ('draft', 'current', 'archived').
            page: Page number (1-based). Defaults to 1.
            page_size: Number of items per page. Defaults to 10.

        Returns:
            List of schema version models.
        """
        # Import here to avoid circular imports
        from ..resources.dataset_schema_versions import DatasetSchemaVersions

        schema_versions = DatasetSchemaVersions(self._client)
        return schema_versions.list(
            dataset_ids=[self.dataset_id],
            status=status,
            page=page,
            page_size=page_size,
        )

    def get_current_schema(self) -> Optional["DatasetSchemaVersionModel"]:
        """Get the current (active) schema version for this dataset.

        Returns:
            The current schema version model, or None if no current schema exists.
        """
        current_schemas = self.list_schema_versions(status="current", page_size=1)
        return current_schemas[0] if current_schemas else None

    def get_schema_version(self, schema_version_id: str) -> "DatasetSchemaVersionModel":
        """Get a specific schema version by ID.

        Args:
            schema_version_id: The schema version ID to retrieve.

        Returns:
            The schema version model.
        """
        # Import here to avoid circular imports
        from ..resources.dataset_schema_versions import DatasetSchemaVersions

        schema_versions = DatasetSchemaVersions(self._client)
        return schema_versions.retrieve(schema_version_id)