Cluster Protocol
Back to API Docs

Python SDK

pip install cluster-sdk — A Pythonic wrapper for the Cluster Protocol API

Installation

Install the Cluster SDK from PyPI. Requires Python 3.8+.

bash
pip install cluster-sdk

Or with optional async support:

bash
pip install "cluster-sdk[async]"

Authentication

Initialize the client with your API key. You can also set the CLUSTER_API_KEY environment variable.

from cluster_sdk import ClusterClient

client = ClusterClient(api_key="sk-cluster-YOUR_KEY")

Chat Completions

The SDK mirrors the OpenAI API structure. Use client.chat.completions.create() for inference.

python
from cluster_sdk import ClusterClient

client = ClusterClient(api_key="sk-cluster-YOUR_KEY")

response = client.chat.completions.create(
    model="llama-3.1-70b",
    messages=[
        {"role": "system", "content": "You are a helpful coding assistant."},
        {"role": "user", "content": "Write a Python function to check if a number is prime."}
    ],
    temperature=0.7,
    max_tokens=512
)

# Access the response
message = response.choices[0].message
print(message.content)

# Check usage
print(f"Tokens used: {response.usage.total_tokens}")
print(f"Cost: $\{response.usage.total_cost}")

Multi-turn Conversations

python
messages = [
    {"role": "system", "content": "You are a helpful assistant."}
]

# First turn
messages.append({"role": "user", "content": "What is Python?"})
response = client.chat.completions.create(
    model="llama-3.1-70b",
    messages=messages
)
messages.append({"role": "assistant", "content": response.choices[0].message.content})

# Second turn
messages.append({"role": "user", "content": "What makes it good for AI?"})
response = client.chat.completions.create(
    model="llama-3.1-70b",
    messages=messages
)
print(response.choices[0].message.content)

Streaming

Pass stream=True to receive tokens incrementally. The SDK returns an iterator of chunk objects.

python
stream = client.chat.completions.create(
    model="llama-3.1-70b",
    messages=[{"role": "user", "content": "Write a poem about distributed computing"}],
    stream=True
)

full_response = ""
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
        full_response += delta.content

print()
print(f"\nFull response length: {len(full_response)} chars")

Streaming with Error Handling

python
from cluster_sdk import ClusterClient
from cluster_sdk.errors import ClusterAPIError, InsufficientBalanceError

client = ClusterClient(api_key="sk-cluster-YOUR_KEY")

try:
    stream = client.chat.completions.create(
        model="llama-3.1-70b",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
except InsufficientBalanceError:
    print("Please deposit more funds.")
except ClusterAPIError as e:
    print(f"API error {e.status_code}: {e.message}")

Async Usage

Use AsyncClusterClient for async/await code. All methods have the same signature as the sync client.

import asyncio
from cluster_sdk import AsyncClusterClient

async def main():
    client = AsyncClusterClient(api_key="sk-cluster-YOUR_KEY")

    response = await client.chat.completions.create(
        model="llama-3.1-70b",
        messages=[{"role": "user", "content": "Hello!"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())

Models

List available models, get details, upload custom models, and manage deployments.

models = client.models.list()

for model in models:
    print(f"{model.id}: {model.name}")
    print(f"  Category: {model.category}")
    print(f"  Input:  $\{model.input_price}/1K tokens")
    print(f"  Output: $\{model.output_price}/1K tokens")
    print(f"  Runs:   {model.total_runs}")
    print()

Fine-Tuning

Create and manage fine-tuning jobs with LoRA adapters.

python
# Create a fine-tuning job
job = client.fine_tuning.create(
    base_model="llama-3.1-8b",
    training_file="https://example.com/training-data.jsonl",
    suffix="my-custom-model-v1",
    epochs=3,
    learning_rate=2e-5,
    lora_rank=16,
    lora_alpha=32
)

print(f"Job ID: {job.id}")
print(f"Status: {job.status}")

# Monitor progress
import time
while job.status in ("queued", "running"):
    time.sleep(30)
    job = client.fine_tuning.get(job.id)
    if job.progress:
        print(f"Progress: {job.progress * 100:.0f}%  Loss: {job.train_loss:.4f}")

print(f"Done! Fine-tuned model: {job.fine_tuned_model}")

# Use your fine-tuned model
response = client.chat.completions.create(
    model=job.fine_tuned_model,
    messages=[{"role": "user", "content": "Hello!"}]
)
print(response.choices[0].message.content)

List and Cancel Jobs

python
# List all jobs
jobs = client.fine_tuning.list()
for job in jobs:
    print(f"{job.id}: {job.status} ({job.base_model})")

# Cancel a running job
client.fine_tuning.cancel("ft_job_abc123")

Error Handling

The SDK raises typed exceptions for all API errors. Catch specific errors for graceful handling.

python
from cluster_sdk import ClusterClient
from cluster_sdk.errors import (
    ClusterAPIError,          # Base class for all API errors
    AuthenticationError,      # 401 - Invalid API key
    InsufficientBalanceError, # 402 - Not enough funds
    NotFoundError,            # 404 - Model/resource not found
    RateLimitError,           # 429 - Too many requests
    ServerError,              # 500+ - Internal errors
)

client = ClusterClient(api_key="sk-cluster-YOUR_KEY")

try:
    response = client.chat.completions.create(
        model="llama-3.1-70b",
        messages=[{"role": "user", "content": "Hello!"}]
    )
except AuthenticationError:
    print("Invalid API key. Check your credentials.")
except InsufficientBalanceError as e:
    print(f"Low balance. Current: {e.balance}, needed: {e.required}")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except NotFoundError:
    print("Model not found. Check available models with client.models.list()")
except ServerError:
    print("Server error. Please retry.")
except ClusterAPIError as e:
    print(f"Unexpected error {e.status_code}: {e.message}")

Advanced Configuration

Configure timeouts, retries, and custom HTTP settings.

python
from cluster_sdk import ClusterClient

client = ClusterClient(
    api_key="sk-cluster-YOUR_KEY",
    base_url="https://api.clusterprotocol.ai",

    # Timeout settings (seconds)
    timeout=60,
    connect_timeout=10,

    # Automatic retries on 5xx errors
    max_retries=3,
    retry_delay=1.0,  # seconds between retries

    # Custom headers (forwarded on every request)
    default_headers={
        "X-Custom-Header": "my-value"
    }
)

OpenAI Compatibility

Since Cluster Protocol is OpenAI-compatible, you can also use the official OpenAI Python SDK by pointing it to our base URL:

python
from openai import OpenAI

# Use the official OpenAI SDK with Cluster Protocol
client = OpenAI(
    api_key="sk-cluster-YOUR_KEY",
    base_url="https://api.clusterprotocol.ai/v1"
)

# Everything works the same as OpenAI
response = client.chat.completions.create(
    model="llama-3.1-70b",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)

for chunk in response:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")