
Providers Module

Import: from selectools.providers import OpenAIProvider

Stability: stable

providers_quickstart.py
from selectools import Agent, AgentConfig, Message, Role, tool
from selectools.providers.stubs import LocalProvider

@tool(description="Get the weather for a city")
def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny, 22C."

# LocalProvider requires no API key — perfect for testing
provider = LocalProvider()

agent = Agent(
    tools=[get_weather],
    provider=provider,
    config=AgentConfig(max_iterations=2),
)

result = agent.run([Message(role=Role.USER, content="What is the weather in Tokyo?")])
print(result.content)

# In production, swap to a real provider with one line:
# from selectools.providers import OpenAIProvider
# provider = OpenAIProvider()  # uses OPENAI_API_KEY env var

See Also

  • Models - 115-model registry with pricing data
  • Usage - Automatic token counting and cost tracking

Directory: src/selectools/providers/

Files: base.py, openai_provider.py, azure_openai_provider.py, anthropic_provider.py, gemini_provider.py, ollama_provider.py, fallback.py

Table of Contents

  1. Overview
  2. Provider Protocol
  3. Provider Implementations
  4. Message Formatting
  5. Native Tool Calling
  6. Cost Calculation
  7. Implementation Details

Overview

Providers are adapters that translate between selectools' unified interface and specific LLM APIs. They handle:

  • API authentication and configuration
  • Message format conversion
  • Role mapping
  • Image encoding (for vision models)
  • Streaming implementation
  • Usage statistics extraction
  • Error handling

Design Goal

Provider Agnosticism: Switch LLM backends with one line of code, no refactoring required.


Provider Protocol

Interface Definition

from typing import Protocol, runtime_checkable, List, Optional, Union, AsyncGenerator
from ..types import Message, ToolCall
from ..tools import Tool
from ..usage import UsageStats

@runtime_checkable
class Provider(Protocol):
    """Interface every provider adapter must satisfy."""

    name: str                    # Provider identifier
    supports_streaming: bool     # Can stream responses
    supports_async: bool = False # Has async methods

    def complete(
        self,
        *,
        model: str,
        system_prompt: str,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,  # Native tool calling
        temperature: float = 0.0,
        max_tokens: int = 1000,
        timeout: float | None = None,
    ) -> tuple[Message, UsageStats]:
        """Return assistant Message (with optional tool_calls) and usage stats.

        Note: Message.content may be None when the LLM responds with only
        tool_calls. The agent normalizes None content to "" internally.
        """
        ...

    def stream(self, *, model, system_prompt, messages, **kwargs):
        """Yield assistant text chunks (no usage stats)."""
        ...

    async def acomplete(
        self,
        *,
        model: str,
        system_prompt: str,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        temperature: float = 0.0,
        max_tokens: int = 1000,
        timeout: float | None = None,
    ) -> tuple[Message, UsageStats]:
        """Async version of complete()."""
        ...

    async def astream(
        self,
        *,
        model: str,
        system_prompt: str,
        messages: List[Message],
        tools: Optional[List[Tool]] = None,
        temperature: float = 0.0,
        max_tokens: int = 1000,
        timeout: float | None = None,
    ) -> AsyncGenerator[Union[str, ToolCall], None]:
        """Async streaming with native tool call support.

        Yields:
            str: Text content deltas
            ToolCall: Complete tool call objects when ready
        """
        ...

Key Requirements

  1. Sync Methods: complete() and stream() must be implemented
  2. Return Types: complete() returns (Message, UsageStats) — Message may contain tool_calls
  3. Streaming: stream() yields strings; astream() yields Union[str, ToolCall]
  4. Native Tool Calling: Pass tools parameter for provider-native function calling
  5. Async: acomplete() and astream() are optional but recommended for performance
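As a rough illustration of these requirements, the sketch below defines a cut-down stand-in for the protocol and a toy provider that satisfies it. MiniProvider, EchoProvider, and the simplified Message/UsageStats dataclasses are hypothetical stand-ins written for this example, not the selectools classes.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional, Protocol, Tuple, runtime_checkable


@dataclass
class Message:  # simplified stand-in for selectools.types.Message
    role: str
    content: Optional[str] = None


@dataclass
class UsageStats:  # simplified stand-in for selectools.usage.UsageStats
    prompt_tokens: int = 0
    completion_tokens: int = 0


@runtime_checkable
class MiniProvider(Protocol):
    """Cut-down protocol: only the required sync surface."""

    name: str
    supports_streaming: bool

    def complete(self, *, model: str, system_prompt: str,
                 messages: List[Message]) -> Tuple[Message, UsageStats]: ...

    def stream(self, *, model: str, system_prompt: str,
               messages: List[Message]) -> Iterator[str]: ...


class EchoProvider:
    """Toy provider that echoes the last message back; no network calls."""

    name = "echo"
    supports_streaming = True

    def complete(self, *, model, system_prompt, messages):
        text = messages[-1].content or ""
        words = len(text.split())
        usage = UsageStats(prompt_tokens=words, completion_tokens=words)
        return Message(role="assistant", content=text), usage

    def stream(self, *, model, system_prompt, messages):
        # Yield the reply word by word: stream() yields plain strings.
        for word in (messages[-1].content or "").split():
            yield word


provider = EchoProvider()
history = [Message("user", "hello there")]
reply, usage = provider.complete(model="any", system_prompt="", messages=history)
chunks = list(provider.stream(model="any", system_prompt="", messages=history))
```

Because the protocol is runtime_checkable, isinstance(provider, MiniProvider) only confirms that the members exist; it does not verify their signatures.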

Provider Implementations

All providers support namespace imports from the selectools.providers package:

from selectools.providers import (
    OpenAIProvider, AzureOpenAIProvider, AnthropicProvider, GeminiProvider, OllamaProvider,
    LiteLLMProvider,
)

OpenAI Provider

from selectools.providers import OpenAIProvider
from selectools.models import OpenAI

provider = OpenAIProvider(
    api_key="sk-...",  # Or set OPENAI_API_KEY env var
    default_model=OpenAI.GPT_4O.id
)

# Features:
# - Streaming support
# - Async support (acomplete/astream)
# - Vision support (image_path in messages)
# - Full usage stats
# - Native tool calling (function calling API)
# - Auto max_tokens → max_completion_tokens for GPT-5/4.1/o-series

API: OpenAI Chat Completions API

Token Parameter Handling: Newer OpenAI models (GPT-5.x, GPT-4.1, o-series, codex) reject the legacy max_tokens parameter and require max_completion_tokens. The provider auto-detects the model family and sends the correct parameter — no user action needed.

Anthropic Provider

from selectools.providers import AnthropicProvider
from selectools.models import Anthropic

provider = AnthropicProvider(
    api_key="sk-ant-...",  # Or set ANTHROPIC_API_KEY
    default_model=Anthropic.SONNET_4_5.id
)

# Features:
# - Streaming support
# - Async support
# - Vision support (model-dependent)
# - Full usage stats
# - Native tool calling (function calling API)
# - Prompt caching (opt-in)

API: Anthropic Messages API

Prompt Caching: Opt-in flags enable Anthropic prompt caching to cut cost and latency on repeated prefixes:

provider = AnthropicProvider(
    cache_system=True,  # system prompt sent in block form with cache_control
    cache_tools=True,   # cache_control marker on the last tool (caches the whole list)
)

msg, usage = provider.complete(model="", system_prompt="...", messages=[...])
usage.cache_creation_input_tokens  # tokens written to the cache (None if not reported)
usage.cache_read_input_tokens      # tokens served from the cache (None if not reported)

Both flags default to False (behavior unchanged). Anthropic enforces minimum cacheable prefix sizes, so caching short prompts has no effect — enable these only when the system prompt or tool list is large and stable.
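To make the two flags concrete, the sketch below builds the kind of request fields they imply. build_request is a hypothetical helper written for this example, not the provider's actual code; the cache_control marker with type "ephemeral" is the shape the Anthropic Messages API uses.

```python
from typing import Any, Dict, List


def build_request(
    system_prompt: str,
    tools: List[Dict[str, Any]],
    cache_system: bool = False,
    cache_tools: bool = False,
) -> Dict[str, Any]:
    """Hypothetical helper: apply cache markers to Messages API fields."""
    marker = {"type": "ephemeral"}

    if cache_system:
        # Block form is needed because a plain string cannot carry cache_control.
        system: Any = [
            {"type": "text", "text": system_prompt, "cache_control": marker}
        ]
    else:
        system = system_prompt

    tools_out = [dict(t) for t in tools]  # shallow copies; inputs stay untouched
    if cache_tools and tools_out:
        # A marker on the last tool caches the whole tool list as one prefix.
        tools_out[-1]["cache_control"] = marker

    return {"system": system, "tools": tools_out}


tools = [
    {"name": "search", "description": "Search docs", "input_schema": {"type": "object"}},
    {"name": "fetch", "description": "Fetch a URL", "input_schema": {"type": "object"}},
]
request = build_request("You are helpful.", tools, cache_system=True, cache_tools=True)
plain = build_request("You are helpful.", tools)
```

With both flags off, the system prompt stays a plain string and no tool carries a marker, which matches the "behavior unchanged" default.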

Gemini Provider

from selectools.providers import GeminiProvider
from selectools.models import Gemini

provider = GeminiProvider(
    api_key="...",  # Or set GEMINI_API_KEY or GOOGLE_API_KEY
    default_model=Gemini.FLASH_2_5.id
)

# Features:
# - Streaming support
# - Async support
# - Vision support (model-dependent)
# - Free embeddings
# - Native tool calling (function calling API)

API: Google Generative AI

Ollama Provider

from selectools.providers import OllamaProvider
from selectools.models import Ollama

provider = OllamaProvider(
    host="http://localhost:11434",  # Default
    default_model=Ollama.LLAMA_3_2.id
)

# Features:
# - Local execution (privacy-first)
# - Zero cost
# - Streaming support
# - No API key required

API: Ollama REST API

Implementation note: OpenAIProvider and OllamaProvider both inherit from _OpenAICompatibleBase (Template Method pattern), sharing message formatting, response parsing, and streaming logic. Only pricing, error messages, and token parameter naming differ between them.

Azure OpenAI Provider (v0.21.0)

Stability: beta

from selectools.providers import AzureOpenAIProvider

provider = AzureOpenAIProvider(
    azure_endpoint="https://my-resource.openai.azure.com",
    api_key="...",                    # Or set AZURE_OPENAI_API_KEY env var
    azure_deployment="gpt-4o",        # Or set AZURE_OPENAI_DEPLOYMENT env var
    api_version="2024-10-21",         # Azure API version (default)
)

# Features:
# - Inherits all OpenAI capabilities (streaming, async, vision, tool calling)
# - Azure Active Directory (AAD) token authentication
# - Uses the openai SDK's built-in Azure support (no extra deps)

API: Azure OpenAI Service

Environment Variables:

| Variable | Description |
| --- | --- |
| AZURE_OPENAI_ENDPOINT | Azure resource endpoint URL |
| AZURE_OPENAI_API_KEY | Azure API key (can omit if using AAD token) |
| AZURE_OPENAI_DEPLOYMENT | Default deployment name |

AAD Token Authentication:

# Use Azure Active Directory instead of an API key
provider = AzureOpenAIProvider(
    azure_endpoint="https://my-resource.openai.azure.com",
    azure_ad_token="eyJ...",          # AAD token
    azure_deployment="gpt-4o",
)

Model Family Override (v0.22.0 — BUG-28):

Azure deployments use custom names that don't match model family prefixes. When deploying GPT-5-family models with non-standard deployment names, pass model_family explicitly to get the correct max_completion_tokens vs max_tokens handling:

# Deployment "prod-chat" runs gpt-5-mini, but the name doesn't match "gpt-5"
provider = AzureOpenAIProvider(
    azure_endpoint="https://my-resource.openai.azure.com",
    azure_deployment="prod-chat",
    model_family="gpt-5",            # Explicit family hint
)
# Now uses max_completion_tokens instead of max_tokens

Without model_family, selectools uses the deployment name for family detection. If the deployment name happens to start with the model family prefix (e.g., gpt-5-mini), no override is needed.
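The detection logic might look roughly like the sketch below. The prefix tuple is an assumption based on the families this page names (GPT-5.x, GPT-4.1, o-series, codex), and token_param is a hypothetical name; the real detection inside selectools may differ.

```python
from typing import Optional

# Assumed prefixes for families that need max_completion_tokens.
NEW_STYLE_PREFIXES = ("gpt-5", "gpt-4.1", "o1", "o3", "o4", "codex")


def token_param(deployment: str, model_family: Optional[str] = None) -> str:
    """Pick the token-limit parameter name for a model or Azure deployment."""
    # An explicit family hint wins; otherwise fall back to the deployment name.
    key = (model_family or deployment).lower()
    if key.startswith(NEW_STYLE_PREFIXES):
        return "max_completion_tokens"
    return "max_tokens"


legacy = token_param("gpt-4o")
by_name = token_param("gpt-5-mini")
unhinted = token_param("prod-chat")
hinted = token_param("prod-chat", model_family="gpt-5")
```

The unhinted "prod-chat" case shows why the override exists: a custom deployment name carries no family information, so detection by name falls back to the legacy parameter.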

Implementation note: AzureOpenAIProvider extends OpenAIProvider, overriding only the client initialization to use AzureOpenAI / AsyncAzureOpenAI from the OpenAI SDK. All complete/stream/acomplete/astream behaviour is inherited.

LiteLLM Provider

Stability: beta

Instant access to 100+ models (DeepSeek, Groq, Mistral, Together, Cohere, Fireworks, Bedrock, ...) by delegating to the litellm library. litellm routes provider/model identifiers to the right backend and normalizes everything to the OpenAI wire format.

pip install selectools[litellm]

from selectools import Agent, AgentConfig
from selectools.providers import LiteLLMProvider

provider = LiteLLMProvider(model="deepseek/deepseek-chat")
provider = LiteLLMProvider(model="groq/llama-3.1-70b")
provider = LiteLLMProvider(model="bedrock/anthropic.claude-3-sonnet")

agent = Agent(
    tools,
    provider=provider,
    config=AgentConfig(model="groq/llama-3.1-70b"),  # match the provider model
)

# Features:
# - 100+ models through one adapter (litellm handles provider-specific quirks)
# - Full protocol: complete / acomplete / stream / astream
# - Native tool calling (OpenAI tool schema, translated per backend by litellm)
# - Cost tracking via litellm's own cost map

Configuration:

provider = LiteLLMProvider(
    model="groq/llama-3.1-70b",
    api_key="gsk_...",                  # Optional; litellm reads GROQ_API_KEY etc. when omitted
    api_base="https://my-proxy/v1",     # Optional gateway/proxy override
    drop_params=True,                   # Extra kwargs forwarded to every litellm call
)

Notes:

  • litellm is a lazy optional import: constructing the provider without it installed raises ImportError pointing at pip install selectools[litellm].
  • The agent passes AgentConfig.model to the provider on every call, so set it to the same provider/model string (or use model_selector to switch between litellm-routed models mid-run).
  • Reserved kwargs: per-call arguments built by the agent loop take precedence over **litellm_kwargs defaults, so the keys the base supplies on every call -- model, messages, stream, tools, temperature, max_tokens -- are reserved and raise ValueError at construction. Set temperature/max_tokens on AgentConfig instead.
  • Cost: UsageStats.cost_usd comes from litellm.cost_per_token (a local lookup against litellm's cost map, no extra API call). Models missing from the cost map report 0.0. Prompt-cache token fields stay None because litellm does not report cache usage uniformly across backends.
  • Native providers remain the choice for maximum control; LiteLLM is the long-tail solution.
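The reserved-kwargs rule can be pictured as a construction-time check like the one below. check_litellm_kwargs is a hypothetical function written for this example; only the list of reserved keys comes from the note above.

```python
RESERVED_KEYS = frozenset(
    {"model", "messages", "stream", "tools", "temperature", "max_tokens"}
)


def check_litellm_kwargs(**litellm_kwargs):
    """Hypothetical validator: reject keys the agent loop sets on every call."""
    clashes = sorted(RESERVED_KEYS.intersection(litellm_kwargs))
    if clashes:
        raise ValueError(
            f"Reserved kwargs {clashes} cannot be set as provider defaults; "
            "set temperature/max_tokens on AgentConfig instead."
        )
    return dict(litellm_kwargs)


# A pass-through option such as drop_params is accepted.
accepted = check_litellm_kwargs(drop_params=True)

# A reserved key is rejected up front instead of being silently overridden later.
try:
    check_litellm_kwargs(temperature=0.2, drop_params=True)
    rejected = False
except ValueError:
    rejected = True
```

Failing at construction keeps the misconfiguration visible; silently dropping the key would leave the user wondering why their temperature had no effect.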

Implementation note: LiteLLMProvider inherits the shared _OpenAICompatibleBase (same Template Method base as OpenAI/Ollama) and adapts litellm.completion / litellm.acompletion to the OpenAI SDK client surface through a small shim, so message formatting, streaming assembly, and malformed-tool-JSON handling are identical to the native OpenAI provider.

Local Provider (Testing)

from selectools.providers.stubs import LocalProvider

provider = LocalProvider()

# Features:
# - No network calls
# - No API costs
# - Returns user's last message
# - Perfect for testing

Message Formatting

Unified Message Format

from selectools.types import Message, Role

Message(role=Role.USER, content="Hello")
Message(role=Role.ASSISTANT, content="Hi there!")
Message(role=Role.TOOL, content="Result", tool_name="search")
Message(role=Role.USER, content="What's in this image?", image_path="./photo.jpg")

Provider-Specific Formatting

OpenAI Format

def _format_messages(self, system_prompt: str, messages: List[Message]):
    payload = [{"role": "system", "content": system_prompt}]

    for message in messages:
        role = message.role.value

        # Map TOOL results to the assistant role (this simplified formatter does not
        # emit OpenAI's "tool" role, which requires a matching tool_call_id)
        if role == Role.TOOL.value:
            role = Role.ASSISTANT.value

        payload.append({
            "role": role,
            "content": self._format_content(message),
        })

    return payload

def _format_content(self, message: Message):
    if message.image_base64:
        # Vision: multimodal content
        return [
            {"type": "text", "text": message.content},
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{message.image_base64}"},
            },
        ]
    return message.content

Anthropic Format

def _format_messages(self, messages: List[Message]):
    formatted = []

    for message in messages:
        role = message.role.value

        # Anthropic uses "user" and "assistant" only
        if role == Role.TOOL.value:
            role = "assistant"

        formatted.append({
            "role": role,
            "content": message.content
        })

    return formatted

# System prompt is separate parameter
client.messages.create(
    model=model,
    system=system_prompt,  # Not in messages array
    messages=formatted
)

Gemini Format

def _format_messages(self, system_prompt: str, messages: List[Message]):
    # Gemini combines system and conversation
    formatted = [{"role": "user", "parts": [system_prompt]}]

    for message in messages:
        role = "user" if message.role == Role.USER else "model"

        formatted.append({
            "role": role,
            "parts": [message.content]
        })

    return formatted

Native Tool Calling

Overview

Most providers expose a native function calling API, which returns structured tool calls directly in the response instead of requiring text parsing.

How It Works

  1. Agent passes tools parameter to complete()/acomplete()
  2. Provider converts tool schemas to provider-native format
  3. LLM returns structured tool calls in Message.tool_calls
  4. Agent detects tool_calls and executes them directly (no regex parsing needed)
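Step 2 can be sketched as a pair of small converters. ToolSpec is a hypothetical stand-in for the selectools Tool class, and the converter names are invented for this example; the output shapes follow the OpenAI Chat Completions and Anthropic Messages tool formats.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ToolSpec:  # hypothetical stand-in for selectools' Tool
    name: str
    description: str
    parameters: Dict[str, Any] = field(default_factory=dict)  # JSON Schema


def to_openai(tool: ToolSpec) -> Dict[str, Any]:
    """OpenAI shape: schema nested under function.parameters."""
    return {
        "type": "function",
        "function": {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.parameters,
        },
    }


def to_anthropic(tool: ToolSpec) -> Dict[str, Any]:
    """Anthropic shape: same schema, exposed as input_schema."""
    return {
        "name": tool.name,
        "description": tool.description,
        "input_schema": tool.parameters,
    }


weather = ToolSpec(
    name="get_weather",
    description="Get the weather for a city",
    parameters={
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
)
openai_tool = to_openai(weather)
anthropic_tool = to_anthropic(weather)
```

Both converters reuse the same JSON Schema, so a tool only has to be described once regardless of the backend.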

Provider Formats

OpenAI

# Tools converted to OpenAI function format
tools=[{"type": "function", "function": {"name": "...", "parameters": {...}}}]

# Response contains tool_calls
response.choices[0].message.tool_calls  # List of tool call objects

Anthropic

# Tools converted to Anthropic tool format
tools=[{"name": "...", "description": "...", "input_schema": {...}}]

# Response contains tool_use content blocks
response.content  # May contain ToolUse blocks with name and input

Gemini

# Tools converted to Gemini function declarations
tools=[Tool(function_declarations=[...])]

# Response candidates contain function calls
response.candidates[0].content.parts  # May contain function_call parts

Fallback

If a provider doesn't support native tool calling (e.g., Ollama), or if native calls are not present in the response, the agent falls back to regex-based parsing via ToolCallParser.
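A text-parsing fallback of this kind could look like the sketch below. The TOOL_CALL text convention is invented for this example, since this page does not describe the format ToolCallParser actually expects, and parse_tool_call is a hypothetical name.

```python
import json
import re
from typing import Any, Dict, Optional, Tuple

# Assumed text convention for this sketch only:
#   TOOL_CALL: {"tool_name": "...", "parameters": {...}}
MARKER = re.compile(r"TOOL_CALL:\s*(?=\{)")


def parse_tool_call(text: str) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Return (tool_name, parameters) if the text embeds a tool call, else None."""
    match = MARKER.search(text)
    if match is None:
        return None
    try:
        # raw_decode parses one JSON value and ignores any trailing text.
        payload, _ = json.JSONDecoder().raw_decode(text, match.end())
    except json.JSONDecodeError:
        return None  # malformed JSON: treat the reply as plain text
    if not isinstance(payload, dict) or "tool_name" not in payload:
        return None
    return payload["tool_name"], payload.get("parameters", {})


reply = 'Sure. TOOL_CALL: {"tool_name": "get_weather", "parameters": {"city": "Tokyo"}} done'
parsed = parse_tool_call(reply)
```

The regex only locates the marker; the JSON itself is decoded with the standard library, which copes with nested braces that a pure regex would mishandle.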


Cost Calculation

Usage Stats Extraction

Each provider extracts token counts from API responses:

OpenAI

response = client.chat.completions.create(...)

usage_stats = UsageStats(
    prompt_tokens=response.usage.prompt_tokens,
    completion_tokens=response.usage.completion_tokens,
    total_tokens=response.usage.total_tokens,
    cost_usd=calculate_cost(
        model, response.usage.prompt_tokens, response.usage.completion_tokens
    ),
    model=model,
    provider="openai"
)

Anthropic

response = client.messages.create(...)

usage_stats = UsageStats(
    prompt_tokens=response.usage.input_tokens,
    completion_tokens=response.usage.output_tokens,
    total_tokens=response.usage.input_tokens + response.usage.output_tokens,
    cost_usd=calculate_cost(
        model, response.usage.input_tokens, response.usage.output_tokens
    ),
    model=model,
    provider="anthropic"
)

Gemini

# gemini_model is the SDK model object; `model` below is the model name string
response = gemini_model.generate_content(...)

usage_stats = UsageStats(
    prompt_tokens=response.usage_metadata.prompt_token_count,
    completion_tokens=response.usage_metadata.candidates_token_count,
    total_tokens=response.usage_metadata.total_token_count,
    cost_usd=calculate_cost(
        model,
        response.usage_metadata.prompt_token_count,
        response.usage_metadata.candidates_token_count,
    ),
    model=model,
    provider="gemini"
)

Cost Calculation

from selectools.pricing import calculate_cost

cost = calculate_cost(
    model="gpt-4o",
    prompt_tokens=1000,
    completion_tokens=500
)

# Looks up pricing from models registry:
# OpenAI.GPT_4O: prompt_cost=2.50, completion_cost=10.00 per 1M tokens
# Cost = (1000/1M * 2.50) + (500/1M * 10.00) = $0.0025 + $0.005 = $0.0075
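The arithmetic in that comment can be reproduced with a small stand-in. estimate_cost and the PRICING dict are hypothetical and only mirror the gpt-4o figures quoted above; the real calculate_cost reads from the models registry.

```python
# Prices in USD per 1M tokens. Only the gpt-4o row comes from the example
# above; a real registry would hold one entry per model.
PRICING = {"gpt-4o": {"prompt": 2.50, "completion": 10.00}}


def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Stand-in for calculate_cost: linear per-token pricing."""
    price = PRICING.get(model)
    if price is None:
        # Assumption for this sketch: unknown models cost 0.0 rather than raising.
        return 0.0
    return (
        prompt_tokens / 1_000_000 * price["prompt"]
        + completion_tokens / 1_000_000 * price["completion"]
    )


cost = estimate_cost("gpt-4o", prompt_tokens=1000, completion_tokens=500)
print(f"${cost:.4f}")
```

Prompt and completion tokens are priced separately because completion tokens cost four times as much for this model, so output-heavy calls dominate the bill.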

Implementation Details

OpenAI Provider

import os

class OpenAIProvider(Provider):
    name = "openai"
    supports_streaming = True
    supports_async = True

    def __init__(self, api_key: str | None = None, default_model: str = "gpt-5-mini"):
        from openai import OpenAI, AsyncOpenAI

        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ProviderConfigurationError(...)

        self._client = OpenAI(api_key=self.api_key)
        self._async_client = AsyncOpenAI(api_key=self.api_key)
        self.default_model = default_model

    def complete(self, *, model, system_prompt, messages, temperature, max_tokens, timeout):
        formatted = self._format_messages(system_prompt, messages)
        model_name = model or self.default_model

        # Auto-detect max_tokens vs max_completion_tokens per model family
        token_key = (
            "max_completion_tokens"
            if _uses_max_completion_tokens(model_name)
            else "max_tokens"
        )
        args = {
            "model": model_name,
            "messages": formatted,
            "temperature": temperature,
            token_key: max_tokens,
            "timeout": timeout,
        }

        response = self._client.chat.completions.create(**args)

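        content = response.choices[0].message.content
        usage_stats = self._extract_usage(response, model_name)

        # Wrap the text in a Message, as the Provider protocol requires
        return Message(role=Role.ASSISTANT, content=content or ""), usage_stats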

    def stream(self, *, model, system_prompt, messages, temperature, max_tokens, timeout):
        formatted = self._format_messages(system_prompt, messages)
        model_name = model or self.default_model

        token_key = (
            "max_completion_tokens"
            if _uses_max_completion_tokens(model_name)
            else "max_tokens"
        )
        args = {
            "model": model_name,
            "messages": formatted,
            "temperature": temperature,
            token_key: max_tokens,
            "stream": True,
            "timeout": timeout,
        }

        response = self._client.chat.completions.create(**args)

        for chunk in response:
            delta = chunk.choices[0].delta
            if delta and delta.content:
                yield delta.content

Async Streaming (astream)

All providers implement astream() for E2E streaming with native tool support:

async def astream(self, *, model, system_prompt, messages, tools=None, ...):
    """Yield text deltas and ToolCall objects."""
    # The async client returns the stream from an awaited create() call
    stream = await self._async_client.chat.completions.create(stream=True, ...)
    async for chunk in stream:
        delta = chunk.choices[0].delta

        # Yield text deltas
        if delta.content:
            yield delta.content

        # Accumulate tool call deltas
        if delta.tool_calls:
            # ... accumulate until complete ...
            yield ToolCall(tool_name=name, parameters=args, id=tc_id)

The agent's astream() method consumes these and:

  • Yields StreamChunk objects for text
  • Executes tool calls when received
  • Continues the agent loop until completion

Error Handling

def complete(self, ...):
    try:
        response = self._client.chat.completions.create(...)
        return content, usage_stats
    except Exception as exc:
        raise ProviderError(f"OpenAI completion failed: {exc}") from exc

Async Implementation

async def acomplete(self, *, model, system_prompt, messages, temperature, max_tokens, timeout):
    formatted = self._format_messages(system_prompt, messages)
    model_name = model or self.default_model

    token_key = (
        "max_completion_tokens"
        if _uses_max_completion_tokens(model_name)
        else "max_tokens"
    )
    args = {
        "model": model_name,
        "messages": formatted,
        "temperature": temperature,
        token_key: max_tokens,
        "timeout": timeout,
    }

    response = await self._async_client.chat.completions.create(**args)

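    content = response.choices[0].message.content
    usage_stats = self._extract_usage(response, model_name)

    # Wrap the text in a Message, as the Provider protocol requires
    return Message(role=Role.ASSISTANT, content=content or ""), usage_stats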

Best Practices

1. Set API Keys via Environment

export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_API_KEY="..."

# No need to pass api_key
provider = OpenAIProvider()

2. Use Model Constants

from selectools.models import OpenAI, Anthropic, Gemini

# ✅ Good - Type-safe, autocomplete
provider = OpenAIProvider(default_model=OpenAI.GPT_4O_MINI.id)

# ❌ Bad - Prone to typos
provider = OpenAIProvider(default_model="gpt-4o-mini")

3. Handle Provider Errors

import logging

from selectools.providers.base import ProviderError

logger = logging.getLogger(__name__)

try:
    response, stats = provider.complete(...)
except ProviderError as e:
    logger.error(f"Provider failed: {e}")
    # Fallback logic

4. Test with Local Provider

import os

from selectools.providers import OpenAIProvider
from selectools.providers.stubs import LocalProvider

# Development/testing
if os.getenv("ENV") == "test":
    provider = LocalProvider()
else:
    provider = OpenAIProvider()

Adding a New Provider

Steps

  1. Create provider file in src/selectools/providers/
  2. Implement Provider protocol
  3. Handle message formatting
  4. Extract usage stats
  5. Add to exports in __init__.py

Template

from ..types import Message, Role
from ..usage import UsageStats
from ..pricing import calculate_cost
from .base import Provider, ProviderError

class MyProvider(Provider):
    name = "my_provider"
    supports_streaming = True
    supports_async = False

    def __init__(self, api_key: str, default_model: str = "default-model"):
        self.api_key = api_key
        self.default_model = default_model
        # Initialize the SDK client here and store it as self.client

    def complete(self, *, model, system_prompt, messages, tools=None,
                 temperature=0.0, max_tokens=1000, timeout=None):
        # Format messages
        formatted = self._format_messages(system_prompt, messages)

        try:
            # Call API
            response = self.client.complete(...)

            # Extract content
            content = response.text

            # Extract usage
            usage_stats = UsageStats(
                prompt_tokens=response.prompt_tokens,
                completion_tokens=response.completion_tokens,
                total_tokens=response.total_tokens,
                cost_usd=calculate_cost(model, ...),
                model=model,
                provider=self.name
            )

            # The protocol expects a Message, not a bare string
            return Message(role=Role.ASSISTANT, content=content), usage_stats

        except Exception as exc:
            raise ProviderError(f"{self.name} failed: {exc}") from exc

    def stream(self, *, model, system_prompt, messages, **kwargs):
        # Stream implementation (self.client.stream is a placeholder call)
        response = self.client.stream(...)
        for chunk in response:
            yield chunk.text

    def _format_messages(self, system_prompt, messages):
        # Convert to provider's format
        raise NotImplementedError

Testing

def test_openai_provider():
    provider = OpenAIProvider(api_key="test-key", default_model="gpt-4o-mini")

    messages = [Message(role=Role.USER, content="Hello")]

    response, stats = provider.complete(
        model="gpt-4o-mini",
        system_prompt="You are helpful",
        messages=messages,
        temperature=0.0,
        max_tokens=100
    )

    assert isinstance(response, Message)
    assert stats.total_tokens > 0
    assert stats.cost_usd >= 0

def test_provider_switching():
    # Same agent code works with any provider
    for provider in [OpenAIProvider(), AnthropicProvider(), GeminiProvider()]:
        agent = Agent(tools=[...], provider=provider)
        response = agent.run([Message(role=Role.USER, content="Test")])
        assert response.content

FallbackProvider

Overview

FallbackProvider wraps multiple providers in priority order with automatic failover and circuit breaker protection. If the primary provider fails, the next one is tried automatically.

Usage

from selectools import Agent, FallbackProvider, OpenAIProvider, AnthropicProvider
from selectools.providers.stubs import LocalProvider

provider = FallbackProvider([
    OpenAIProvider(default_model="gpt-4o-mini"),
    AnthropicProvider(default_model="claude-haiku"),
    LocalProvider(),
])

agent = Agent(tools=[...], provider=provider)

Circuit Breaker

After consecutive failures, a provider is temporarily skipped:

provider = FallbackProvider(
    providers=[openai, anthropic, local],
    max_failures=3,          # Skip after 3 consecutive failures
    cooldown_seconds=60,     # Skip for 60 seconds
    on_fallback=lambda name, error: log.warning(f"Skipping {name}: {error}"),
)
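The skip-and-cooldown behaviour can be pictured with a minimal breaker like the one below. CircuitBreaker is a hypothetical class written for this example, not FallbackProvider's internals; the injectable clock is an assumption added so the sketch can be exercised without sleeping.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal per-provider breaker: opens after N consecutive failures."""

    def __init__(self, max_failures: int = 3, cooldown_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_failures = max_failures
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def available(self) -> bool:
        """True if the provider may be tried right now."""
        if self._opened_at is None:
            return True
        if self._clock() - self._opened_at >= self.cooldown_seconds:
            # Cooldown elapsed: close the breaker and allow a fresh attempt.
            self._opened_at = None
            self._failures = 0
            return True
        return False

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.max_failures:
            self._opened_at = self._clock()

    def record_success(self) -> None:
        # Only consecutive failures count, so any success resets the counter.
        self._failures = 0


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(max_failures=3, cooldown_seconds=60, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
skipped = not breaker.available()   # breaker is open right after the third failure
now[0] = 60.0
recovered = breaker.available()     # cooldown has elapsed, provider is retried
```

A fallback loop would consult available() before each provider and call record_failure() or record_success() after the attempt.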

Failure Conditions

The provider falls through to the next on:

  • Timeout errors (timeout, 408 Request Timeout, 504 Gateway Timeout)
  • HTTP 5xx (500, 502, 503)
  • HTTP 429 (rate limits) — matches both rate limit (space) and rate_limit_exceeded (underscore)
  • Connection errors
  • Anthropic 529 Overloaded — very common on US-West traffic (v0.22.0, BUG-27)
  • Cloudflare 522/524 — origin connection/timeout errors (v0.22.0, BUG-27)
  • overloaded/service_unavailable — provider body text patterns (v0.22.0, BUG-27)
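A classifier for these conditions could be approximated as below. The pattern list is assembled from the bullets above and is an assumption; is_retriable is a hypothetical name, and the real matcher may inspect status codes or exception types differently.

```python
import re

# Text patterns derived from the list above. Note that 5\d\d also covers
# 504, 522, 524, and 529.
RETRIABLE = re.compile(
    r"timeout|timed out|connection|rate[ _]limit|overloaded|service_unavailable"
    r"|\b(?:408|429|5\d\d)\b",
    re.IGNORECASE,
)


def is_retriable(error: Exception) -> bool:
    """Heuristic: should the next provider be tried after this error?"""
    if isinstance(error, (TimeoutError, ConnectionError)):
        return True
    return RETRIABLE.search(str(error)) is not None


rate_limited = is_retriable(Exception("Error code: 429 rate_limit_exceeded"))
overloaded = is_retriable(Exception("529 Overloaded"))
auth_failure = is_retriable(Exception("401 Unauthorized: invalid api key"))
```

Matching on message text is a heuristic: any three-digit number starting with 5 in an error message would count as retriable, so a production matcher would prefer structured status codes when the SDK exposes them.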

Protocol Support

FallbackProvider implements the full Provider protocol:

  • complete() — sync completion
  • acomplete() — async completion
  • stream() — sync streaming
  • astream() — async streaming

Properties

  • provider.supports_streaming: True if any child provider supports streaming
  • provider.supports_async: True if any child provider supports async
  • provider.name: "fallback"

RouterProvider

Stability: beta

Overview

RouterProvider wraps multiple providers organized in cost tiers (cheapest to priciest) and routes each request to the cheapest tier capable of handling it, based on a deterministic rule-based complexity classification. On retriable failure it escalates to the next tier up, reusing FallbackProvider's retry detection and circuit breaker.

Usage

from selectools.providers import RouterProvider
from selectools import Agent, OpenAIProvider, AnthropicProvider

router = RouterProvider(
    providers={
        "fast": OpenAIProvider(default_model="gpt-5.4-nano"),           # $0.10/1M input
        "smart": AnthropicProvider(default_model="claude-sonnet-4-6"),  # $3/1M input
        "power": OpenAIProvider(default_model="gpt-5.4-pro"),           # $30/1M input
    },
    strategy="cost_optimized",  # or "quality_first", "balanced"
)

agent = Agent(tools, provider=router)

Each tier's model comes from the provider's default_model attribute (or an explicit tier_models={"fast": "gpt-5.4-nano"} override) and replaces the agent's model argument when that tier serves a request.

Complexity Classification

The classifier (selectools.providers.router.classify_complexity) is rule-based and deterministic — no LLM call. Signals are additive across rows; the token row and the tool row each pick ONE bonus (+2 if the complex threshold is met, else +1 if the moderate threshold is met — never both):

| Signal | Points |
| --- | --- |
| Input tokens: ≥ complex_token_threshold (default 1500) → +2, else ≥ moderate_token_threshold (default 400) → +1 | max +2 |
| Tool count: ≥ complex_tool_threshold (default 8) → +2, else ≥ moderate_tool_threshold (default 4) → +1 | max +2 |
| Code block (triple backticks) present | +2 |
| Reasoning keyword ("step by step", "analyze", "refactor", ...) | +2 |
| Multi-part question (≥2 ? or a numbered list) | +1 |
| Structured-output keyword ("json", "schema", "markdown table", ...) | +1 |

Score ≥ 4 → complex; score ≥ 2 → moderate; else simple. All thresholds, score boundaries, and keyword lists are configurable via RouterConfig. Input tokens are estimated with selectools.token_estimation.estimate_tokens over the system prompt plus all messages; keyword and structure detection runs on the latest user message. Multimodal messages are handled via selectools.types.text_content: text carried in content_parts counts toward both the classified text and the token estimate, so image-bearing requests with substantial text are not misrouted to the cheapest tier.
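The scoring rules can be sketched as follows, using the default thresholds quoted above. This is an approximation, not classify_complexity itself: the keyword tuples hold only the examples named on this page, estimate_tokens is a crude 4-characters-per-token stand-in, and messages are plain strings with the latest user message last.

```python
import re
from typing import Sequence

# Only the keywords named on this page; the real lists are longer.
REASONING = ("step by step", "analyze", "refactor")
STRUCTURED = ("json", "schema", "markdown table")
FENCE = "`" * 3  # triple backtick, built this way to keep the example fence-safe


def estimate_tokens(text: str) -> int:
    # Crude stand-in (about 4 characters per token), not selectools' estimator.
    return len(text) // 4


def classify(system_prompt: str, messages: Sequence[str], tool_count: int = 0) -> str:
    """Score a request and map it to simple / moderate / complex."""
    latest = messages[-1].lower() if messages else ""
    tokens = estimate_tokens(system_prompt + "".join(messages))

    score = 0
    # Token and tool rows each grant ONE bonus: +2 or +1, never both.
    score += 2 if tokens >= 1500 else 1 if tokens >= 400 else 0
    score += 2 if tool_count >= 8 else 1 if tool_count >= 4 else 0
    if FENCE in latest:
        score += 2
    if any(keyword in latest for keyword in REASONING):
        score += 2
    if latest.count("?") >= 2 or re.search(r"^\s*\d+[.)]\s", latest, re.MULTILINE):
        score += 1
    if any(keyword in latest for keyword in STRUCTURED):
        score += 1

    return "complex" if score >= 4 else "moderate" if score >= 2 else "simple"


simple = classify("", ["What is 2+2?"])
moderate = classify("", ["Analyze this and return JSON"])
complex_ = classify("", ["Refactor this step by step:\n" + FENCE + "\ncode\n" + FENCE])
```

The middle case scores 3 (reasoning keyword +2, structured-output keyword +1), which lands in the moderate band; adding a code block pushes a request to 4 and into the complex band.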

Strategies

| Strategy | simple | moderate | complex | On failure |
| --- | --- | --- | --- | --- |
| cost_optimized | cheapest tier | middle tier | top tier | escalate up-tier |
| balanced | middle tier | middle tier | top tier | escalate up-tier |
| quality_first | top tier | top tier | top tier | degrade down-tier |

The middle tier is index len(tier_order) // 2 of the cheapest-first ordering, which rounds toward the pricier tier for even tier counts: with 2 tiers the middle IS the top tier (so balanced never routes to the cheapest tier, and cost_optimized sends moderate requests to the top); with 4 tiers it is the upper-middle tier (index 2).
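The strategy table and the middle-tier rule combine into a lookup like the sketch below. pick_tier is a hypothetical function written for this example and covers initial tier selection only, not failure escalation.

```python
from typing import List


def pick_tier(strategy: str, complexity: str, tier_order: List[str]) -> str:
    """Map (strategy, complexity) to a tier name; tier_order is cheapest-first."""
    cheapest, top = tier_order[0], tier_order[-1]
    middle = tier_order[len(tier_order) // 2]  # rounds toward the pricier tier

    table = {
        "cost_optimized": {"simple": cheapest, "moderate": middle, "complex": top},
        "balanced": {"simple": middle, "moderate": middle, "complex": top},
        "quality_first": {"simple": top, "moderate": top, "complex": top},
    }
    return table[strategy][complexity]


three = ["fast", "smart", "power"]
moderate_three = pick_tier("cost_optimized", "moderate", three)
balanced_two = pick_tier("balanced", "simple", ["fast", "power"])
moderate_four = pick_tier("cost_optimized", "moderate", ["a", "b", "c", "d"])
```

With two tiers the balanced strategy resolves to the top tier even for simple requests, which is the edge case the paragraph above warns about.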

Tier Ordering

  • The providers dict is treated as cheapest-first by convention.
  • When every tier's model is known to the pricing registry (selectools.pricing), the ordering is verified and re-sorted by cost (a warning is logged on disagreement).
  • tier_order=["fast", "smart", "power"] overrides both.

Failure Escalation

Internally each escalation chain is a FallbackProvider over the remaining tiers, so RouterProvider inherits its semantics: retriable errors (429, 5xx, timeouts, 529 Overloaded, ...) trigger escalation; non-retriable errors (auth failures) propagate immediately; tiers that fail repeatedly are circuit-broken (circuit_breaker_threshold, circuit_breaker_cooldown). Streams never switch tiers after the first chunk has been yielded.

Inspecting Routing Decisions

router.tier_used        # tier that served the most recent request, e.g. "smart"
router.complexity_used  # "simple" | "moderate" | "complex"
router.tier_order       # resolved cheapest-first ordering

# Callbacks
RouterProvider(..., on_route=lambda complexity, tier: ...,
               on_escalation=lambda failed_tier, next_tier, exc: ...)

tier_used and complexity_used are diagnostic only: they are plain last-write-wins attributes and unreliable under concurrent use (interleaved async requests can overwrite each other's values between a call returning and the attribute being read). For per-request attribution use the on_route/on_escalation callbacks, which fire within each request's own flow. Also note stream/astream are generators: routing and the on_route callback fire at first iteration, not at call time, and tier_used only updates once the stream is fully consumed (it is stale if a stream is abandoned midway).

UsageStats is untouched — cost and provider attribution flow through from whichever underlying provider served the request.

Limitations / Future Work

  • The roadmap's "quality threshold" re-route (retry a pricier tier when the cheap answer is low-quality) is deferred: scoring answer quality without an LLM judge is guesswork. An optional LLM-based classifier/judge is future work.
  • Circuit-breaker state is tracked per escalation chain, not globally across chains.

Observability Integrations (v0.21.0)

Two new observer implementations let you ship agent traces to external observability platforms.

OTelObserver

Stability: beta

Sends agent traces to OpenTelemetry following the GenAI semantic conventions.

pip install opentelemetry-api

from selectools.observe.otel import OTelObserver

agent = Agent(
    tools=[...],
    provider=provider,
    config=AgentConfig(observers=[OTelObserver(tracer_name="my-app")]),
)

Creates spans for agent.run, gen_ai.chat (LLM calls), and tool.execute (tool executions) with standard GenAI attributes like gen_ai.usage.input_tokens and gen_ai.request.model.

LangfuseObserver

Stability: beta

Sends agent traces to Langfuse for observability, cost tracking, and debugging.

pip install langfuse

from selectools.observe.langfuse import LangfuseObserver

observer = LangfuseObserver(
    public_key="pk-...",     # Or set LANGFUSE_PUBLIC_KEY env var
    secret_key="sk-...",     # Or set LANGFUSE_SECRET_KEY env var
    host="https://...",      # Or set LANGFUSE_HOST env var (for self-hosted)
)

agent = Agent(
    tools=[...],
    provider=provider,
    config=AgentConfig(observers=[observer]),
)

# On application shutdown
observer.shutdown()

Both observers implement the standard AgentObserver protocol and can be composed with other observers (e.g. LoggingObserver, AuditLogger).


Further Reading


Next Steps: Learn about usage tracking in the Usage Module.


| # | Script | Description |
| --- | --- | --- |
| 01 | 01_hello_world.py | Minimal agent with a single provider |
| 17 | 17_rag_multi_provider.py | RAG across multiple provider backends |
| 25 | 25_provider_fallback.py | FallbackProvider with circuit breaker failover |
| 102 | 102_router_provider.py | RouterProvider cost-optimized tier routing (offline) |