# Source code for moderatelyai_sdk.resources.files

"""Files resource for the Moderately AI API."""

import base64
import hashlib
import mimetypes
from pathlib import Path
from typing import Any, Dict, Optional, Union

import httpx

from ..exceptions import APIError
from ..models.file import FileModel
from ._base import BaseResource


def _require_file_id(file_id: str) -> str:
    """Return *file_id* unchanged after rejecting ``None`` and blank values.

    Interpolating an empty ID into ``/files/{file_id}`` yields ``/files/``,
    which no longer addresses a single file, so fail before any request is
    sent.

    Raises:
        ValueError: If *file_id* is ``None`` or contains only whitespace.
    """
    if file_id is None or not str(file_id).strip():
        raise ValueError("file_id must be a non-empty string")
    return file_id


def _read_upload_source(file: Any, name: Optional[str]) -> tuple:
    """Normalize any supported upload input into ``(file_data, file_name)``.

    Args:
        file: A path (``str`` or ``Path``), a bytes-like buffer (``bytes``,
            ``bytearray``, ``memoryview``), or an object with a ``read()``
            method.
        name: Optional display name supplied by the caller.

    Returns:
        A ``(bytes, str)`` pair: the full content and the name to upload
        it under.

    Raises:
        ValueError: If the path does not exist, is not a regular file, or
            the input is of an unsupported type.
    """
    if isinstance(file, (str, Path)):
        file_path = Path(file)
        if not file_path.exists():
            raise ValueError(f"File not found: {file_path}")
        # A directory passes exists(); reject it here so callers get the
        # documented ValueError instead of IsADirectoryError from open().
        if not file_path.is_file():
            raise ValueError(f"Not a file: {file_path}")

        file_data = file_path.read_bytes()
        if not name:
            return file_data, file_path.name

        # Keep the on-disk extension on a custom name. The comparison is
        # case-insensitive so "data.CSV" is not turned into "data.CSV.csv".
        file_extension = file_path.suffix
        if name.lower().endswith(file_extension.lower()):
            return file_data, name
        return file_data, f"{name}{file_extension}"

    if isinstance(file, (bytes, bytearray, memoryview)):
        return bytes(file), name or "uploaded_file"

    if hasattr(file, "read"):
        file_data = file.read()
        if isinstance(file_data, str):
            # Text-mode streams return str; the upload needs raw bytes.
            file_data = file_data.encode("utf-8")
        elif isinstance(file_data, (bytearray, memoryview)):
            file_data = bytes(file_data)

        # ``name`` on a file object can be an integer file descriptor
        # (e.g. tempfile.TemporaryFile on POSIX); only path-like names are
        # usable as a filename.
        buffer_name = getattr(file, "name", None)
        if buffer_name and not name and isinstance(buffer_name, (str, Path)):
            return file_data, Path(buffer_name).name
        return file_data, name or "uploaded_file"

    raise ValueError(
        f"Unsupported file type: {type(file)}. Must be str, Path, bytes, or file-like object."
    )


