Sources

lumen.sources

ae5

AE5Source

Bases: Source

The AE5Source queries an Anaconda Enterprise 5 instance for statistics.

Specifically it provides tables with information on nodes, deployments, sessions, jobs and resource profiles.

admin_password = param.String(doc='Password to authenticate admin with AE5.') class-attribute instance-attribute
admin_username = param.String(doc='Username to authenticate admin with AE5.') class-attribute instance-attribute
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
hostname = param.String(doc='URL of the AE5 host.') class-attribute instance-attribute
k8s_endpoint = param.String(default='k8s') class-attribute instance-attribute
password = param.String(doc='Password to authenticate with AE5.') class-attribute instance-attribute
pool_size = param.Integer(default=100, doc='\n Size of HTTP socket pool.') class-attribute instance-attribute
private = param.Boolean(default=True, doc='\n Whether to limit the deployments visible to a user based on\n their authorization.') class-attribute instance-attribute
source_type = 'ae5' class-attribute instance-attribute
username = param.String(doc='Username to authenticate with AE5.') class-attribute instance-attribute
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()
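
For illustration, a minimal sketch of configuring and querying this source; the hostname, credentials and the 'deployments' table name are placeholders, not values confirmed by this reference:

```python
from lumen.sources.ae5 import AE5Source

# Hypothetical AE5 instance and credentials
source = AE5Source(
    hostname='https://ae5.example.com',
    username='jdoe',
    password='secret',
)
print(source.get_tables())      # lists the available statistics tables
df = source.get('deployments')  # 'deployments' is an assumed table name
```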

base

DataFrame = pd.DataFrame | dDataFrame module-attribute

DataFrameTypes = (pd.DataFrame, dd.DataFrame) module-attribute

Series = pd.Series | dSeries module-attribute

BaseSQLSource

Bases: Source

The BaseSQLSource implements the additional API required by a SQL-based data source.

dialect = 'any' class-attribute instance-attribute
excluded_tables = param.List(default=[], doc="\n List of table names that should be excluded from the results. Supports:\n - Fully qualified name: 'DATABASE.SCHEMA.TABLE'\n - Schema qualified name: 'SCHEMA.TABLE'\n - Table name only: 'TABLE'\n - Wildcards: 'SCHEMA.*'") class-attribute instance-attribute
load_schema = param.Boolean(default=True, doc='Whether to load the schema') class-attribute instance-attribute
table_params = param.Dict(default={}, doc="\n Dictionary mapping table names to SQL parameters.\n Parameters can be:\n - list: Positional parameters for placeholder (?) syntax\n - dict: Named parameters for :name, %(name)s, etc. syntax\n Each table maps to either a list or dict of parameters.\n Example: {'my_table': [2024, 'active'], 'other_table': {'year': 2024}}") class-attribute instance-attribute
create_sql_expr_source(tables, params=None, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

Arguments

tables : dict[str, str]
    Mapping from table name to SQL expression.
params : dict[str, list | dict] | None
    Optional mapping from table name to parameters:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
kwargs : any
    Additional keyword arguments.

Returns:

source : BaseSQLSource subclass
execute(sql_query, params=None, *args, **kwargs)

Executes a SQL query and returns the result as a DataFrame.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame
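
As a hedged sketch of the two parameter styles (assuming `source` is an instance of a BaseSQLSource subclass and a `flights` table exists; which placeholder syntax applies depends on the underlying dialect):

```python
# Positional parameters with placeholder (?) syntax
df = source.execute(
    'SELECT * FROM flights WHERE year = ? AND status = ?',
    params=[2024, 'active'],
)

# Named parameters with :name syntax
df = source.execute(
    'SELECT * FROM flights WHERE year = :year',
    params={'year': 2024},
)
```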

execute_async(sql_query, params=None, *args, **kwargs) async

Executes a SQL query asynchronously and returns the result as a DataFrame.

This default implementation runs the synchronous execute() method in a thread to avoid blocking the event loop. Subclasses can override this method to provide truly asynchronous implementations.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame

get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

table : str (required)
    The name of the table to query
query : dict (default: {})
    A dictionary containing all the query parameters

Returns:

DataFrame
    A DataFrame containing the queried table.

get_schema(table=None, limit=None, shuffle=False)
get_sql_expr(table)

Returns the SQL expression corresponding to a particular table.

normalize_table(table)

Allows implementing table name normalization to enable fuzzy matching of table names with minor variations, such as quoting differences.

DerivedSource

Bases: Source

DerivedSource applies filtering and transforms to tables from other sources.

A DerivedSource references tables on other sources and optionally allows applying filters and transforms to the returned data which is then made available as a new (derived) table.

The DerivedSource has two modes:

Table Mode

When an explicit tables specification is provided, full control over the exact tables to filter and transform is available. This is referred to as 'table' mode.

In 'table' mode the tables can reference any table on any source using the reference syntax and declare filters and transforms to apply to that specific table, e.g. a table specification might look like this::

{
  'derived_table': {
    'source': 'original_source',
    'table': 'original_table',
    'filters': [
      ...
    ],
    'transforms': [
      ...
    ]
  }
}

Mirror Mode

When a source is declared, all tables on that Source are mirrored, filtered and transformed according to the supplied filters and transforms. This is referred to as 'mirror' mode.

In mirror mode the DerivedSource may reference an existing source directly, e.g.::

{
    'type': 'derived',
    'source': 'original_source',
    'filters': [...],
    'transforms': [...],
}
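
In Python, a mirror-mode source might be constructed directly; a minimal sketch (the CSV file is a placeholder and the filter/transform lists are left empty):

```python
from lumen.sources.base import DerivedSource, FileSource

original = FileSource(tables={'table': 'data.csv'})  # hypothetical data file
derived = DerivedSource(
    source=original,  # mirror all tables on the original source
    filters=[],       # filters applied to every mirrored table
    transforms=[],    # transforms applied to every mirrored table
)
print(derived.get_tables())
```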
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
filters = param.List(doc='\n A list of filters to apply to all tables of this source.') class-attribute instance-attribute
source = param.ClassSelector(class_=Source, doc='\n A source to mirror the tables on.') class-attribute instance-attribute
source_type = 'derived' class-attribute
tables = param.Dict(default={}, doc='\n The dictionary of tables and associated filters and transforms.') class-attribute instance-attribute
transforms = param.List(doc='\n A list of transforms to apply to all tables of this source.') class-attribute instance-attribute
clear_cache()
get(table, **query)
get_tables()

FileSource

Bases: Source

FileSource loads CSV, Excel and Parquet files using pandas and dask read_* functions.

The FileSource can declare a list or dictionary of local or remote files which are then loaded using either pandas.read_* or dask.dataframe.read_* functions depending on whether use_dask is enabled.

dask = param.Boolean(default=False, doc='\n Whether to return a Dask dataframe.') class-attribute instance-attribute
kwargs = param.Dict(doc='\n Keyword arguments to the pandas/dask loading function.') class-attribute instance-attribute
source_type = 'file' class-attribute
tables = param.ClassSelector(class_=(list, dict), doc="\n List or dictionary of tables to load. If a list is supplied the\n names are computed from the filenames, otherwise the keys are\n the names. The values must be filepaths or URLs to the data:\n\n ```\n {\n 'local' : '/home/user/local_file.csv',\n 'remote': 'https://test.com/test.csv'\n }\n ```\n\n If the filepath does not have a declared extension an extension\n may be provided in a list or tuple, e.g.:\n\n ```\n {'table': ['http://test.com/api', 'json']}\n ```\n ") class-attribute instance-attribute
use_dask = param.Boolean(default=True, doc='\n Whether to use dask to load files.') class-attribute instance-attribute
get(table, **query)
get_tables()
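
A minimal sketch, reusing the placeholder paths from the tables documentation above:

```python
from lumen.sources.base import FileSource

source = FileSource(
    tables={
        'local': '/home/user/local_file.csv',
        'remote': 'https://test.com/test.csv',
    },
    use_dask=False,  # load with pandas.read_* instead of dask.dataframe.read_*
)
df = source.get('local')
```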

InMemorySource

Bases: Source

InMemorySource can be used to work with in-memory data.

tables = param.Dict(default={}) class-attribute instance-attribute
add_table(name, table)
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()
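
A minimal sketch (the table name and data are invented):

```python
import pandas as pd
from lumen.sources.base import InMemorySource

source = InMemorySource()
source.add_table('events', pd.DataFrame({'kind': ['a', 'b'], 'value': [1, 2]}))
print(source.get_tables())  # ['events']
df = source.get('events')   # the full table; query filters may also be passed
```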

JSONSource

Bases: FileSource

The JSONSource is very similar to the FileSource but loads JSON files.

Both local and remote JSON files can be fetched by declaring them as a list or dictionary of tables.

cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
chunk_size = param.Integer(default=0, doc='\n Number of items to load per chunk if a template variable\n is provided.') class-attribute instance-attribute
source_type = 'json' class-attribute
tables = param.ClassSelector(class_=(list, dict), doc="\n List or dictionary of tables to load. If a list is supplied the\n names are computed from the filenames, otherwise the keys are\n the names. The values must be filepaths or URLs to the data:\n\n ```\n {\n 'local' : '/home/user/local_file.csv',\n 'remote': 'https://test.com/test.csv'\n }\n ```\n ") class-attribute instance-attribute

JoinedSource

Bases: Source

JoinedSource performs a join on tables from one or more sources.

A JoinedSource applies a join on two or more sources returning new table(s) with data from all sources. It iterates over the tables specification and merges the specified tables from the declared sources on the supplied index.

In this way multiple tables from multiple sources can be merged. Individual tables from sources that should not be joined may also be surfaced by declaring a single source and table in the specification.

As a simple example we may have sources A and B, which contain tables 'foo' and 'bar' respectively. We now want to merge table 'foo' on column 'a' with table 'bar' on column 'b'::

{'new_table': [
  {'source': 'A', 'table': 'foo', 'index': 'a'},
  {'source': 'B', 'table': 'bar', 'index': 'b'}
]}

The joined source will now publish the "new_table" with all columns from tables "foo" and "bar" except for the index column from table "bar", which was merged with the index column "a" from table "foo".

panel property
source_type = 'join' class-attribute
sources = param.ClassSelector(class_=(list, dict), doc='\n A dictionary of sources indexed by their assigned name.') class-attribute instance-attribute
tables = param.Dict(default={}, doc='\n A dictionary with the names of the joined sources as keys\n and a specification of the source, table and index to merge\n on.\n\n ```\n {"new_table": [\n {\'source\': <source_name>,\n \'table\': <table_name>,\n \'index\': <index_name>\n },\n {\'source\': <source_name>,\n \'table\': <table_name>,\n \'index\': <index_name>\n },\n ...\n ]}\n ```\n ') class-attribute instance-attribute
clear_cache()
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()
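
A minimal Python sketch of the example above, using the placeholder source, table and index names from the preceding paragraphs:

```python
import pandas as pd
from lumen.sources.base import InMemorySource, JoinedSource

A = InMemorySource(tables={'foo': pd.DataFrame({'a': [1, 2], 'x': ['p', 'q']})})
B = InMemorySource(tables={'bar': pd.DataFrame({'b': [1, 2], 'y': [10, 20]})})

joined = JoinedSource(
    sources={'A': A, 'B': B},
    tables={'new_table': [
        {'source': 'A', 'table': 'foo', 'index': 'a'},
        {'source': 'B', 'table': 'bar', 'index': 'b'},
    ]},
)
df = joined.get('new_table')  # expected columns: a, x, y (see note above)
```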

PanelSessionSource

Bases: Source

" PanelSessionSource queries the session_info endpoint of a Panel application.

Panel applications with --rest-session-info enabled can be queried about session statistics. This source makes this data available to Lumen for monitoring.

cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
endpoint = param.String(default='rest/session_info') class-attribute instance-attribute
source_type = 'session_info' class-attribute
timeout = param.Parameter(default=5) class-attribute instance-attribute
urls = param.List(doc='URL of the websites to monitor.') class-attribute instance-attribute
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()

RESTSource

Bases: Source

RESTSource allows querying REST endpoints conforming to the Lumen REST specification.

The url must offer two endpoints: the /data endpoint must return data in a records format, while the /schema endpoint must return a valid Lumen JSON schema.
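
A minimal sketch (the URL and table name are placeholders; the endpoint layout follows the description above):

```python
from lumen.sources.base import RESTSource

source = RESTSource(url='https://example.com/api')  # must serve /data and /schema
schema = source.get_schema('my_table')  # resolved via the /schema endpoint
df = source.get('my_table')             # records fetched from the /data endpoint
```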

source_type = 'rest' class-attribute
url = param.String(doc='URL of the REST endpoint to monitor.') class-attribute instance-attribute
get(table, **query)
get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

table : str (required)
    The name of the table to query
query : dict (default: {})
    A dictionary containing all the query parameters

Returns:

DataFrame
    A pandas DataFrame containing the queried table.

get_schema(table=None, limit=None, shuffle=False)

Source

Bases: MultiTypeComponent

Source components allow querying all kinds of data.

A Source can return one or more tables: the available tables are listed by the .get_tables method, a description of the data returned by each table is available as a JSON schema via the .get_schema method, and the .get method returns the data, optionally filtered by a query.

The Source base class also implements both in-memory and disk caching which can be enabled if a cache_dir is provided. Data cached to disk is stored as parquet files.
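
For example, a hedged sketch of enabling the disk cache on a concrete subclass (FileSource and the file path are stand-ins for any Source):

```python
from lumen.sources.base import FileSource

source = FileSource(
    tables={'table': 'data.csv'},  # hypothetical data file
    cache_dir='./cache',           # enables the disk cache (parquet files)
)
df = source.get('table')  # first call fetches the data and populates the cache
df = source.get('table')  # later calls can be served from the cache
```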

cache_data = param.Boolean(default=True, doc='\n Whether to cache actual data.') class-attribute instance-attribute
cache_dir = param.String(default=None, doc='\n Whether to enable local cache and write file to disk.') class-attribute instance-attribute
cache_metadata = param.Boolean(default=True, doc='\n Whether to cache metadata.') class-attribute instance-attribute
cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
cache_schema = param.Boolean(default=True, doc='\n Whether to cache table schemas.') class-attribute instance-attribute
cache_with_dask = param.Boolean(default=True, doc='\n Whether to read and write cache files with dask if available.') class-attribute instance-attribute
metadata = param.Dict(default={}, doc='\n Optional metadata about the source tables. Should follow the format:\n {"table_name": {"description": ..., "columns": {"column_name": "..."}}}') class-attribute instance-attribute
metadata_func = param.Callable(default=None, doc='\n Function to implement custom metadata lookup for tables.\n Given a list of tables it should return a dictionary of the form:\n\n {\n <table>: {"description": ..., "columns": {"column_name": "..."}}\n }\n\n May be used to override the default _get_table_metadata\n implementation of the Source.') class-attribute instance-attribute
panel property

A Source can return a Panel object which displays information about the Source or controls how the Source queries data.

root = param.ClassSelector(class_=Path, precedence=(-1), doc='\n Root folder of the cache_dir, default is config.root') class-attribute instance-attribute
shared = param.Boolean(default=False, doc='\n Whether the Source can be shared across all instances of the\n dashboard. If set to `True` the Source will be loaded on\n initial server load.') class-attribute instance-attribute
source_type = None class-attribute
clear_cache(*events)

Clears any cached data.

from_spec(spec) classmethod

Creates a Source object from a specification. If a Source specification references other sources these may be supplied in the sources dictionary and be referenced by name.

Parameters:

spec : dict or str (required)
    Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary.

Returns:

Resolved and instantiated Source object
get(table, **query)

Return a table; optionally filtered by the given query.

Parameters:

table : str (required)
    The name of the table to query
query : dict (default: {})
    A dictionary containing all the query parameters

Returns:

DataFrame
    A DataFrame containing the queried table.

get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

table : str (required)
    The name of the table to query
query : dict (default: {})
    A dictionary containing all the query parameters

Returns:

DataFrame
    A DataFrame containing the queried table.

get_metadata(table)

Returns metadata for one, multiple or all tables provided by the source.

The metadata for a table is structured as:

{
  "description": ...,
  "columns": {
    "column_name": {
      "description": ...,
      "data_type": ...,
    }
  },
  **other_metadata
}

If a list of tables or no table is provided the metadata is nested one additional level:

{
  "table_name": {
    "description": ...,
    "columns": {
      "column_name": {
        "description": ...,
        "data_type": ...,
      }
    },
    **other_metadata
  }
}

Parameters:

table : str | list[str] | None (required)
    The name of the table(s) to return metadata for. If None returns metadata for all available tables.

Returns:

metadata : dict
    Dictionary of metadata indexed by table (if a list of tables or no table was provided) or the metadata for an individual table.

get_schema(table=None, limit=None, shuffle=False)

Returns JSON schema describing the tables returned by the Source.

Parameters:

table : str | None (default: None)
    The name of the table to return the schema for. If None returns schema for all available tables.
limit : int | None (default: None)
    Limits the number of rows considered for the schema calculation

Returns:

dict
    JSON schema(s) for one or all the tables.

get_tables()

Returns the list of tables available on this source.

Returns:

list
    The list of available tables on this source.

validate(spec, context=None) classmethod

WebsiteSource

Bases: Source

WebsiteSource queries whether a website responds with a 400 status code.

cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
source_type = 'live' class-attribute
urls = param.List(doc='URLs of the websites to monitor.') class-attribute instance-attribute
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()

cached(method, locks=None)

Adds caching to a Source.get query.

Returns:

Returns method wrapped in caching functionality.

cached_metadata(method, locks=None)

cached_schema(method, locks=None)

bigquery

BigQuerySource

Bases: BaseSQLSource

BigQuerySource provides access to a Google BigQuery project.

datasets = param.List(default=None, doc='List of datasets to include') class-attribute instance-attribute
dialect = 'bigquery' class-attribute instance-attribute
filter_in_sql = param.Boolean(default=True, doc='Whether to apply filters in SQL or in-memory.') class-attribute instance-attribute
location = param.String(doc='Location where the project resides.') class-attribute instance-attribute
project_id = param.String(doc="The Google Cloud's project ID.") class-attribute instance-attribute
tables = param.ClassSelector(class_=(list, dict), doc='\n A list of tables or a dictionary mapping from table name to a SQL query.') class-attribute instance-attribute
close()

Close the BigQuery client connections, releasing associated resources.

This method should be called when the source is no longer needed to prevent connection leaks and properly clean up resources.

create_sql_expr_source(tables, params=None, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

Arguments

tables : dict[str, str]
    Mapping from table name to SQL expression.
params : dict[str, list | dict] | None
    Optional mapping from table name to parameters:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
kwargs : any
    Additional keyword arguments.

Returns:

source : BigQuerySource
execute(sql_query, params=None, *args, **kwargs)

Executes a SQL query and returns the result as a DataFrame.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame

execute_async(sql_query, params=None, *args, **kwargs) async

Executes a SQL query asynchronously and returns the result as a DataFrame.

This implementation runs queries asynchronously using BigQuery's job API.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame

get(table, **query)
get_async(table, **query) async

Retrieve a table from BigQuery asynchronously with optional filtering.

Parameters:

table : str (required)
    The table name to query
**query : dict (default: {})
    Query parameters and filters to apply

Returns:

DataFrame
    The filtered table data as a pandas DataFrame

get_schema(table=None, limit=None, shuffle=False)

Determine the schema of the given table.

This method overrides the get_schema method inherited from the base Source class.

Parameters:

table : str | None (default: None)
    The name of the table. Must be in the form f"{project_id}.{dataset_id}.{table_id}" or reference a table in the tables dictionary.
limit : int | None (default: None)
    The maximum number of rows to sample from the table.
shuffle : bool (default: False)
    Whether to shuffle the rows of the table.

Returns:

dict[str, dict[str, Any]] | dict[str, Any]
get_sql_expr(table)
get_tables()

Get a list of available tables for the project.

Returns:

list[str]
    Table names are composed of f"{project_id}.{dataset_id}.{table_id}".
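
A minimal sketch (the project, dataset and table IDs are placeholders):

```python
from lumen.sources.bigquery import BigQuerySource

source = BigQuerySource(project_id='my-project', location='US')
print(source.get_tables())  # e.g. ['my-project.my_dataset.my_table', ...]
df = source.get('my-project.my_dataset.my_table')
source.close()  # release client connections when done
```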

duckdb

DuckDBSource

Bases: BaseSQLSource

DuckDBSource provides a simple wrapper around the DuckDB SQL connector.

To specify tables to be queried, provide a list or dictionary of tables. A SQL expression to fetch the data from each table is then generated using the sql_expr, e.g. if we specify a local table flights.db the default sql_expr 'SELECT * FROM {table}' expands to 'SELECT * FROM flights.db'. To supply a full SQL expression as a table you must change the sql_expr to '{table}', ensuring no further templating is applied.

Note that certain functionality in DuckDB requires modules to be loaded before making a query. These can be specified using the initializers parameter, which defines DuckDB statements to run when initializing the connection.
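
A minimal sketch of both modes described above (the database file, CSV path and extension statements are placeholders):

```python
from lumen.sources.duckdb import DuckDBSource

# Default mode: the sql_expr template is expanded for each table name
source = DuckDBSource(uri='flights.db', tables=['flights'])

# Expression mode: set sql_expr to '{table}' and pass full SQL expressions
expr_source = DuckDBSource(
    uri=':memory:',
    sql_expr='{table}',
    tables={'top10': "SELECT * FROM read_csv('flights.csv') LIMIT 10"},
    initializers=['INSTALL httpfs;', 'LOAD httpfs;'],  # optional module setup
)
df = expr_source.get('top10')
```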

connection property
dialect = 'duckdb' class-attribute instance-attribute
ephemeral = param.Boolean(default=False, doc='\n Whether the data is ephemeral, i.e. manually inserted into the\n DuckDB table or derived from real data.') class-attribute instance-attribute
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.') class-attribute instance-attribute
initializers = param.List(default=[], doc='\n SQL statements to run to initialize the connection.') class-attribute instance-attribute
load_schema = param.Boolean(default=True, doc='Whether to load the schema') class-attribute instance-attribute
mirrors = param.Dict(default={}, doc='\n Mirrors the tables into the DuckDB database. The mirrors\n should define a mapping from the table names to the source of\n the mirror which may be defined as a Pipeline or a tuple of\n the source and the table.') class-attribute instance-attribute
source_type = 'duckdb' class-attribute instance-attribute
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression to execute.') class-attribute instance-attribute
table_params = param.Dict(default={}, doc='\n Dictionary mapping table names to lists of SQL parameters.\n Parameters are used with placeholders (?) in SQL expressions.') class-attribute instance-attribute
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables.') class-attribute instance-attribute
uri = param.String(doc='The URI of the DuckDB database') class-attribute instance-attribute
close()

Close the DuckDB connection, releasing associated resources.

This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.

create_sql_expr_source(tables, materialize=True, params=None, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

Arguments

tables : dict[str, str]
    Mapping from table name to SQL expression.
materialize : bool
    Whether to materialize new tables.
params : dict[str, list | dict] | None
    Optional mapping from table name to parameters:
    - list: Positional parameters for ? placeholders
    - dict: Named parameters for $param_name placeholders
kwargs : any
    Additional keyword arguments.

Returns:

source : DuckDBSource
execute(sql_query, params=None, *args, **kwargs)
from_df(tables, **kwargs) classmethod

Creates an ephemeral, in-memory DuckDBSource containing the supplied dataframes.

Arguments

tables : dict[str, pandas.DataFrame]
    A dictionary mapping from table names to DataFrames.
kwargs : any
    Additional keyword arguments for the source.

Returns:

source : DuckDBSource
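
A minimal sketch:

```python
import pandas as pd
from lumen.sources.duckdb import DuckDBSource

df = pd.DataFrame({'island': ['Biscoe', 'Dream'], 'count': [44, 31]})
source = DuckDBSource.from_df({'penguins': df})  # ephemeral, in-memory source
print(source.get('penguins'))
```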
from_spec(spec) classmethod
get(table, **query)
get_tables()
normalize_table(table)
to_spec(context=None)

intake

IntakeBaseSource

Bases: Source

cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
load_schema = param.Boolean(default=True, doc='\n Whether to load the schema') class-attribute instance-attribute
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()

IntakeSource

Bases: IntakeBaseSource

An IntakeSource loads data from an Intake catalog.

Intake is a lightweight set of tools for loading and sharing data in data science projects using convenient catalog specifications.

The IntakeSource can be given an inlined dictionary catalog specification or a URI pointing to a catalog.yaml file on disk.
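
A minimal sketch (the catalog file is a placeholder):

```python
from lumen.sources.intake import IntakeSource

source = IntakeSource(uri='catalog.yml')  # hypothetical Intake catalog
print(source.get_tables())
```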

cat = intake.open_catalog(self.uri) instance-attribute
catalog = param.Dict(doc='An inlined Catalog specification.') class-attribute instance-attribute
dask = param.Boolean(default=False, doc='\n Whether to return a dask DataFrame.') class-attribute instance-attribute
source_type = 'intake' class-attribute instance-attribute
uri = param.String(doc='URI of the catalog file.') class-attribute instance-attribute

intake_dremio

IntakeBaseDremioSource

Bases: IntakeBaseSQLSource

Base class with common parameters for Dremio sources.

cert = param.String(default='Path to certificate file') class-attribute instance-attribute
dask = param.Boolean(default=False, doc='\n Whether to return a dask DataFrame.') class-attribute instance-attribute
dialect = 'dremio' class-attribute instance-attribute
password = param.String(default=None, doc='Dremio password or token') class-attribute instance-attribute
tls = param.Boolean(default=False, doc='Enable encryption') class-attribute instance-attribute
uri = param.String(doc='URI of the Dremio server.') class-attribute instance-attribute
username = param.String(default=None, doc='Dremio username') class-attribute instance-attribute
create_sql_expr_source(tables, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

normalize_table(table)

IntakeDremioSQLSource

Bases: IntakeBaseDremioSource

IntakeDremioSQLSource allows querying a subset of Dremio catalog tables and views via custom SQL expressions.

When provided with the uri of the Dremio server and credentials to authenticate with the Dremio instance, only the tables specified in the tables parameter can be used, unlike with IntakeDremioSource.

Requires the intake-dremio package to be installed.

cat = {table: (DremioSource(sql_expr=sql_expr, cert=(self.cert), uri=(self.uri), tls=(self.tls), username=(self.username), password=(self.password))) for table, sql_expr in (self.tables.items())} instance-attribute
source_type = 'intake_dremio_sql' class-attribute instance-attribute
tables = param.Dict(default={}, doc='\n Mapping of table names to desired SQL expressions') class-attribute instance-attribute

IntakeDremioSource

Bases: IntakeBaseDremioSource

IntakeDremioSource allows querying Dremio catalog tables and views.

When provided with the uri of the Dremio server and credentials to authenticate with the Dremio instance all available tables can be queried via this Source.

Requires the intake-dremio package to be installed.

cat = DremioCatalog(self.uri, cert=(self.cert), tls=(self.tls), username=(self.username), password=(self.password)) instance-attribute
source_type = 'intake_dremio' class-attribute instance-attribute

intake_sql

IntakeBaseSQLSource

Bases: BaseSQLSource, IntakeBaseSource

cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
filter_in_sql = param.Boolean(default=True, doc='') class-attribute instance-attribute
get(table, **query)

Applies SQL transforms, creating a new temporary catalog on the fly and querying the database.

get_schema(table=None, limit=None, shuffle=False)
get_sql_expr(table)

IntakeSQLSource

Bases: IntakeBaseSQLSource, IntakeSource

IntakeSQLSource extends the IntakeSource with support for SQL data.

In addition to the standard Intake support for reading catalogs, the IntakeSQLSource computes the schema by querying the database instead of loading all the data into memory, and allows SQLTransforms to be applied when querying the SQL database.

source_type = 'intake_sql' class-attribute instance-attribute

prometheus

PrometheusSource

Bases: Source

PrometheusSource allows querying Prometheus PromQL endpoints.

The PrometheusSource is configured to return timeseries about CPU, memory and network usage as well as restarts for a list of Kubernetes pods specified by ID.

ae5_source = param.Parameter(doc='\n An AE5Source instance to use for querying.') class-attribute instance-attribute
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute
ids = param.List(default=[], doc='\n List of pod IDs to query.') class-attribute instance-attribute
metrics = param.List(default=['memory_usage', 'cpu_usage', 'restarts', 'network_receive_bytes', 'network_transmit_bytes'], doc='Names of metric queries to execute') class-attribute instance-attribute
panel property
period = param.String(default='3h', doc="\n Period to query over specified as a string. Supports:\n\n - Week: '1w'\n - Day: '1d'\n - Hour: '1h'\n - Minute: '1m'\n - Second: '1s'\n ") class-attribute instance-attribute
promql_api = param.String(doc='\n Name of the AE5 deployment exposing the Prometheus API') class-attribute instance-attribute
samples = param.Integer(default=200, doc='\n Number of samples in the selected period to query. May be\n overridden by explicit step value.') class-attribute instance-attribute
shared = param.Boolean(default=False, readonly=True, doc='\n PrometheusSource cannot be shared because it has per-user state.') class-attribute instance-attribute
source_type = 'prometheus' class-attribute instance-attribute
step = param.String(doc='\n Step value to use in PromQL query_range query.') class-attribute instance-attribute
get(table, **query)
get_schema(table=None, limit=None, shuffle=False)
get_tables()

snowflake

SnowflakeSource

Bases: BaseSQLSource

SnowflakeSource uses the snowflake-connector-python library to load data from Snowflake.
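
A minimal sketch (the account, user, database, warehouse and table names are placeholders; externalbrowser is just one of the supported authenticator options):

```python
from lumen.sources.snowflake import SnowflakeSource

source = SnowflakeSource(
    account='myorg-myaccount',
    user='jdoe@example.com',
    authenticator='externalbrowser',  # browser-based SSO
    database='MYDB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    tables=['MYDB.PUBLIC.MYTABLE'],
)
df = source.get('MYDB.PUBLIC.MYTABLE')
source.close()  # release the connection when done
```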

account = param.String(default=None, doc='\n The account identifier to connect to.') class-attribute instance-attribute
authenticator = param.Selector(default=None, objects=['externalbrowser', 'oauth', 'snowflake', 'username_password_mfa', 'SNOWFLAKE_JWT'], doc='\n The authentication approach to use.', allow_None=True) class-attribute instance-attribute
conn_kwargs = param.Dict(default={}, doc='\n Additional connection parameters to pass to the Snowflake connector.') class-attribute instance-attribute
database = param.String(default=None, doc='\n The database to connect to.') class-attribute instance-attribute
dialect = 'snowflake' class-attribute instance-attribute
excluded_tables = param.List(default=[], doc='\n List of table names that should be excluded from the results.\n The items can be fully qualified (database.schema.table), partially\n qualified (schema.table), simply table names, or wildcards\n (e.g. database.schema.*).') class-attribute instance-attribute
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.') class-attribute instance-attribute
host = param.String(default=None, doc='\n The host to authenticate with.') class-attribute instance-attribute
paramstyle = param.Selector(default='qmark', objects=['qmark', 'numeric', 'format', 'pyformat'], doc='\n The paramstyle to use for SQL queries.') class-attribute instance-attribute
password = param.String(default=None, doc='\n The password to authenticate with (if authenticator is set to "snowflake").') class-attribute instance-attribute
private_key = param.ClassSelector(default=None, class_=(str, bytes, Path), doc='\n The path or contents of the private key file.') class-attribute instance-attribute
private_key_password = param.String(default=None, doc='\n The password to decrypt the private key file.') class-attribute instance-attribute
schema = param.String(default=None, doc='\n The database schema to load data from.') class-attribute instance-attribute
schema_timeout_seconds = param.Integer(default=600, doc='\n Timeout in seconds for schema retrieval. If None, no timeout is applied.') class-attribute instance-attribute
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression to execute.') class-attribute instance-attribute
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables.') class-attribute instance-attribute
token = param.String(default=None, doc='\n The OAuth token if authenticator is set to "oauth".') class-attribute instance-attribute
user = param.String(default=None, doc='\n The user to authenticate as.') class-attribute instance-attribute
warehouse = param.String(default=None, doc='\n The warehouse to connect to.') class-attribute instance-attribute
close()

Close the Snowflake connection and cursor, releasing associated resources.

This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.

create_sql_expr_source(tables, params=None, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

Arguments

tables : dict[str, str]
    Mapping from table name to SQL expression.
params : dict[str, list | dict] | None
    Optional mapping from table name to parameters:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters (for pyformat/format paramstyle)
kwargs : any
    Additional keyword arguments.

Returns:

source : SnowflakeSource
execute(sql_query, params=None, *args, **kwargs)
execute_async(sql_query, params=None, *args, **kwargs) async

Execute a Snowflake SQL query asynchronously and return the result as a DataFrame.

Parameters:

sql_query : str (required)
    The SQL query to execute
params : list | dict | None (default: None)
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax (qmark paramstyle)
    - dict: Named parameters (for pyformat/format paramstyle)
    - None: No parameters
*args : tuple (default: ())
    Additional positional arguments to pass to the query
**kwargs : dict (default: {})
    Keyword arguments to pass to the query

Returns:

DataFrame
    The query result as a pandas DataFrame with supported dtypes

get(table, **query)
get_async(table, **query) async

Retrieve data from a table asynchronously using the same query logic as get().

Parameters:

table : str (required)
    The name of the table to query
**query : dict (default: {})
    Query parameters and filters to apply

Returns:

DataFrame
    The query result as a pandas DataFrame

get_schema(table=None, limit=None, shuffle=False)

Returns JSON schema describing the tables returned by the Source.

Parameters:

table : str | None (default: None)
    The name of the table to return the schema for. If None returns schema for all available tables.
limit : int | None (default: None)
    Limits the number of rows considered for the schema calculation
shuffle : bool (default: False)
    Whether to shuffle the rows when sampling

Returns:

dict
    JSON schema(s) for one or all the tables.

get_tables()
resolve_private_key()

Converts a PEM encoded private key into a DER binary key.

Returns:

bytes or None
    DER encoded key if private_key has been provided, otherwise returns None.

Raises:

InvalidPemFormat
    If the private key is not in PEM format.

sqlalchemy

DataFrame = pd.DataFrame module-attribute

SQLAlchemySource

Bases: BaseSQLSource

SQLAlchemySource uses SQLAlchemy to connect to various SQL databases.

Supports both synchronous and asynchronous database drivers through SQLAlchemy's engine system. Can connect to PostgreSQL, MySQL, SQLite, Oracle, MSSQL, and more.
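
A minimal sketch (the SQLite URL and table name are placeholders; any SQLAlchemy database URL should work the same way):

```python
from lumen.sources.sqlalchemy import SQLAlchemySource

source = SQLAlchemySource(
    url='sqlite:///example.db',  # SQLAlchemy database URL
    tables=['my_table'],
)
df = source.get('my_table')
source.close()  # dispose of the engine when done
```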

async_drivers = {'postgresql+asyncpg', 'mysql+asyncmy', 'mysql+aiomysql', 'sqlite+aiosqlite', 'oracle+oracledb_async'} class-attribute instance-attribute
connect_args = param.Dict(default={}, doc="\n Additional keyword arguments passed to the DBAPI's connect() method.") class-attribute instance-attribute
database = param.String(default=None, doc='\n The database name to connect to.') class-attribute instance-attribute
dialect property

Detect and return the database dialect name.

drivername = param.String(default=None, doc="\n The driver name (e.g., 'postgresql+psycopg2', 'mysql+pymysql', 'sqlite').") class-attribute instance-attribute
engine_kwargs = param.Dict(default={}, doc='\n Additional keyword arguments passed to create_engine().') class-attribute instance-attribute
excluded_tables = param.List(default=[], doc="\n List of table name patterns to exclude from the results.\n Supports wildcards (e.g., 'schema.table*', 'temp_*').") class-attribute instance-attribute
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.') class-attribute instance-attribute
host = param.String(default=None, doc='\n The database host address.') class-attribute instance-attribute
password = param.String(default=None, doc='\n The password for database authentication.') class-attribute instance-attribute
port = param.Integer(default=None, doc='\n The database port number.') class-attribute instance-attribute
query_params = param.Dict(default=None, doc='\n Additional query parameters for the connection URL.') class-attribute instance-attribute
schema = param.String(default=None, doc='\n The default schema to use for queries.') class-attribute instance-attribute
schema_timeout_seconds = param.Integer(default=600, doc='\n Timeout in seconds for schema retrieval operations.') class-attribute instance-attribute
source_type = 'sqlalchemy' class-attribute instance-attribute
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression template to execute.') class-attribute instance-attribute
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables to expose.') class-attribute instance-attribute
url = param.String(default=None, doc="\n SQLAlchemy database URL string (e.g., 'postgresql://user:pass@host:port/db').") class-attribute instance-attribute
username = param.String(default=None, doc='\n The username for database authentication.') class-attribute instance-attribute
close()

Close the database connection and dispose of the engine.

This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.

create_sql_expr_source(tables, params=None, **kwargs)

Creates a new SQL Source given a set of table names and corresponding SQL expressions.

Arguments

tables : dict[str, str]
    Mapping from table name to SQL expression.
params : dict[str, list | dict] | None
    Optional mapping from table name to parameters:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
kwargs : any
    Additional keyword arguments.

Returns:

source : SQLAlchemySource
execute(sql_query, params=None, *args, **kwargs)

Executes a SQL query and returns the result as a DataFrame.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame

execute_async(sql_query, params=None, *args, **kwargs) async

Executes a SQL query asynchronously and returns the result as a DataFrame.

This default implementation runs the synchronous execute() method in a thread to avoid blocking the event loop. Subclasses can override this method to provide truly asynchronous implementations.

Arguments

sql_query : str
    The SQL query to execute
params : list | dict | None
    Parameters to use in the SQL query:
    - list: Positional parameters for placeholder (?) syntax
    - dict: Named parameters for :name, %(name)s, etc. syntax
    - None: No parameters
args : list
    Additional positional arguments to pass to the SQL query
kwargs : dict
    Keyword arguments to pass to the SQL query

Returns:

DataFrame
    The result as a pandas DataFrame

get(table, **query)

Retrieve data from a table with optional filtering.

get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

table : str (required)
    The name of the table to query
query : dict (default: {})
    A dictionary containing all the query parameters

Returns:

DataFrame
    A DataFrame containing the queried table.

get_schema(table=None, limit=None, shuffle=False)

Returns JSON schema describing the tables returned by the Source.

Parameters:

table : str | None (default: None)
    The name of the table to return the schema for. If None returns schema for all available tables.
limit : int | None (default: None)
    Limits the number of rows considered for the schema calculation
shuffle : bool (default: False)
    Whether to shuffle rows when sampling

Returns:

dict
    JSON schema(s) for one or all the tables.

get_tables()

Return the list of available tables.