Python SDK

The Python SDK provides a Pythonic interface to the Covia Grid with full type safety, sync and async support, and idiomatic error handling.

Installation

pip install covia

For development:

pip install "covia[dev]"

Quick Start

Connecting to a Venue

from covia import Grid

# Connect using a URL
venue = Grid.connect("https://venue-test.covia.ai")

# Connect using a DID
venue = Grid.connect("did:web:venue-test.covia.ai")

# Use as a context manager for automatic cleanup
with Grid.connect("https://venue-test.covia.ai") as venue:
    status = venue.status()
    print(status.name)

Invoking Operations

with Grid.connect("https://venue-test.covia.ai") as venue:
    # Invoke and get a Job for tracking
    job = venue.invoke("my-operation", {"prompt": "hello"})
    job.wait(timeout=60)
    print(job.output)

    # Or use run() for synchronous one-shot execution
    result = venue.run("my-operation", {"prompt": "hello"}, timeout=60)

Discovering Assets

# List all assets
assets = venue.list_assets()
print(f"Total: {assets.total}")

# With pagination
assets = venue.list_assets(offset=10, limit=20)

# Get a specific asset by ID
asset = venue.get_asset("abc123def456...")
print(asset.name)
print(asset.description)
print(asset.metadata)

Working with Asset Content

# Download binary content
content = venue.get_asset_content("abc123def456...")

# Upload content
content_hash = venue.put_asset_content("abc123def456...", b"file contents")

# Or use the Asset object directly
asset = venue.get_asset("abc123def456...")
content = asset.get_content()
asset.put_content(b"updated content")

Registering Assets

from covia import Asset

# Register an Asset object (recommended)
asset = venue.register(
    Asset({
        "name": "My Dataset",
        "description": "Sample data for analysis",
        "content": {"contentType": "text/csv"},
    })
)
print(asset.id)    # server-assigned content-addressed ID
print(asset.name)  # "My Dataset"

# Or pass a plain dict
asset = venue.register({
    "name": "My Dataset",
    "description": "Sample data for analysis",
})

Authentication

The SDK supports pluggable authentication via the auth parameter on Grid.connect().

Bearer Token

from covia import Grid
from covia.auth import BearerAuth

venue = Grid.connect(
    "https://venue-test.covia.ai",
    auth=BearerAuth("your-token-here"),
)

HTTP Basic

from covia.auth import BasicAuth

venue = Grid.connect(
    "https://venue-test.covia.ai",
    auth=BasicAuth("username", "password"),
)
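
For reference, HTTP Basic authentication (RFC 7617) sends the credentials as a base64-encoded `username:password` pair in the Authorization header. The helper below is a standalone illustration of that encoding, not the SDK's BasicAuth implementation; `basic_auth_header` is a hypothetical name.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header (RFC 7617)."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


print(basic_auth_header("username", "password"))
```

Base64 is an encoding, not encryption, so Basic credentials should only be sent over HTTPS.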

No Authentication

from covia.auth import NoAuth

venue = Grid.connect("https://venue-test.covia.ai", auth=NoAuth())

This is the default when auth is not specified.

Custom Authentication

Implement the Auth interface for custom schemes (OAuth 2.0, Ed25519 signing, etc.):

from covia.auth import Auth

class OAuth2Auth(Auth):
    def __init__(self, access_token: str):
        self._token = access_token

    def apply(self, headers: dict[str, str]) -> None:
        headers["Authorization"] = f"Bearer {self._token}"

    def __repr__(self) -> str:
        return "OAuth2Auth(***)"
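
Because apply() receives the outgoing headers, a custom scheme can compute its header value at call time. The sketch below refreshes an expiring token on demand, assuming apply() is invoked for each outgoing request (not confirmed above). It defines a local stand-in for the Auth base class so it runs without the SDK installed; `RefreshingTokenAuth`, `fetch_token`, and the `(token, lifetime)` return shape are assumptions made for illustration.

```python
import time
from typing import Callable, Dict, Tuple


class Auth:
    """Local stand-in for covia.auth.Auth, mirroring the apply() shape shown above."""

    def apply(self, headers: Dict[str, str]) -> None:
        raise NotImplementedError


class RefreshingTokenAuth(Auth):
    """Hypothetical scheme: fetch a new bearer token shortly before the old one expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.monotonic,
        leeway: float = 30.0,
    ):
        self._fetch_token = fetch_token  # returns (token, lifetime in seconds)
        self._clock = clock
        self._leeway = leeway
        self._token = ""
        self._expires_at = float("-inf")

    def apply(self, headers: Dict[str, str]) -> None:
        # Refresh when no token is cached or the cached one is within `leeway` of expiry.
        if self._clock() >= self._expires_at - self._leeway:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = self._clock() + lifetime
        headers["Authorization"] = f"Bearer {self._token}"

    def __repr__(self) -> str:
        return "RefreshingTokenAuth(***)"  # keep the token out of logs and tracebacks
```

Injecting the clock keeps the refresh logic testable without real waiting. With the SDK installed, the class would subclass covia.auth.Auth instead of the stand-in.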

Job Lifecycle

Jobs represent the execution of an operation. They progress through a lifecycle:

PENDING → STARTED → COMPLETE | FAILED | CANCELLED | REJECTED | TIMEOUT

Jobs may also enter interactive states: PAUSED, INPUT_REQUIRED, AUTH_REQUIRED.
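
The grouping of states described above can be sketched as a small enum with two sets. This is an illustrative mirror of the state names only, not the SDK's JobStatus: the class name `LifecycleState`, the string values, and the helper functions are assumptions.

```python
from enum import Enum


class LifecycleState(Enum):
    """Illustrative mirror of the state names above; not the SDK's JobStatus."""

    PENDING = "PENDING"
    STARTED = "STARTED"
    COMPLETE = "COMPLETE"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    REJECTED = "REJECTED"
    TIMEOUT = "TIMEOUT"
    PAUSED = "PAUSED"
    INPUT_REQUIRED = "INPUT_REQUIRED"
    AUTH_REQUIRED = "AUTH_REQUIRED"


# States at the end of the arrow chain: the job will not progress further.
TERMINAL = frozenset({
    LifecycleState.COMPLETE,
    LifecycleState.FAILED,
    LifecycleState.CANCELLED,
    LifecycleState.REJECTED,
    LifecycleState.TIMEOUT,
})

# Interactive states: the job is waiting on something outside itself.
INTERACTIVE = frozenset({
    LifecycleState.PAUSED,
    LifecycleState.INPUT_REQUIRED,
    LifecycleState.AUTH_REQUIRED,
})


def is_finished(state: LifecycleState) -> bool:
    return state in TERMINAL


def needs_attention(state: LifecycleState) -> bool:
    return state in INTERACTIVE
```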

Tracking Jobs

job = venue.invoke("my-operation", {"x": 1})

# Check status
print(job.status) # JobStatus.PENDING
print(job.is_finished) # False
print(job.is_complete) # False

# Poll for latest status
job.refresh()

# Wait for completion with exponential backoff
job.wait(timeout=120)

# Access the result
if job.is_complete:
    print(job.output)
else:
    print(job.error)
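
To make "wait with exponential backoff" concrete, here is a minimal standalone polling loop. It is a sketch of the general technique, not the SDK's internal implementation; the function name `wait_until` and the default delays are assumptions.

```python
import time
from typing import Callable


def wait_until(
    is_finished: Callable[[], bool],
    timeout: float,
    initial_delay: float = 0.5,
    factor: float = 2.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll is_finished() with exponentially growing delays until it returns True.

    Raises TimeoutError if the deadline passes first.
    """
    deadline = clock() + timeout
    delay = initial_delay
    while not is_finished():
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"not finished after {timeout}s")
        # Never sleep past the deadline, and cap the growth at max_delay.
        sleep(min(delay, remaining))
        delay = min(delay * factor, max_delay)
```

Growing the delay keeps short jobs responsive while avoiding a flood of status requests for long-running ones.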

Convenience Methods

# wait + output in one call
output = job.result(timeout=60)

# invoke + wait + output in one call
output = venue.run("my-operation", {"x": 1}, timeout=60)

Cancelling Jobs

job = venue.invoke("long-running-op", {"data": "..."})
# ... later
job.cancel()

SSE Streaming

for event in job.stream():
    print(f"Event: {event.event}, Data: {event.data}")
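
For background, Server-Sent Events arrive as plain text lines (`event:` and `data:` fields, with a blank line ending each event). The parser below is a simplified standalone sketch of that wire format covering only those two fields; it is not the SDK's implementation, and the `ServerSentEvent` class and the sample payload are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List


@dataclass
class ServerSentEvent:
    event: str
    data: str


def parse_sse(lines: Iterable[str]) -> Iterator[ServerSentEvent]:
    """Parse text/event-stream lines into events (event and data fields only)."""
    event_type = "message"  # default event type in the SSE format
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event if any data lines were collected.
            if data:
                yield ServerSentEvent(event_type, "\n".join(data))
            event_type, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space is not part of the value
        if field == "event":
            event_type = value
        elif field == "data":
            data.append(value)


# Hypothetical stream contents, for illustration only.
sample = [
    "event: status\n",
    'data: {"status": "STARTED"}\n',
    "\n",
    ": keep-alive\n",
    "data: line one\n",
    "data: line two\n",
    "\n",
]
for ev in parse_sse(sample):
    print(ev.event, repr(ev.data))
```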

Managing Jobs

# List all job IDs
job_ids = venue.list_jobs()

# Get a specific job
job = venue.get_job("job-id-here")

# Delete a job record
venue.delete_job("job-id-here")

Operations

Operations are named capabilities exposed by a venue.

# List all available operations
operations = venue.list_operations()
for op in operations:
    print(f"{op.name}: {op.description}")

# Get details of a specific operation
op = venue.get_operation("test:echo")

# Invoke by operation name
job = venue.invoke("test:echo", {"message": "hello"})

# Invoke by asset ID
job = venue.invoke("abc123def456...")

# Invoke by DID URL
job = venue.invoke("did:key:z6Mk.../a/abc123def456...")

Discovery

Venues expose standard discovery endpoints for interoperability.

# DID Document (W3C Decentralized Identifier)
doc = venue.did_document()
print(doc.id) # "did:web:venue-test.covia.ai"
print(doc.context) # "https://www.w3.org/ns/did/v1"

# Cached DID (fetched once, then cached)
did = venue.did # "did:web:venue-test.covia.ai"

# MCP Discovery (Model Context Protocol)
mcp = venue.mcp_discovery()
print(mcp.mcp_version)

# A2A Agent Card
card = venue.agent_card()
print(card.agentProvider)
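
A did:web identifier such as the one above maps to an HTTPS URL under the W3C did:web method: the DID document lives at `/.well-known/did.json` on the host, or at `<path>/did.json` when the DID carries extra colon-separated path segments. The function below sketches that mapping on its own; whether the SDK resolves DIDs in exactly this way is not stated here, and `did_web_to_url` is a hypothetical name.

```python
from urllib.parse import unquote


def did_web_to_url(did: str) -> str:
    """Map a did:web identifier to the HTTPS URL of its DID document."""
    prefix = "did:web:"
    if not did.startswith(prefix):
        raise ValueError(f"not a did:web identifier: {did!r}")
    # Colons separate the host from optional path segments;
    # a port is percent-encoded as %3A inside the host segment.
    host, *path = did[len(prefix):].split(":")
    if not host:
        raise ValueError("did:web identifier has no host")
    host = unquote(host)
    if path:
        return f"https://{host}/{'/'.join(path)}/did.json"
    return f"https://{host}/.well-known/did.json"


print(did_web_to_url("did:web:venue-test.covia.ai"))
```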

Async API

The SDK provides a full async mirror of the sync API via covia.async_api.

import asyncio
from covia.async_api import AsyncGrid

async def main():
    async with AsyncGrid.connect("https://venue-test.covia.ai") as venue:
        # All methods are awaitable
        status = await venue.status()
        result = await venue.run("my-operation", {"prompt": "hello"})
        print(result)

asyncio.run(main())

Concurrent Operations

async def run_parallel():
    async with AsyncGrid.connect("https://venue-test.covia.ai") as venue:
        # Launch multiple operations concurrently
        jobs = await asyncio.gather(
            venue.invoke("op-a", {"x": 1}),
            venue.invoke("op-b", {"x": 2}),
            venue.invoke("op-c", {"x": 3}),
        )

        # Wait for all results
        results = await asyncio.gather(
            *[job.result(timeout=60) for job in jobs]
        )
        return results
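
When launching many operations at once, it can help to cap how many are in flight and to keep one failure from discarding the other results. The sketch below uses only asyncio; `run_bounded` is a hypothetical helper and `fake_run` is a stand-in for an awaitable SDK call such as venue.run(...).

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_bounded(
    tasks: Sequence[Callable[[], Awaitable[Any]]],
    max_concurrency: int = 5,
) -> List[Any]:
    """Run coroutine factories with at most max_concurrency in flight.

    Results keep input order; exceptions are returned in place rather than raised.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:
            return await factory()

    return await asyncio.gather(
        *(guarded(f) for f in tasks), return_exceptions=True
    )


async def fake_run(x: int) -> int:
    # Stand-in for an awaitable SDK call; fails for one input to show error capture.
    await asyncio.sleep(0)
    if x == 2:
        raise RuntimeError("boom")
    return x * 10


results = asyncio.run(
    run_bounded([lambda x=x: fake_run(x) for x in (1, 2, 3)], max_concurrency=2)
)
print(results)
```

Passing factories (callables that create the coroutine) rather than coroutine objects means no work is started until a semaphore slot is free.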

Async Authentication

Authentication works identically with the async API:

from covia.async_api import AsyncGrid
from covia.auth import BearerAuth

async def main():
    async with AsyncGrid.connect(
        "https://venue-test.covia.ai",
        auth=BearerAuth("token"),
    ) as venue:
        ...

Error Handling

The SDK provides a structured exception hierarchy:

CoviaError                      # Base for all SDK errors
├── GridError                   # HTTP error responses (4xx/5xx)
│   └── NotFoundError           # 404 responses
│       ├── AssetNotFoundError  # Asset not found
│       └── JobNotFoundError    # Job not found
├── CoviaConnectionError        # Connection failures (also: ConnectionError)
├── CoviaTimeoutError           # Timeouts (also: TimeoutError)
└── JobFailedError              # Job finished with non-COMPLETE status

CoviaConnectionError and CoviaTimeoutError also subclass Python's built-in ConnectionError and TimeoutError, so you can catch them idiomatically:

from covia import (
    Grid,
    CoviaError,
    GridError,
    NotFoundError,
    AssetNotFoundError,
    JobNotFoundError,
    CoviaConnectionError,
    CoviaTimeoutError,
    JobFailedError,
)

try:
    with Grid.connect("https://venue-test.covia.ai") as venue:
        result = venue.run("my-operation", {"x": 1}, timeout=30)
except AssetNotFoundError as e:
    print(f"Asset not found: {e.asset_id}")
except JobNotFoundError as e:
    print(f"Job not found: {e.job_id}")
except NotFoundError:
    print("Resource not found")
except GridError as e:
    print(f"API error {e.status_code}: {e.message}")
except JobFailedError as e:
    print(f"Job failed: {e.job_data.error}")
except CoviaConnectionError:
    print("Cannot connect to venue")
except CoviaTimeoutError:
    print("Operation timed out")

# Or catch all SDK errors
except CoviaError:
    print("Something went wrong")

# Or use Python built-in exception types
except TimeoutError:
    print("Timed out")
except ConnectionError:
    print("Connection failed")
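
The dual-inheritance pattern described above can be reproduced in a few lines. The classes below are local stand-ins that reuse the SDK's names purely for illustration; they show why a single exception can be caught either through the SDK base class or through the matching built-in type.

```python
class CoviaError(Exception):
    """Stand-in base class for this sketch."""


class CoviaTimeoutError(CoviaError, TimeoutError):
    """Catchable as CoviaError or as the built-in TimeoutError."""


class CoviaConnectionError(CoviaError, ConnectionError):
    """Catchable as CoviaError or as the built-in ConnectionError."""


def classify(exc: Exception) -> str:
    """Report which handler would catch exc when built-in types are checked first."""
    try:
        raise exc
    except TimeoutError:
        return "timeout"
    except ConnectionError:
        return "connection"
    except CoviaError:
        return "other SDK error"


print(classify(CoviaTimeoutError("slow")))
print(classify(CoviaConnectionError("down")))
print(classify(CoviaError("misc")))
```

Code that already handles built-in TimeoutError or ConnectionError therefore needs no SDK-specific except clauses for these two cases.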

Configuration

Timeouts

# Custom timeout (applies to connect, read, and write)
venue = Grid.connect("https://venue-test.covia.ai", timeout=60)

Custom Headers

venue = Grid.connect(
    "https://venue-test.covia.ai",
    headers={"X-Request-ID": "abc123"},
)

Combining Options

from covia.auth import BearerAuth

venue = Grid.connect(
    "did:web:venue-test.covia.ai",
    timeout=30,
    headers={"X-Tenant": "acme"},
    auth=BearerAuth("token"),
)

Logging

The SDK uses Python's standard logging module. By default, no output is produced (the library registers a NullHandler).

Enable debug logging to trace HTTP requests, job polling, and DID resolution:

import logging
logging.basicConfig(level=logging.DEBUG)

Or target just the SDK:

import logging
logging.getLogger("covia").setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
logging.getLogger("covia").addHandler(handler)
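
If debug output is only wanted around one section of code, the same setup can be wrapped in a context manager that restores the logger afterwards. This is a sketch built on the standard logging module only; `debug_logging` is a hypothetical helper, not part of the SDK.

```python
import logging
from contextlib import contextmanager


@contextmanager
def debug_logging(logger_name: str = "covia"):
    """Temporarily emit DEBUG records from one logger to stderr, then restore it."""
    logger = logging.getLogger(logger_name)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
    previous_level = logger.level
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    try:
        yield logger
    finally:
        # Undo both changes even if the wrapped code raises.
        logger.removeHandler(handler)
        logger.setLevel(previous_level)


with debug_logging() as log:
    log.debug("debug output is enabled inside this block")
```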

Log Levels

Level     What's Logged
DEBUG     HTTP requests/responses, job polling cycles, DID resolution, connections
WARNING   Plain HTTP connections (no TLS)

Module Structure

covia
├── Grid           # Entry point: Grid.connect()
├── Venue          # Venue interaction (assets, jobs, operations, discovery)
├── Job            # Job lifecycle (wait, result, cancel, stream)
├── Asset          # Asset metadata and content
├── JobStatus      # Status enum (PENDING, STARTED, COMPLETE, ...)
├── auth           # Auth, NoAuth, BearerAuth, BasicAuth
├── exceptions     # CoviaError, GridError, NotFoundError, ...
├── models         # Pydantic data models (VenueStatus, JobData, ...)
└── async_api      # Full async mirror
    ├── AsyncGrid
    ├── AsyncVenue
    └── AsyncJob