class Files(BaseResource):
    """Manage files in your team.

    The Files resource provides methods for uploading, downloading, listing,
    and managing files. ``upload()`` and ``retrieve()`` return FileModel
    instances, and ``list()`` returns them under the ``"items"`` key of its
    response; FileModel provides rich functionality for file operations.

    Key Features:
        - Upload files with automatic MIME type detection
        - Download files to memory or disk
        - List and filter files with pagination
        - Rich file type detection (CSV, images, documents, etc.)
        - Automatic presigned URL handling for secure transfers

    Examples:
        ```python
        # Upload a file and get a FileModel instance
        file = client.files.upload(
            file="/path/to/data.csv",
            name="Dataset"
        )

        # Use rich FileModel methods
        if file.is_ready() and file.is_csv():
            content = file.download()  # Download to memory
            file.download(path="./local_copy.csv")  # Download to disk

        # List files with filtering
        files_response = client.files.list(
            mime_type="text/csv",
            page_size=20
        )
        csv_files = files_response["items"]  # List of FileModel instances

        # Get a specific file
        file = client.files.retrieve("file_123")
        print(f"File: {file.name} ({file.file_size} bytes)")

        # Delete files
        file.delete()  # Using FileModel method
        # OR
        client.files.delete("file_123")  # Using resource method
        ```
    """

    def list(
        self,
        *,
        dataset_id: Optional[str] = None,
        status: Optional[str] = None,
        mime_type: Optional[str] = None,
        file_hashes: Optional[str] = None,
        page: int = 1,
        page_size: int = 10,
        order_by: str = "created_at",
        order_direction: str = "desc",
    ) -> Dict[str, Any]:
        """List all files with pagination and filtering.

        Sends ``GET /files`` and, when the response has an ``"items"`` key,
        replaces each item with a FileModel instance.

        Args:
            dataset_id: Filter files by dataset ID.
            status: Filter files by status (e.g., "completed", "processing", "error").
            mime_type: Filter files by MIME type (e.g., "text/csv", "application/pdf").
            file_hashes: Filter files by SHA256 hash. Can be a single hash string.
            page: Page number (1-based). Defaults to 1.
            page_size: Number of items per page (max 100; not validated
                client-side). Defaults to 10.
            order_by: Field to sort by. Defaults to "created_at".
            order_direction: Sort direction ("asc" or "desc"). Defaults to "desc".

        Returns:
            Dictionary with "items" (list of FileModel instances) and "pagination" info.

        Example:
            ```python
            # List recent CSV files
            response = client.files.list(
                mime_type="text/csv",
                page_size=20,
                order_direction="desc"
            )
            csv_files = response["items"]  # List of FileModel instances
            for file in csv_files:
                if file.is_ready():
                    print(f"Ready: {file.name} ({file.file_size} bytes)")
            ```
        """
        query: Dict[str, Any] = {
            "page": page,
            "page_size": page_size,
            "order_by": order_by,
            "order_direction": order_direction,
        }

        # Don't add team_ids here since it's handled by default_query in client.
        if dataset_id is not None:
            query["dataset_id"] = dataset_id
        if status is not None:
            query["status"] = status
        if mime_type is not None:
            query["mime_type"] = mime_type
        if file_hashes is not None:
            # NOTE(review): this key is camelCase while the others are
            # snake_case; kept as-is, confirm against the API contract.
            query["fileHashes"] = file_hashes

        response = self._get(
            "/files",
            options={"query": query},
        )

        # Convert items to FileModel instances.
        if "items" in response:
            response["items"] = [
                FileModel(item, self._client) for item in response["items"]
            ]

        return response

    def retrieve(self, file_id: str) -> FileModel:
        """Retrieve a specific file by ID.

        Args:
            file_id: The ID of the file to retrieve.

        Returns:
            FileModel instance with rich file operations.

        Raises:
            ValueError: If file_id is None or blank.
            NotFoundError: If the file doesn't exist.

        Example:
            ```python
            file = client.files.retrieve("file_123")
            print(f"File: {file.name} ({file.mime_type})")
            if file.is_ready():
                content = file.download()
            ```
        """
        data = self._get(f"/files/{_require_file_id(file_id)}")
        return FileModel(data, self._client)

    def upload(
        self,
        file: Union[str, Path, bytes, Any],
        *,
        name: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> FileModel:
        """Upload a file using secure presigned URL workflow.

        The whole file is read into memory, hashed with SHA256, and its MIME
        type is guessed from the final file name. The method then requests a
        presigned URL (``POST /files/upload-url``), PUTs the bytes to it, and
        confirms the upload (``POST /files/{file_id}/complete``).

        Supported file inputs:
            - File paths (str or Path objects)
            - Raw bytes data (bytes, bytearray, or memoryview)
            - File-like objects with .read() method (buffers, streams)

        Args:
            file: The file to upload in any supported format.
            name: Custom display name for the file. For path inputs the
                original extension is appended when the name lacks it. If not
                provided, uses the filename from the path or file object, or
                "uploaded_file".
            metadata: Additional metadata dictionary to store with the file.
            **kwargs: Accepted for compatibility; this method does not
                currently read or send them.

        Returns:
            FileModel instance representing the uploaded file with rich operations.

        Raises:
            ValueError: If file is invalid, not found, a directory, or an
                unsupported format.
            APIError: If upload process fails at any step.

        Examples:
            ```python
            # Upload from file path
            file = client.files.upload("/path/to/document.pdf")

            # Upload with custom name and metadata
            file = client.files.upload(
                file="data.csv",
                name="Customer Data",
                metadata={"category": "sales", "quarter": "Q1"}
            )

            # Upload raw bytes
            with open("image.jpg", "rb") as f:
                file = client.files.upload(
                    file=f.read(),
                    name="Profile Picture"
                )

            # Upload from file-like object
            import io
            buffer = io.BytesIO(b"Hello, World!")
            file = client.files.upload(buffer, name="greeting.txt")

            # Use the returned FileModel
            if file.is_ready():
                print(f"Uploaded: {file.name} ({file.file_size} bytes)")
            ```
        """
        # Step 1: Process the file input to get bytes and a file name.
        file_data, file_name = _read_upload_source(file, name)

        # Step 2: Calculate file properties.
        file_size = len(file_data)
        file_hash = hashlib.sha256(file_data).hexdigest()

        mime_type, _ = mimetypes.guess_type(file_name)
        if not mime_type:
            mime_type = "application/octet-stream"

        # Step 3: Get presigned upload URL.
        upload_request: Dict[str, Any] = {
            "fileName": file_name,
            "fileSize": file_size,
            "fileHash": file_hash,
            "mimeType": mime_type,
            "teamId": self._client.team_id,
        }
        if metadata:
            upload_request["metadata"] = metadata

        upload_response = self._post("/files/upload-url", body=upload_request)
        file_info = upload_response["file"]
        presigned_url = upload_response["uploadUrl"]
        file_id = file_info["fileId"]

        # Step 4: Upload file to presigned URL. This goes straight to
        # storage via httpx rather than through the API client, so every
        # failure is translated into APIError here.
        try:
            with httpx.Client() as http:
                upload_result = http.put(
                    presigned_url,
                    content=file_data,
                    headers={"Content-Type": mime_type},
                )
                upload_result.raise_for_status()
        except Exception as e:
            raise APIError(f"Failed to upload file to presigned URL: {e}") from e

        # Step 5: Mark upload as complete.
        try:
            complete_response = self._post(
                f"/files/{file_id}/complete",
                body={"fileSize": file_size, "fileHash": file_hash},
            )
            return FileModel(complete_response, self._client)
        except APIError:
            # Already an API error: re-raise untouched so its concrete type
            # is not hidden behind a second, generic APIError.
            raise
        except Exception as e:
            raise APIError(f"Failed to complete file upload: {e}") from e

    def delete(self, file_id: str) -> None:
        """Delete a file permanently.

        Sends ``DELETE /files/{file_id}``. This operation cannot be undone.
        The file will be removed from both the database and cloud storage.
        Consider using FileModel.delete() for better ergonomics.

        Args:
            file_id: The ID of the file to delete.

        Raises:
            ValueError: If file_id is None or blank.
            NotFoundError: If the file doesn't exist.
            APIError: If deletion fails.

        Example:
            ```python
            # Delete using resource method
            client.files.delete("file_123")

            # OR delete using FileModel (recommended)
            file = client.files.retrieve("file_123")
            file.delete()
            ```
        """
        self._delete(f"/files/{_require_file_id(file_id)}")

    def download(
        self, file_id: str, *, path: Optional[Union[str, Path]] = None
    ) -> Optional[bytes]:
        """Download file content.

        Note: Consider using FileModel.download() instead for better ergonomics:
            file = client.files.retrieve(file_id)
            content = file.download(path=path)

        Args:
            file_id: The ID of the file to download.
            path: Optional path to save the file. If provided, saves to this
                location (missing parent directories are created). If not
                provided, returns the file content as bytes.

        Returns:
            If path is provided: None (file is saved to disk)
            If path is not provided: The file content as bytes

        Raises:
            ValueError: If file_id is None or blank.
            NotFoundError: If the file doesn't exist.
            APIError: If fetching the returned download URL fails.
            IOError: If unable to write to the specified path.
        """
        response = self._get(f"/files/{_require_file_id(file_id)}/download")

        # The endpoint's response is handled in three shapes: a dict with a
        # URL to fetch, a dict with base64 content, or the payload itself.
        file_data: bytes
        if isinstance(response, dict) and "downloadUrl" in response:
            try:
                with httpx.Client() as http:
                    download_response = http.get(response["downloadUrl"])
                    download_response.raise_for_status()
                    file_data = download_response.content
            except httpx.HTTPError as e:
                raise APIError(f"Failed to download file from URL: {e}") from e
        elif isinstance(response, dict) and "content" in response:
            file_data = base64.b64decode(response["content"])
        elif isinstance(response, (bytes, bytearray, memoryview)):
            # bytes() keeps other binary buffers intact instead of encoding
            # their repr() text.
            file_data = bytes(response)
        else:
            file_data = str(response).encode()

        if path is None:
            return file_data

        file_path = Path(path)
        # Create parent directories if they don't exist.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_bytes(file_data)
        return None