
AI Core Components

Core AI functionality including LLM interfaces, context management, and utilities.

lumen.ai.llm

BASE_MODES = list(Mode) module-attribute

AINavigator

Bases: OpenAI

An LLM implementation that calls the Anaconda AI Navigator API.

display_name = param.String(default='AI Navigator', constant=True, doc='Display name for UI') class-attribute instance-attribute

endpoint = param.String(default='http://localhost:8080/v1', doc='\n The API endpoint; should include the full address, including the port.') class-attribute instance-attribute

mode = param.Selector(default=(Mode.JSON_SCHEMA)) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'server-model'}}) class-attribute instance-attribute

select_models = param.List(default=['server-model'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

Anthropic

Bases: Llm

An LLM implementation that calls Anthropic models such as Claude.

api_key = param.String(default=(os.getenv('ANTHROPIC_API_KEY')), doc='The Anthropic API key.') class-attribute instance-attribute

display_name = param.String(default='Anthropic', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.ANTHROPIC_TOOLS), objects=[Mode.ANTHROPIC_JSON, Mode.ANTHROPIC_TOOLS]) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'claude-haiku-4-5'}, 'edit': {'model': 'claude-sonnet-4-5'}}) class-attribute instance-attribute

select_models = param.List(default=['claude-sonnet-4-5', 'claude-sonnet-4-0', 'claude-3-7-sonnet-latest', 'claude-opus-4-1', 'claude-opus-4-0', 'claude-haiku-4-5', 'claude-3-5-haiku-latest', 'claude-3-haiku-20240307'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.7, bounds=(0, 1), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for Anthropic API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

AzureMistralAI

Bases: MistralAI

An LLM implementation that calls Mistral AI models on Azure.

api_key = param.String(default=(os.getenv('AZUREAI_ENDPOINT_KEY')), doc='The Azure API key') class-attribute instance-attribute

display_name = param.String(default='Azure Mistral AI', constant=True, doc='Display name for UI') class-attribute instance-attribute

endpoint = param.String(default=(os.getenv('AZUREAI_ENDPOINT_URL')), doc='The Azure API endpoint to invoke.') class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'azureai'}}) class-attribute instance-attribute

select_models = param.List(default=['azureai', 'mistral-large', 'mistral-small'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for Mistral AI API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

AzureOpenAI

Bases: Llm, AzureOpenAIMixin

An LLM implementation that uses the Azure OpenAI integration. Inherits from AzureOpenAIMixin, which extends OpenAIMixin, so it has access to all OpenAI functionality plus Azure-specific configuration.

display_name = param.String(default='Azure OpenAI', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.TOOLS)) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'gpt-4o-mini'}, 'edit': {'model': 'gpt-4o'}}) class-attribute instance-attribute

select_models = param.List(default=['gpt-35-turbo', 'gpt-4-turbo', 'gpt-4o', 'gpt-4o-mini'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=1, bounds=(0, None), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for Azure OpenAI API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

AzureOpenAIMixin

Bases: OpenAIMixin

Mixin class for Azure OpenAI functionality that extends OpenAI functionality with Azure-specific configuration.

api_key = param.String(default=(os.getenv('AZUREAI_ENDPOINT_KEY')), doc='\n The Azure API key.') class-attribute instance-attribute

api_version = param.String(default='2024-10-21', doc='\n The Azure AI Studio API version.') class-attribute instance-attribute

endpoint = param.String(default=(os.getenv('AZUREAI_ENDPOINT_URL')), doc='\n The Azure AI Studio endpoint.') class-attribute instance-attribute

Choice

Bases: BaseModel

delta instance-attribute

finish_reason instance-attribute

message instance-attribute

Google

Bases: Llm

An LLM implementation that calls Google's Gemini models.

api_key = param.String(default=(os.getenv('GEMINI_API_KEY')), doc='The Google API key.') class-attribute instance-attribute

display_name = param.String(default='Google AI', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.GENAI_TOOLS), objects=[Mode.GENAI_TOOLS, Mode.GENAI_STRUCTURED_OUTPUTS]) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'gemini-2.5-flash'}, 'edit': {'model': 'gemini-2.5-pro'}}) class-attribute instance-attribute

select_models = param.List(default=['gemini-2.5-pro', 'gemini-2.5-flash', 'gemini-2.5-flash-lite', 'gemini-2.0-flash', 'gemini-2.0-flash-lite', 'gemini-1.5-flash', 'gemini-1.5-pro'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=1, bounds=(0, 1), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for Google AI API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

run_client(model_spec, messages, **kwargs) async

Override to handle Gemini-specific message format conversion.

ImageResponse

Bases: BaseModel

output instance-attribute

Interceptor

Bases: Parameterized

conn = self._create_connection() instance-attribute

db_path = param.String(default='messages.db', doc='Path to the SQLite database file') class-attribute instance-attribute

session_id = self._generate_session_id() instance-attribute

delete_session(session_id=None)

Delete a session from the database, or the last session if no session_id is provided.

get_all_sessions()

Retrieve the message invocations from all sessions.

Returns: A dictionary mapping each session_id to its corresponding Session object.

get_session(session_id=None)

Retrieve the invocations from the last session, or a specific session if provided.

Args: session_id: The session ID to retrieve invocations from. If not provided, the last session is used.

Returns: A Session object containing invocations for the session.

get_session_ids()

init_db()

Initialize the database by creating necessary tables if they don't exist.

patch_client(client) abstractmethod

Patch the LLM client's create method to store messages and arguments in the database.

Args: client: The LLM client instance to patch.

reset_db()

Reset the database by deleting all tables.

store_invocation(messages, **kwargs)

Store messages and keyword arguments in the database for the current session.

Args: messages: List of message dictionaries to store. kwargs: The keyword arguments passed to the create method.

store_response(content)

unpatch()

Close the database connection and revert the client's create method.
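A usage sketch, assuming a concrete Interceptor subclass (patch_client is abstract, so the subclass name here is hypothetical):

.. code-block:: python

interceptor = MyInterceptor(db_path="messages.db")  # MyInterceptor: hypothetical subclass
interceptor.patch_client(client)   # client: the LLM client whose create method to record
# ... run LLM calls; messages and kwargs are stored per session ...
session = interceptor.get_session()  # invocations from the last session
interceptor.unpatch()                # close the DB and restore the client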

LiteLLM

Bases: Llm

An LLM implementation using LiteLLM that supports multiple providers through a unified interface.

LiteLLM allows you to call 100+ LLMs using the same OpenAI-compatible input/output format, including providers like OpenAI, Anthropic, Cohere, Hugging Face, Azure, Vertex AI, and more.
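A minimal configuration sketch (the model names are taken from the documented defaults and are illustrative; which providers work depends on the API keys available in your environment):

.. code-block:: python

from lumen.ai.llm import LiteLLM

llm = LiteLLM(
    model_kwargs={"default": {"model": "gpt-4.1-mini"}},
    fallback_models=["anthropic/claude-haiku-4-5", "gemini/gemini-2.5-flash"],
)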

display_name = param.String(default='LiteLLM', constant=True, doc='Display name for UI') class-attribute instance-attribute

enable_caching = param.Boolean(default=False, doc="\n Enable LiteLLM's built-in caching for repeated queries.") class-attribute instance-attribute

fallback_models = param.List(default=[], doc='\n List of fallback models to try if the primary model fails.\n Example: ["gpt-4o-mini", "claude-3-haiku", "gemini/gemini-1.5-flash"]') class-attribute instance-attribute

litellm_params = param.Dict(default={}, doc='\n Additional parameters to pass to litellm.acompletion().\n Examples: custom_llm_provider, api_base, api_version, etc.') class-attribute instance-attribute

mode = param.Selector(default=(Mode.TOOLS), objects=BASE_MODES) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'gpt-4.1-mini'}, 'edit': {'model': 'anthropic/claude-sonnet-4-5'}, 'sql': {'model': 'gpt-4.1-mini'}}, doc='\n Model configurations by type. LiteLLM supports model strings like:\n - OpenAI: "gpt-4.1-mini", "gpt-4.1-nano", "gpt-5-mini"\n - Anthropic: "anthropic/claude-sonnet-4-5", "anthropic/claude-haiku-4-5"\n - Google: "gemini/gemini-2.0-flash", "gemini/gemini-2.5-flash"\n - Mistral: "mistral/mistral-medium-latest", "mistral/mistral-small-latest"\n - And many more with format: "provider/model" or just "model" for defaults\n ') class-attribute instance-attribute

router_settings = param.Dict(default={}, doc='\n Settings for LiteLLM Router for load balancing across multiple models.\n Example: {"routing_strategy": "least-busy", "num_retries": 3}') class-attribute instance-attribute

select_models = param.List(default=['gpt-4.1-mini', 'gpt-4.1-nano', 'gpt-5-mini', 'anthropic/claude-sonnet-4-5', 'anthropic/claude-haiku-4-5', 'anthropic/claude-opus-4-1', 'gemini/gemini-2.0-flash', 'gemini/gemini-2.5-flash', 'mistral/mistral-medium-latest', 'mistral/mistral-small-latest', 'mistral/codestral-latest'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.7, bounds=(0, 2), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for LiteLLM API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

Get a client callable that's compatible with instructor.

For LiteLLM, we create a wrapper around litellm.acompletion that can be patched by instructor.

LlamaCpp

Bases: Llm, LlamaCppMixin

An LLM implementation using the Llama.cpp Python wrapper together with huggingface_hub to fetch the models.

chat_format = param.String(constant=True) class-attribute instance-attribute

display_name = param.String(default='Llama.cpp', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.JSON_SCHEMA), objects=BASE_MODES) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'repo_id': 'unsloth/Qwen3-8B-GGUF', 'filename': 'Qwen3-8B-Q5_K_M.gguf', 'chat_format': 'qwen'}}) class-attribute instance-attribute

select_models = param.List(default=['unsloth/Qwen3-8B-GGUF', 'unsloth/Qwen3-Coder-30B-A3B-Instruct-GGUF', 'unsloth/DeepSeek-V3.1-GGUF', 'unsloth/gpt-oss-20b-GGUF', 'unsloth/GLM-4.6-GGUF', 'microsoft/Phi-4-GGUF', 'meta-llama/Llama-3.3-70B-Instruct-GGUF'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.4, bounds=(0, None), constant=True) class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

run_client(model_spec, messages, **kwargs) async

warmup(model_kwargs) classmethod

LlamaCppMixin

Bases: ServiceMixin

Mixin class for llama-cpp-python functionality that can be shared between LLM implementations and embedding classes.

n_batch = param.Integer(default=512, doc='\n Batch size for processing.') class-attribute instance-attribute

n_ctx = param.Integer(default=2048, doc='\n Context length for the model.') class-attribute instance-attribute

n_gpu_layers = param.Integer(default=(-1), doc='\n Number of layers to offload to GPU. -1 for all layers.') class-attribute instance-attribute

seed = param.Integer(default=128, doc='\n Random seed for reproducible outputs.') class-attribute instance-attribute

use_mlock = param.Boolean(default=True, doc='\n Force system to keep model in RAM rather than swapping.') class-attribute instance-attribute

verbose = param.Boolean(default=False, doc='\n Enable verbose output from llama.cpp.') class-attribute instance-attribute

resolve_model_spec(model_spec, base_model_kwargs)

Resolve a model specification string into model kwargs with repo_id/filename. Handles formats like "repo/model:chat_format" or "repo/model".

Llm

Bases: Parameterized

Baseclass for LLM implementations.

An LLM implementation wraps a local or cloud-based LLM provider with instructor to support correct validation of Pydantic models.

create_kwargs = param.Dict(default={'max_retries': 1}, doc='\n Additional keyword arguments to pass to the LLM provider\n when calling chat.completions.create. Defaults to no instructor retries\n since agents handle retries themselves.') class-attribute instance-attribute

interceptor = param.ClassSelector(default=None, class_=Interceptor, doc='\n Interceptor instance to intercept LLM calls, e.g. for logging.') class-attribute instance-attribute

logfire_tags = param.List(default=None, doc='\n Whether to log LLM calls and responses to logfire.\n If a list of tags is provided, those tags will be used for logging.\n Suppresses streaming responses if enabled since\n logfire does not track token usage on stream.') class-attribute instance-attribute

mode = param.Selector(default=(Mode.JSON_SCHEMA), objects=BASE_MODES, doc='\n The calling mode used by instructor to guide the LLM towards generating\n outputs matching the schema.') class-attribute instance-attribute

model_kwargs = param.Dict(default={}, doc="\n LLM model definitions indexed by type. Supported types include\n 'default', 'reasoning' and 'sql'. Agents may pick which model to\n invoke for different reasons.") class-attribute instance-attribute

initialize(log_level) async

invoke(messages, system='', response_model=None, allow_partial=False, model_spec='default', **input_kwargs) async

Invokes the LLM and returns its response.

Arguments

messages (list[Message]): A list of messages to feed to the LLM.
system (str): A system message to provide to the LLM.
response_model (BaseModel | None): A Pydantic model that the LLM should materialize.
allow_partial (bool): Whether to allow the LLM to only partially fill the provided response_model.
model_spec (Literal['default', 'reasoning', 'sql']): The model, as listed in the model_kwargs parameter, to invoke to answer the query.

Returns:

The completed response_model.
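A usage sketch (the Pydantic model and messages are illustrative; llm stands for any instantiated Llm implementation):

.. code-block:: python

from pydantic import BaseModel

class Answer(BaseModel):
    text: str

messages = [{"role": "user", "content": "What is Lumen?"}]
answer = await llm.invoke(
    messages,
    system="Answer concisely.",
    response_model=Answer,
    model_spec="default",
)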

run_client(model_spec, messages, **kwargs) async

stream(messages, system='', response_model=None, field=None, model_spec='default', **kwargs) async

Invokes the LLM and streams its response.

Arguments

messages (list[Message]): A list of messages to feed to the LLM.
system (str): A system message to provide to the LLM.
response_model (BaseModel | None): A Pydantic model that the LLM should materialize.
field (str): The field in the response_model to stream.
model_spec (Literal['default', 'reasoning', 'sql']): The model, as listed in the model_kwargs parameter, to invoke to answer the query.

Yields:

The string or response_model field.
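A streaming sketch (illustrative; since stream is documented to yield values, it is iterated with async for rather than awaited directly):

.. code-block:: python

async for chunk in llm.stream(messages, system="Answer concisely."):
    print(chunk)  # partial strings, or partial response_model fields if field is set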

warmup(model_kwargs) classmethod

Allows LLM provider to perform actions that ensure that the model(s) are ready to run, e.g. downloading the model files.

Message

Bases: TypedDict

content instance-attribute

name instance-attribute

role instance-attribute

MessageModel

Bases: BaseModel

content instance-attribute

name instance-attribute

role instance-attribute

MistralAI

Bases: Llm

An LLM implementation that calls Mistral AI.

api_key = param.String(default=(os.getenv('MISTRAL_API_KEY')), doc='The Mistral AI API key.') class-attribute instance-attribute

display_name = param.String(default='Mistral AI', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.MISTRAL_TOOLS), objects=[Mode.JSON_SCHEMA, Mode.MISTRAL_TOOLS]) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'mistral-small-latest'}, 'edit': {'model': 'mistral-medium-latest'}}) class-attribute instance-attribute

select_models = param.List(default=['mistral-medium-latest', 'magistral-medium-latest', 'mistral-large-latest', 'magistral-small-latest', 'mistral-small-latest', 'codestral-latest', 'ministral-8b-latest', 'ministral-3b-latest', 'devstral-small-latest'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.7, bounds=(0, 1), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for Mistral AI API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

Ollama

Bases: OpenAI

An LLM implementation using Ollama's OpenAI-compatible API.

api_key = param.String(default='ollama', doc='The Ollama API key; required but unused.') class-attribute instance-attribute

display_name = param.String(default='Ollama', constant=True, doc='Display name for UI') class-attribute instance-attribute

endpoint = param.String(default='http://localhost:11434/v1', doc='The Ollama API endpoint.') class-attribute instance-attribute

mode = param.Selector(default=(Mode.JSON)) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'qwen3:8b'}}) class-attribute instance-attribute

select_models = param.List(default=['qwen3:8b', 'qwen3-coder:30b', 'deepseek-r1:7b', 'llama3.3:70b', 'llama4:latest', 'gemma3:12b', 'mistral-small3.2:24b', 'qwen2.5-coder:7b', 'phi4:14b'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.25, bounds=(0, None), constant=True) class-attribute instance-attribute

OpenAI

Bases: Llm, OpenAIMixin

An LLM implementation using the OpenAI cloud.

display_name = param.String(default='OpenAI', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Selector(default=(Mode.TOOLS)) class-attribute instance-attribute

model_kwargs = param.Dict(default={'default': {'model': 'gpt-4.1-mini'}, 'sql': {'model': 'gpt-4.1-mini'}, 'vega_lite': {'model': 'gpt-4.1-mini'}, 'edit': {'model': 'gpt-4.1-mini'}, 'ui': {'model': 'gpt-4.1-nano'}}) class-attribute instance-attribute

select_models = param.List(default=['gpt-4.1-mini', 'gpt-4.1-nano', 'gpt-5-mini', 'gpt-4-turbo', 'gpt-4o', 'gpt-4o-mini'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.25, bounds=(0, None), constant=True) class-attribute instance-attribute

timeout = param.Number(default=120, bounds=(1, None), constant=True, doc='\n The timeout in seconds for OpenAI API calls.') class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

OpenAIMixin

Bases: ServiceMixin

Mixin class for OpenAI functionality that can be shared between LLM implementations and embedding classes.

api_key = param.String(default=None, doc='\n The OpenAI API key. If not provided, will use OPENAI_API_KEY env var.') class-attribute instance-attribute

endpoint = param.String(default=None, doc='\n The OpenAI API endpoint. If not provided, uses default OpenAI endpoint.') class-attribute instance-attribute

organization = param.String(default=None, doc='\n The OpenAI organization to charge.') class-attribute instance-attribute

Response

Bases: BaseModel

choices instance-attribute

WebLLM

Bases: Llm

display_name = param.String(default='WebLLM', constant=True, doc='Display name for UI') class-attribute instance-attribute

mode = param.Parameter(default=(Mode.JSON_SCHEMA)) class-attribute instance-attribute

model_kwargs = param.Dict({'default': {'model_slug': 'Qwen2.5-7B-Instruct-q4f16_1-MLC'}}) class-attribute instance-attribute

select_models = param.List(default=['Llama-3.2-3B-Instruct-q4f16_1-MLC', 'Phi-3.5-mini-instruct-q4f16_1-MLC', 'Qwen2.5-7B-Instruct-q4f16_1-MLC'], constant=True, doc='Available models for selection dropdowns') class-attribute instance-attribute

temperature = param.Number(default=0.4, bounds=(0, None)) class-attribute instance-attribute

get_client(model_spec, response_model=None, **kwargs) async

initialize(log_level) async

status()

format_exception(exc, limit=0)

Format and return a string representation of an exception.

Parameters:

exc (Exception): The exception to be formatted. Required.
limit (int): The maximum number of layers of traceback to include in the output. Default: 0.

Returns:

str: A formatted string describing the exception and its traceback.

log_debug(msg, offset=24, prefix='', suffix='', show_sep=None, show_length=False)

Log a debug message with a separator line above and below.

truncate_string(s, max_length=30, ellipsis='...')

lumen.ai.embeddings

STOP_WORDS = (Path(__file__).parent / 'embeddings_stop_words.txt').read_text().splitlines() module-attribute

STOP_WORDS_RE = re.compile('\\b(?:{})\\b'.format('|'.join(STOP_WORDS)), re.IGNORECASE) module-attribute

AzureOpenAIEmbeddings

Bases: Embeddings, AzureOpenAIMixin

AzureOpenAIEmbeddings is an embeddings class that uses the Azure OpenAI API to generate embeddings. Inherits from AzureOpenAIMixin which extends OpenAIMixin, so it has access to all OpenAI functionality plus Azure-specific configuration.

:Example:

embeddings = AzureOpenAIEmbeddings()
await embeddings.embed(["Hello, world!", "Goodbye, world!"])

client = self._instantiate_client(async_client=True) instance-attribute

model = param.String(default='text-embedding-3-large', doc='The OpenAI model to use.') class-attribute instance-attribute

embed(texts) async

AzureOpenAIMixin

Bases: OpenAIMixin

Mixin class for Azure OpenAI functionality that extends OpenAI functionality with Azure-specific configuration.

api_key = param.String(default=(os.getenv('AZUREAI_ENDPOINT_KEY')), doc='\n The Azure API key.') class-attribute instance-attribute

api_version = param.String(default='2024-10-21', doc='\n The Azure AI Studio API version.') class-attribute instance-attribute

endpoint = param.String(default=(os.getenv('AZUREAI_ENDPOINT_URL')), doc='\n The Azure AI Studio endpoint.') class-attribute instance-attribute

Embeddings

Bases: Parameterized

embed(texts) abstractmethod async

Generate embeddings for a list of texts.

HuggingFaceEmbeddings

Bases: Embeddings

HuggingFaceEmbeddings is an embeddings class that uses sentence-transformers to generate embeddings from Hugging Face models.

:Example:

embeddings = HuggingFaceEmbeddings()
await embeddings.embed(["Hello, world!", "Goodbye, world!"])

device = param.String(default='cpu', doc="Device to run the model on (e.g., 'cpu' or 'cuda').") class-attribute instance-attribute

embedding_dim = self._model.get_sentence_embedding_dimension() instance-attribute

model = param.String(default='ibm-granite/granite-embedding-107m-multilingual', doc='\n The Hugging Face model to use.') class-attribute instance-attribute

prompt_name = param.String(default=None, doc='\n The prompt name to use for encoding queries. If None, no prompt is used.') class-attribute instance-attribute

embed(texts) async

LlamaCppEmbeddings

Bases: Embeddings, LlamaCppMixin

LlamaCppEmbeddings is an embeddings class that uses the llama-cpp-python library to generate embeddings from GGUF models.

:Example:

embeddings = LlamaCppEmbeddings(
    model_kwargs={
        "repo_id": "Qwen/Qwen3-Embedding-4B-GGUF",
        "filename": "Qwen3-Embedding-4B-Q4_K_M.gguf",
        "n_ctx": 512,
        "n_batch": 64,
    }
)
await embeddings.embed(["Hello, world!", "Goodbye, world!"])

llm = self._instantiate_client(embedding=True) instance-attribute

model_kwargs = param.Dict(default={'default': {'repo_id': 'Qwen/Qwen3-Embedding-4B-GGUF', 'filename': 'Qwen3-Embedding-4B-Q4_K_M.gguf'}}) class-attribute instance-attribute

embed(texts) async

LlamaCppMixin

Bases: ServiceMixin

Mixin class for llama-cpp-python functionality that can be shared between LLM implementations and embedding classes.

n_batch = param.Integer(default=512, doc='\n Batch size for processing.') class-attribute instance-attribute

n_ctx = param.Integer(default=2048, doc='\n Context length for the model.') class-attribute instance-attribute

n_gpu_layers = param.Integer(default=(-1), doc='\n Number of layers to offload to GPU. -1 for all layers.') class-attribute instance-attribute

seed = param.Integer(default=128, doc='\n Random seed for reproducible outputs.') class-attribute instance-attribute

use_mlock = param.Boolean(default=True, doc='\n Force system to keep model in RAM rather than swapping.') class-attribute instance-attribute

verbose = param.Boolean(default=False, doc='\n Enable verbose output from llama.cpp.') class-attribute instance-attribute

resolve_model_spec(model_spec, base_model_kwargs)

Resolve a model specification string into model kwargs with repo_id/filename. Handles formats like "repo/model:chat_format" or "repo/model".

NumpyEmbeddings

Bases: Embeddings

NumpyEmbeddings is a simple embeddings class that uses a hash function to map n-grams to the vocabulary.

Note that the default hash function is not stable across different Python sessions. If you need a stable hash function, you can set the hash_func, e.g. using murmurhash from the mmh3 package.

:Example:

embeddings = NumpyEmbeddings()
await embeddings.embed(["Hello, world!", "Goodbye, world!"])
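For hashes that are stable across sessions (per the note above), one might pass mmh3's hash function instead of the builtin hash (assumes the mmh3 package is installed):

.. code-block:: python

import mmh3

embeddings = NumpyEmbeddings(hash_func=mmh3.hash)
vectors = await embeddings.embed(["Hello, world!"])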

embedding_dim = param.Integer(default=256, doc='The size of the embedding vector') class-attribute instance-attribute

hash_func = param.Callable(default=hash, doc='\n The hashing function to use to map n-grams to the vocabulary.') class-attribute instance-attribute

seed = param.Integer(default=42, doc='The seed for the random number generator.') class-attribute instance-attribute

vocab_size = param.Integer(default=5000, doc='The size of the vocabulary.') class-attribute instance-attribute

embed(texts) async

get_char_ngrams(text, n=3)

OpenAIEmbeddings

Bases: Embeddings, OpenAIMixin

OpenAIEmbeddings is an embeddings class that uses the OpenAI API to generate embeddings.

:Example:

embeddings = OpenAIEmbeddings()
await embeddings.embed(["Hello, world!", "Goodbye, world!"])

client = self._instantiate_client(async_client=True) instance-attribute

model = param.String(default='text-embedding-3-small', doc='The OpenAI model to use.') class-attribute instance-attribute

embed(texts) async

OpenAIMixin

Bases: ServiceMixin

Mixin class for OpenAI functionality that can be shared between LLM implementations and embedding classes.

api_key = param.String(default=None, doc='\n The OpenAI API key. If not provided, will use OPENAI_API_KEY env var.') class-attribute instance-attribute

endpoint = param.String(default=None, doc='\n The OpenAI API endpoint. If not provided, uses default OpenAI endpoint.') class-attribute instance-attribute

organization = param.String(default=None, doc='\n The OpenAI organization to charge.') class-attribute instance-attribute

lumen.ai.vector_store

PROMPTS_DIR = THIS_DIR / 'prompts' module-attribute

DuckDBVectorStore

Bases: VectorStore

Vector store implementation using DuckDB for persistent storage.

:Example:

.. code-block:: python

from lumen.ai.vector_store import DuckDBVectorStore

vector_store = DuckDBVectorStore(uri=':memory:')
await vector_store.add_file('https://lumen.holoviz.org')
await vector_store.query('LLM', threshold=0.1)

Use upsert to avoid adding duplicate content:

.. code-block:: python

from lumen.ai.vector_store import DuckDBVectorStore

vector_store = DuckDBVectorStore(uri=':memory:')
await vector_store.upsert([{'text': 'Hello!', 'metadata': {'source': 'greeting'}}])
# Won't add a duplicate if content is similar and metadata matches
await vector_store.upsert([{'text': 'Hello!', 'metadata': {'source': 'greeting'}}])

connection = connection instance-attribute

embeddings = param.ClassSelector(class_=Embeddings, default=None, allow_None=True, doc='Embeddings object for text processing. If None and a URI is provided, loads from the database; else NumpyEmbeddings.') class-attribute instance-attribute

read_only = param.Boolean(default=False, doc='Whether to open the database in read-only mode') class-attribute instance-attribute

uri = param.String(default=':memory:', doc='The URI of the DuckDB database') class-attribute instance-attribute

clear()

Clear all entries and reset sequence.

Drops the documents table and sequence, then sets up the database again.

close()

Close the DuckDB connection.

delete(ids)

Delete items from the vector store by their IDs.

Parameters:

ids (list[int]): List of IDs to delete. Required.

filter_by(filters, limit=None, offset=0)

Filter items by metadata without using embeddings similarity.

Parameters:

filters (dict): Dictionary of metadata key-value pairs to filter by. Required.
limit (int | None): Maximum number of results to return. If None, returns all matches. Default: None.
offset (int): Number of results to skip (for pagination). Default: 0.

Returns:

List of results with 'id', 'text', and 'metadata'.

query(text, top_k=5, filters=None, threshold=0.0) async

Query the vector store for similar items.

Parameters:

text (str): The query text. Required.
top_k (int): Number of top results to return. Default: 5.
filters (dict | None): Optional metadata filters. Default: None.
threshold (float): Minimum similarity score required for a result to be included. Default: 0.0.

Returns:

List of results with 'id', 'text', 'metadata', and 'similarity' score.

upsert(items, situate=None) async

Add items to the vector store if similar items don't exist, update them if they do.

Parameters:

items (list[dict]): List of dictionaries containing 'text' and optional 'metadata'. Required.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.

Returns:

List of assigned IDs for the added or updated items.

Embeddings

Bases: Parameterized

embed(texts) abstractmethod async

Generate embeddings for a list of texts.

LLMUser

Bases: Parameterized

Mixin for classes that use prompts with LLMs. Provides parameters and methods for prompt templating and LLM interactions.

llm = param.ClassSelector(class_=Llm, doc='\n The LLM implementation to query.') class-attribute instance-attribute

llm_spec_key property

Converts class name to a snake_case model identifier. Used for looking up model configurations in model_kwargs.

prompts = param.Dict(default={'main': {'template': PROMPTS_DIR / 'Actor' / 'main.jinja2'}}, doc="\n A dictionary of prompts, indexed by prompt name.\n Each prompt should be defined as a dictionary containing a template\n 'template' and optionally a 'model' and 'tools'.") class-attribute instance-attribute

steps_layout = param.ClassSelector(default=None, class_=(ListLike, NamedListLike), allow_None=True, doc='\n The layout progress updates will be streamed to.') class-attribute instance-attribute

template_overrides = param.Dict(default={}, doc="\n Overrides the template's blocks (instructions, context, tools, examples).\n Is a nested dictionary with the prompt name (e.g. main) as the key\n and the block names as the inner keys with the new content as the\n values.") class-attribute instance-attribute

NumpyEmbeddings

Bases: Embeddings

NumpyEmbeddings is a simple embeddings class that uses a hash function to map n-grams to the vocabulary.

Note that the default hash function is not stable across different Python sessions. If you need a stable hash function, you can set the hash_func, e.g. using murmurhash from the mmh3 package.

:Example:

embeddings = NumpyEmbeddings()
await embeddings.embed(["Hello, world!", "Goodbye, world!"])

embedding_dim = param.Integer(default=256, doc='The size of the embedding vector') class-attribute instance-attribute

hash_func = param.Callable(default=hash, doc='\n The hashing function to use to map n-grams to the vocabulary.') class-attribute instance-attribute

seed = param.Integer(default=42, doc='The seed for the random number generator.') class-attribute instance-attribute

vocab_size = param.Integer(default=5000, doc='The size of the vocabulary.') class-attribute instance-attribute

embed(texts) async

get_char_ngrams(text, n=3)

NumpyVectorStore

Bases: VectorStore

Vector store implementation using NumPy for in-memory storage.

:Example:

.. code-block:: python

from lumen.ai.vector_store import NumpyVectorStore

vector_store = NumpyVectorStore()
await vector_store.add_file('https://lumen.holoviz.org')
await vector_store.query('LLM', threshold=0.1)

Use upsert to avoid adding duplicate content:

.. code-block:: python

from lumen.ai.vector_store import NumpyVectorStore

vector_store = NumpyVectorStore()
await vector_store.upsert([{'text': 'Hello!', 'metadata': {'source': 'greeting'}}])
# Won't add a duplicate if content is similar and metadata matches
await vector_store.upsert([{'text': 'Hello!', 'metadata': {'source': 'greeting'}}])

ids = [] instance-attribute

metadata = [] instance-attribute

texts = [] instance-attribute

vectors = None instance-attribute

clear()

Clear all items from the vector store.

Resets the vectors, texts, metadata, and IDs to their initial empty states.

close()

Close the vector store and release any resources.

For NumpyVectorStore, this clears vectors to free memory.

delete(ids)

Delete items from the vector store by their IDs.

Parameters:

ids (list[int]): List of IDs to delete. Required.

filter_by(filters, limit=None, offset=0)

Filter items by metadata without using embeddings similarity.

Parameters:

filters (dict): Dictionary of metadata key-value pairs to filter by. Required.
limit (int | None): Maximum number of results to return. If None, returns all matches. Default: None.
offset (int): Number of results to skip (for pagination). Default: 0.

Returns:

List of results with 'id', 'text', and 'metadata'.

query(text, top_k=5, filters=None, threshold=0.0) async

Query the vector store for similar items.

Parameters:

text (str): The query text. Required.
top_k (int): Number of top results to return. Default: 5.
filters (dict | None): Optional metadata filters. Default: None.
threshold (float): Minimum similarity score required for a result to be included. Default: 0.0.

Returns:

List of results with 'id', 'text', 'metadata', and 'similarity' score.

upsert(items, situate=None) async

Add items to the vector store if similar items don't exist, update them if they do.

Parameters:

items (list[dict]): List of dictionaries containing 'text' and optional 'metadata'. Required.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.

Returns:

List of assigned IDs for the added or updated items.

VectorStore

Bases: LLMUser

Abstract base class for a vector store.

chunk_func = param.Callable(default=None, doc='\n The function used to split documents into chunks.\n Must accept `text`. If None, defaults to semchunk.chunkerify\n with the chunk_size.') class-attribute instance-attribute

chunk_func_kwargs = param.Dict(default={}, doc='\n Additional keyword arguments to pass to the chunking function.') class-attribute instance-attribute

chunk_size = param.Integer(default=1024, doc='Maximum size of text chunks to split documents into.') class-attribute instance-attribute

chunk_tokenizer = param.String(default='gpt-4o-mini', doc='\n If using the default chunk_func, the tokenizer used to split documents into chunks.\n Must be a valid tokenizer name from the transformers or tiktoken library.\n Otherwise, please pass in a custom chunk_func.') class-attribute instance-attribute

embeddings = param.ClassSelector(class_=Embeddings, default=(NumpyEmbeddings()), allow_None=True, doc='Embeddings object for text processing.') class-attribute instance-attribute

excluded_metadata = param.List(default=['llm_context'], doc='List of metadata keys to exclude when creating the embeddings.') class-attribute instance-attribute

prompts = param.Dict(default={'main': {'template': PROMPTS_DIR / 'VectorStore' / 'main.jinja2'}, 'should_situate': {'template': PROMPTS_DIR / 'VectorStore' / 'should_situate.jinja2', 'response_model': YesNo}}, doc="\n A dictionary of prompts used by the vector store, indexed by prompt name.\n Each prompt should be defined as a dictionary containing a template\n 'template' and optionally a 'model' and 'tools'.") class-attribute instance-attribute

situate = param.Boolean(default=False, doc='\n Whether to insert a `llm_context` key in the metadata containing\n contextual information about the chunks.') class-attribute instance-attribute

add(items, force_ids=None, situate=None) async

Add items to the vector store.

Parameters:

items (list[dict]): List of dictionaries containing 'text' and optional 'metadata'. Required.
force_ids (list[int] | None): Optional list of IDs to use instead of generating new ones. Default: None.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.

Returns:

List of assigned IDs for the added items.

add_directory(directory, pattern='*', exclude_patterns=None, metadata=None, situate=None, upsert=False, raise_on_error=False, max_concurrent=5) async

Recursively add files from a directory that match the pattern and don't match exclude patterns.

Parameters:

directory (str | PathLike): The path to the directory to search for files. Required.
pattern (str): Glob pattern to match files against (e.g., "*.txt", "*.py"). Default: '*'.
exclude_patterns (list[str] | None): List of patterns to exclude. Files matching any of these patterns will be skipped. Default: None.
metadata (dict[str, Any] | None): Base metadata to apply to all files. Will be extended with filename-specific metadata. Default: None.
situate (bool | None): Whether to insert a llm_context key in the metadata. If None, uses the class default. Default: None.
upsert (bool): If True, will update existing items if similar content is found. Default: False.
raise_on_error (bool): If True, will raise an error if any file fails to process. Default: False.
max_concurrent (int): Maximum number of files to process concurrently. Default: 5.

Returns:

list[int]: Combined list of IDs for all added files.

add_file(filename, ext=None, metadata=None, situate=None, upsert=False) async

Adds a file or a URL to the collection.

Parameters:

filename (str | IO | Any | PathLike): The path to the file, a file-like object or a URL to be added. Required.
ext (str | None): The file extension to associate with the added file. If not provided, it will be determined from the file or URL. Default: None.
metadata (dict | None): A dictionary containing metadata related to the file (e.g., title, author, description). Default: None.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.
upsert (bool): If True, will update existing items if similar content is found. Default: False.

Returns:

List of assigned IDs for the added items.

clear() abstractmethod

Clear all items from the vector store.

close() abstractmethod

Close the vector store and release any resources.

delete(ids) abstractmethod

Delete items from the vector store by their IDs.

Parameters:

ids (list[int]): List of IDs to delete. Required.

filter_by(filters, limit=None, offset=0) abstractmethod

Filter items by metadata without using embeddings similarity.

Parameters:

filters (dict): Dictionary of metadata key-value pairs to filter by. Required.
limit (int | None): Maximum number of results to return. If None, returns all matches. Default: None.
offset (int): Number of results to skip (for pagination). Default: 0.

Returns:

List of results with 'id', 'text', and 'metadata'.

query(text, top_k=5, filters=None, threshold=0.0, situate=None) abstractmethod async

Query the vector store for similar items.

Parameters:

text (str): The query text. Required.
top_k (int): Number of top results to return. Default: 5.
filters (dict | None): Optional metadata filters. Default: None.
threshold (float): Minimum similarity score required for a result to be included. Default: 0.0.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.

Returns:

List of results with 'id', 'text', 'metadata', and 'similarity' score.

should_situate_chunk(chunk) async

Determine whether a chunk should be situated based on its content.

Parameters:

chunk (str): The chunk text to evaluate. Required.

Returns:

bool: Whether the chunk should be situated.

upsert(items, situate=None) async

Add items to the vector store if similar items don't exist, update them if they do.

Parameters:

items (list[dict]): List of dictionaries containing 'text' and optional 'metadata'. Required.
situate (bool | None): Whether to insert a llm_context key in the metadata containing contextual information about the chunks. If None, uses the class default. Default: None.

Returns:

List of assigned IDs for the added or updated items.

YesNo

Bases: BaseModel

yes = Field(description='True if yes, otherwise False.') class-attribute instance-attribute

log_debug(msg, offset=24, prefix='', suffix='', show_sep=None, show_length=False)

Log a debug message with a separator line above and below.

lumen.ai.utils

LineEdit = Annotated[InsertLine | ReplaceLine | DeleteLine, Field(discriminator='op')] module-attribute

PROMPTS_DIR = THIS_DIR / 'prompts' module-attribute

SOURCE_TABLE_SEPARATOR = ' ⦙ ' module-attribute

UNRECOVERABLE_ERRORS = (ImportError, LlmSetupError, RecursionError, MissingContextError, asyncio.CancelledError) module-attribute

_BLOCK_RE = re.compile('{%-?\\s*block\\s+(?P<name>\\w+)\\s*-?%}(?P<body>.*?){%-?\\s*endblock(?:\\s+(?P=name))?\\s*-?%}', re.DOTALL) module-attribute

log = getLogger(__name__) module-attribute

BlockNameCollector

Bases: NodeVisitor

blocks = [] instance-attribute

visit_Block(node)

DeleteLine

Bases: BaseModel

line_no = Field(ge=1, description='The 1-based line number to delete.') class-attribute instance-attribute

op = 'delete' class-attribute instance-attribute

Details

Bases: JSComponent

A component wrapper for the HTML details element with proper content rendering.

This component provides an expandable details/summary element that can be used to hide/show content. It supports any Panel component as content, including Markdown panes for rendering markdown content.

collapsed = param.Boolean(default=True, doc='\n Whether the details element is collapsed by default.') class-attribute instance-attribute

object = Child(doc='\n The content to display within the details element. Can be any Panel component,\n including a Markdown pane for rendering markdown content.') class-attribute instance-attribute

title = param.String(default='Details', doc='\n The text to display in the summary element (the clickable header).') class-attribute instance-attribute

InsertLine

Bases: BaseModel

line = Field(min_length=1, description='Content for the new line (must be non-empty).') class-attribute instance-attribute

line_no = Field(ge=1, description='Insert BEFORE this 1-based line number. Use line_no == len(lines) to append at the end.') class-attribute instance-attribute

op = 'insert' class-attribute instance-attribute

MissingContextError

Bases: Exception

Raise to indicate missing context for a query.

Pipeline

Bases: Viewer, Component

Pipeline encapsulates filters and transformations applied to a :class:lumen.sources.base.Source table.

A Pipeline ingests data from a :class:lumen.sources.base.Source table or another Pipeline applying the declared :class:lumen.filters.base.Filter, :class:lumen.transforms.base.Transform and :class:lumen.transforms.sql.SQLTransform definitions. It can be used to drive one or more visual outputs or leveraged as a standalone component to encapsulate multiple data processing steps.

auto_update = param.Boolean(default=True, constant=True, doc='\n Whether changes in filters, transforms and references automatically\n trigger updates in the data or whether an update has to be triggered\n manually using the update event or the update button in the UI.') class-attribute instance-attribute

control_panel property

data = DataFrame(doc='The current data on this source.') class-attribute instance-attribute

filters = param.List(item_type=Filter, doc='\n A list of Filters to apply to the source data.') class-attribute instance-attribute

pipeline = param.ClassSelector(class_=None, doc='\n Optionally a pipeline may be chained to another pipeline.') class-attribute instance-attribute

refs property

schema = param.Dict(doc='The schema of the input data.') class-attribute instance-attribute

source = param.ClassSelector(class_=Source, doc='\n The Source this pipeline is fed by.') class-attribute instance-attribute

sql_transforms = param.List(item_type=SQLTransform, doc='\n A list of SQLTransforms to apply to the source data.') class-attribute instance-attribute

table = param.String(doc='\n The name of the table driving this pipeline.') class-attribute instance-attribute

transforms = param.List(item_type=Transform, doc='\n A list of Transforms to apply to the source data.') class-attribute instance-attribute

update = param.Event(label='Apply update', doc='\n Update event trigger (if manual update is set).') class-attribute instance-attribute

add_filter(filt, field=None, **kwargs)

Add a filter to the pipeline.

Arguments

filt (Filter | Type[Filter]): The filter instance or filter type to add.
field (str | None): The field to filter on (required to instantiate a Filter type).

add_transform(transform, **kwargs)

Add a (SQL)Transform to the pipeline.

Arguments

transform (Transform): The Transform instance to add.

chain(filters=None, transforms=None, sql_transforms=None, **kwargs)

Chains additional filtering, transform and sql_transform operations on an existing pipeline. Note that if one or more sql_transforms are provided the existing table will be mirrored into a DuckDB database.

Arguments

filters (list[Filter] | None): Additional filters to apply on top of the existing pipeline.
transforms (list[Transform] | None): Additional transforms to apply on top of the existing pipeline.
sql_transforms (list[SQLTransform] | None): Additional SQL transforms to apply on top of the existing pipeline.

Returns:

Pipeline
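A chaining sketch (the filter and transform instances are placeholders for any declared Filter/Transform):

.. code-block:: python

# Hypothetical: derive a new pipeline with an extra filter and transform.
# If sql_transforms were passed, the existing table would be mirrored
# into a DuckDB database first (per the note above).
derived = pipeline.chain(
    filters=[my_filter],        # a lumen.filters.base.Filter instance
    transforms=[my_transform],  # a lumen.transforms.base.Transform instance
)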

clone(**params)

Create a new instance of the pipeline with optionally overridden parameter values.

from_spec(spec, source=None, source_filters=None) classmethod

Creates a Pipeline from a specification.

Parameters:

spec (dict | str): Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. Required.

Returns:

Resolved and instantiated Pipeline object.

get_schema()

Generates a JSON schema for the current data held by the Pipeline.

Returns:

schema (dict[str, any]): JSON schema for each column in the current data.

precache(queries)

Populates the cache of the :class:lumen.sources.base.Source with the provided queries.

Queries can be provided in two formats:

- A dictionary containing 'filters' and 'variables' dictionaries, each mapping names to lists of values to compute a cross-product for, e.g.

  { 'filters': {'<filter_name>': ['a', 'b', 'c', ...], ...}, 'variables': {'<variable_name>': [0, 2, 4, ...], ...} }

- A list containing dictionaries of explicit values for each filter and variable, e.g.

  [{ 'filters': {'<filter_name>': 'a'}, 'variables': {'<variable_name>': 0} }, { 'filters': {'<filter_name>': 'a'}, 'variables': {'<variable_name>': 1} }, ...]

to_spec(context=None)

Exports the full specification to reconstruct this component.

Parameters:

context (dict[str, Any] | None): Context contains the specification of all previously serialized components, e.g. to allow resolving of references. Default: None.

Returns:

Declarative specification of this component.

traverse(type)

Returns all Filter or Transform objects in a potentially chained pipeline.

validate(spec, context=None) classmethod

ReplaceLine

Bases: BaseModel

line = Field(description='The new content for the line (empty string is allowed).') class-attribute instance-attribute

line_no = Field(ge=1, description='The 1-based line number to replace.') class-attribute instance-attribute

op = 'replace' class-attribute instance-attribute

RetriesExceededError

Bases: Exception

Raised when the maximum number of retries is exceeded.

Source

Bases: MultiTypeComponent

Source components allow querying all kinds of data.

A Source can return one or more tables queried using the .get_tables method, a description of the data returned by each table in the form of a JSON schema accessible via the .get_schema method and lastly a .get method that allows filtering the data.

The Source base class also implements both in-memory and disk caching which can be enabled if a cache_dir is provided. Data cached to disk is stored as parquet files.
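For example, disk caching can be enabled on any concrete Source by providing a cache_dir (SomeSource is a placeholder for a concrete Source subclass):

.. code-block:: python

source = SomeSource(cache_dir="./cache")  # data is cached to parquet files on disk
df = source.get("my_table")               # later identical queries are served from cache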

cache_data = param.Boolean(default=True, doc='\n Whether to cache actual data.') class-attribute instance-attribute

cache_dir = param.String(default=None, doc='\n The directory to cache data to; if provided, enables the local disk cache.') class-attribute instance-attribute

cache_metadata = param.Boolean(default=True, doc='\n Whether to cache metadata.') class-attribute instance-attribute

cache_per_query = param.Boolean(default=True, doc='\n Whether to cache results per individual query rather than for the whole dataset.') class-attribute instance-attribute

cache_schema = param.Boolean(default=True, doc='\n Whether to cache table schemas.') class-attribute instance-attribute

cache_with_dask = param.Boolean(default=True, doc='\n Whether to read and write cache files with dask if available.') class-attribute instance-attribute

metadata = param.Dict(default={}, doc='\n Optional metadata about the source tables. Should follow the format:\n {"table_name": {"description": ..., "columns": {"column_name": "..."}}}') class-attribute instance-attribute

metadata_func = param.Callable(default=None, doc='\n Function to implement custom metadata lookup for tables.\n Given a list of tables it should return a dictionary of the form:\n\n {\n <table>: {"description": ..., "columns": {"column_name": "..."}}\n }\n\n May be used to override the default _get_table_metadata\n implementation of the Source.') class-attribute instance-attribute

panel property

A Source can return a Panel object which displays information about the Source or controls how the Source queries data.

root = param.ClassSelector(class_=Path, precedence=(-1), doc='\n Root folder of the cache_dir, default is config.root') class-attribute instance-attribute

shared = param.Boolean(default=False, doc='\n Whether the Source can be shared across all instances of the\n dashboard. If set to `True` the Source will be loaded on\n initial server load.') class-attribute instance-attribute

source_type = None class-attribute

clear_cache(*events)

Clears any cached data.

from_spec(spec) classmethod

Creates a Source object from a specification. If a Source specification references other sources these may be supplied in the sources dictionary and be referenced by name.

Parameters:

spec (dict | str): Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. Required.

Returns:

Resolved and instantiated Source object.

get(table, **query)

Return a table; optionally filtered by the given query.

Parameters:

table (str): The name of the table to query. Required.
query (dict): A dictionary containing all the query parameters. Default: {}.

Returns:

DataFrame: A DataFrame containing the queried table.

get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

table (str): The name of the table to query. Required.
query (dict): A dictionary containing all the query parameters. Default: {}.

Returns:

DataFrame: A DataFrame containing the queried table.

get_metadata(table)

Returns metadata for one, multiple or all tables provided by the source.

The metadata for a table is structured as:

{
    "description": ...,
    "columns": {
        "<column_name>": {
            "description": ...,
            "data_type": ...,
        }
    },
    **other_metadata
}

If a list of tables or no table is provided, the metadata is nested one additional level:

{
    "<table_name>": {
        "description": ...,
        "columns": {
            "<column_name>": {
                "description": ...,
                "data_type": ...,
            }
        },
        **other_metadata
    }
}

Parameters:

table (str | list[str] | None): The name of the table(s) to return metadata for. If None, returns metadata for all available tables. Required.

Returns:

metadata (dict): Dictionary of metadata indexed by table name (if no table or a list of tables was provided) or the metadata for the individual table.

get_schema(table=None, limit=None, shuffle=False)

Returns JSON schema describing the tables returned by the Source.

Parameters:

table (str | None): The name of the table to return the schema for. If None, returns schema for all available tables. Default: None.
limit (int | None): Limits the number of rows considered for the schema calculation. Default: None.

Returns:

dict: JSON schema(s) for one or all the tables.

get_tables()

Returns the list of tables available on this source.

Returns:

list: The list of available tables on this source.

validate(spec, context=None) classmethod

apply_changes(lines, edits)

Apply a Patch to a list of lines (no trailing newline characters in elements). Indices in the Patch refer to the ORIGINAL lines.

Strategy:

- To keep indices stable, apply all non-insert edits in DESCENDING order of line_no.
- Apply inserts in ASCENDING order of line_no (grouped), inserting BEFORE that index. Inserts at the same index are applied in the order they appear in patch.edits.
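A sketch combining the LineEdit models defined in this module (assumes these names are importable from lumen.ai.utils, as documented here):

.. code-block:: python

from lumen.ai.utils import DeleteLine, InsertLine, ReplaceLine, apply_changes

lines = ["alpha", "beta", "gamma"]
edits = [
    InsertLine(line_no=1, line="intro"),  # insert BEFORE original line 1
    ReplaceLine(line_no=2, line="BETA"),  # replace original line 2
    DeleteLine(line_no=3),                # delete original line 3
]
# All line_no values refer to the ORIGINAL lines, so the edits compose
# regardless of the order in which they are listed.
new_lines = apply_changes(lines, edits)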

cast_value(value)

class_name_to_llm_spec_key(class_name)

Convert class name to llm_spec_key using the same logic as Actor.llm_spec_key. Removes "Agent" suffix and converts to snake_case.

clean_sql(sql_expr, dialect=None, prettify=False)

Cleans up a SQL expression generated by an LLM by removing backticks, fencing and extraneous space and semi-colons.

describe_data(df, enum_limit=3, reduce_enums=True) async

extract_block_source(template_path, block_name, relative_to=PROMPTS_DIR)

format_exception(exc, limit=0)

Format and return a string representation of an exception.

Parameters:

exc (Exception): The exception to be formatted. Required.
limit (int): The maximum number of layers of traceback to include in the output. Default: 0.

Returns:

str: A formatted string describing the exception and its traceback.

format_float(num)

Process a float value, returning numeric types instead of strings. For very large/small numbers, returns a float that will display in scientific notation.

fuse_messages(messages, max_user_messages=2)

Fuses the chat history into a single system message, followed by the last user message. This function is reusable across different components that need to combine multiple messages into a simplified format for LLM processing.

Parameters:

messages (list[dict]): List of message dictionaries with 'role' and 'content' keys. Required.
max_user_messages (int): Maximum number of user messages to include in the history. Default: 2.

Returns:

list[dict]: Processed messages with the chat history as a system message.
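A usage sketch (the exact wording of the generated system message is produced by the function itself):

.. code-block:: python

messages = [
    {"role": "user", "content": "Load the penguins dataset"},
    {"role": "assistant", "content": "Loaded 344 rows."},
    {"role": "user", "content": "Now plot bill length"},
]
# Returns the chat history fused into a system message followed by the
# last user message.
fused = fuse_messages(messages, max_user_messages=2)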

generate_diff(old_text, new_text, filename='spec')

Generate a unified diff between old and new text.

Parameters:

old_text (str): The original text. Required.
new_text (str): The modified text. Required.
filename (str): The filename to use in the diff header. Default: 'spec'.

Returns:

str: A unified diff string showing the changes.

get_block_names(template_path, relative_to=PROMPTS_DIR)

get_data(pipeline) async

A wrapper to be able to use asyncio.to_thread and not block the main thread when calling pipeline.data

get_pipeline(**kwargs) async

A wrapper to be able to use asyncio.to_thread and not block the main thread when calling Pipeline

get_root_exception(e, depth=5, exceptions=None)

Recursively get the root cause of an exception up to a specified depth.

Parameters:

e (Exception): The exception to analyze. Required.
depth (int): Maximum recursion depth to avoid infinite loops. Default: 5.
exceptions (list[Exception] | None): List of exception types to consider as root causes. If None, all exceptions are considered. Default: None.

Returns:

Exception | None: The root cause exception if found, otherwise None.

get_schema(source, table=None, include_type=True, include_min_max=True, include_enum=True, include_count=False, include_empty_fields=False, include_nans=False, reduce_enums=True, shuffle=True, **get_kwargs) async

get_template_loader(template_path, relative_to=PROMPTS_DIR)

load_json(json_spec)

Load a JSON string, handling unicode escape sequences.

log_debug(msg, offset=24, prefix='', suffix='', show_sep=None, show_length=False)

Log a debug message with a separator line above and below.

mutate_user_message(content, messages, suffix=True, wrap=False, inplace=True)

Helper to mutate the last user message in a list of messages. Suffixes the content by default, otherwise prefixes it.

normalized_name(inst)

Returns the name of a Parameterized instance, stripping the auto-generated object count.

parse_huggingface_url(url)

Parse a HuggingFace URL to extract repo, model file, and model_kwargs.

Args: url: HuggingFace URL in format https://huggingface.co/org/repo/blob/main/file.gguf?param1=value1&param2=value2

Returns: tuple: (repo, model_file, model_kwargs)

Raises: ValueError: If URL format is invalid
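An illustration of the documented URL format (the query-string parsing details shown in the comments are an assumption):

.. code-block:: python

repo, model_file, model_kwargs = parse_huggingface_url(
    "https://huggingface.co/unsloth/Qwen3-8B-GGUF/blob/main/Qwen3-8B-Q5_K_M.gguf?chat_format=qwen"
)
# repo -> "unsloth/Qwen3-8B-GGUF"
# model_file -> "Qwen3-8B-Q5_K_M.gguf"
# model_kwargs -> {"chat_format": "qwen"}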

parse_table_slug(table, sources, normalize=True)

process_enums(spec, num_cols, limit=None, include_enum=True, reduce_enums=True, include_empty_fields=False)

Process enum values in a schema field specification.

Parameters:

spec (dict): The specification dictionary for this field. Required.
num_cols (int): The total number of columns in the schema (used for enum reduction calculation). Required.
limit (int | None): The limit used in schema generation. Default: None.
include_enum (bool): Whether to include enum values. Default: True.
reduce_enums (bool): Whether to reduce the number of enum values for readability. Default: True.
include_empty_fields (bool): Whether to include fields that are empty. Default: False.

Returns:

tuple[dict, bool]: The updated spec dictionary and a boolean indicating if this field is empty.

render_template(template_path, overrides=None, relative_to=PROMPTS_DIR, **context)

report_error(exc, step, language='python', context='', status='failed')

retry_llm_output(retries=3, sleep=1)

Retry a function that returns a response from the LLM API. If the function raises an exception, retry it up to retries times. If sleep is provided, wait that many seconds between retries. On each retry, the error message is passed into the function's kwargs so the function can handle it explicitly.
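A usage sketch (the name of the error-message kwarg is an assumption; see the source for the exact contract):

.. code-block:: python

@retry_llm_output(retries=3, sleep=1)
def generate_spec(prompt, errors=None):  # 'errors' kwarg name is assumed
    # On retry the previous error message is passed in, so the prompt
    # can be amended to steer the LLM away from the failure.
    if errors:
        prompt = f"{prompt}\nAvoid these errors: {errors}"
    ...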

set_nested(data, keys, value)

Set nested dictionary value

stream_details(content, step, title='Expand for details', auto=True, **stream_kwargs)

Process content to place code blocks inside collapsible details elements

Parameters:

content (str): The content to format. Required.
step (Any): The chat step to stream the formatted content to, or NullStep if interface is None. Required.
title (str): The title of the details component. Default: 'Expand for details'.
auto (bool): Whether to automatically determine what goes into details components. If False, all of the content is placed in details components. Default: True.

Returns:

str: The formatted content with code blocks in details components.

truncate_iterable(iterable, max_length=150)

truncate_string(s, max_length=30, ellipsis='...')

warn_on_unused_variables(string, kwargs, prompt_label)

with_timeout(coro, timeout_seconds=10, default_value=None, error_message=None) async

Executes a coroutine with a timeout.

Parameters:

coro (coroutine): The coroutine to execute with a timeout. Required.
timeout_seconds (float): Maximum number of seconds to wait before timing out. Default: 10.
default_value (Any): Value to return if a timeout occurs. Default: None.
error_message (str | None): Custom error message to log on timeout. Default: None.

Returns:

Any: Result of the coroutine if it completes in time, otherwise default_value.
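A usage sketch (fetch_schema stands in for any coroutine):

.. code-block:: python

result = await with_timeout(
    fetch_schema(),
    timeout_seconds=5,
    default_value={},
    error_message="Schema fetch timed out",
)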

wrap_logfire(span_name=None, extract_args=True, **instrument_kwargs)

Decorator to instrument a function with logfire if llm.logfire_tags is provided. Expects the decorated function to have an 'llm' attribute (on self) or be passed via instrument_kwargs.

wrap_logfire_on_method(cls, method_name)

Automatically adds logfire wrapping to a method.