Sources¶
lumen.sources
¶
ae5
¶
AE5Source
¶
Bases:
The AE5Source queries an Anaconda Enterprise 5 instance for statistics.
Specifically, it provides tables with information about nodes, deployments, sessions, jobs and resource profiles.
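A minimal, illustrative sketch of connecting to an AE5 instance; the hostname and credentials below are placeholders:

```python
from lumen.sources.ae5 import AE5Source

# Placeholder connection details for an assumed AE5 deployment
source = AE5Source(
    hostname='https://ae5.example.com',
    username='monitoring-user',
    password='secret',
)

tables = source.get_tables()   # e.g. node, deployment and session tables
df = source.get(tables[0])     # fetch the first available table
```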
admin_password = param.String(doc='Password to authenticate admin with AE5.')
class-attribute
instance-attribute
¶
admin_username = param.String(doc='Username to authenticate admin with AE5.')
class-attribute
instance-attribute
¶
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
hostname = param.String(doc='URL of the AE5 host.')
class-attribute
instance-attribute
¶
k8s_endpoint = param.String(default='k8s')
class-attribute
instance-attribute
¶
password = param.String(doc='Password to authenticate with AE5.')
class-attribute
instance-attribute
¶
pool_size = param.Integer(default=100, doc='\n Size of HTTP socket pool.')
class-attribute
instance-attribute
¶
private = param.Boolean(default=True, doc='\n Whether to limit the deployments visible to a user based on\n their authorization.')
class-attribute
instance-attribute
¶
source_type = 'ae5'
class-attribute
instance-attribute
¶
username = param.String(doc='Username to authenticate with AE5.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
base
¶
DataFrame = pd.DataFrame | dDataFrame
module-attribute
¶
DataFrameTypes = (pd.DataFrame, dd.DataFrame)
module-attribute
¶
Series = pd.Series | dSeries
module-attribute
¶
BaseSQLSource
¶
Bases:
The BaseSQLSource implements the additional API required by a SQL-based data source.
dialect = 'any'
class-attribute
instance-attribute
¶
excluded_tables = param.List(default=[], doc="\n List of table names that should be excluded from the results. Supports:\n - Fully qualified name: 'DATABASE.SCHEMA.TABLE'\n - Schema qualified name: 'SCHEMA.TABLE'\n - Table name only: 'TABLE'\n - Wildcards: 'SCHEMA.*'")
class-attribute
instance-attribute
¶
load_schema = param.Boolean(default=True, doc='Whether to load the schema')
class-attribute
instance-attribute
¶
table_params = param.Dict(default={}, doc="\n Dictionary mapping table names to SQL parameters.\n Parameters can be:\n - list: Positional parameters for placeholder (?) syntax\n - dict: Named parameters for :name, %(name)s, etc. syntax\n Each table maps to either a list or dict of parameters.\n Example: {'my_table': [2024, 'active'], 'other_table': {'year': 2024}}")
class-attribute
instance-attribute
¶
create_sql_expr_source(tables, params=None, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
Arguments
- `tables` (dict[str, str]): Mapping from table name to SQL expression.
- `params` (dict[str, list | dict] | None): Optional mapping from table name to parameters. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax.
- `kwargs` (any): Additional keyword arguments.
Returns:

| Name | Type | Description |
|---|---|---|
| source | BaseSQLSource subclass | |
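As an illustrative sketch, assuming `source` is an existing BaseSQLSource subclass instance exposing a `flights` table, a filtered view can be derived like this:

```python
# Derive a new source whose 'recent_flights' table is a parameterized
# SQL expression evaluated against the existing source.
filtered = source.create_sql_expr_source(
    tables={'recent_flights': 'SELECT * FROM flights WHERE year >= ?'},
    params={'recent_flights': [2024]},
)
df = filtered.get('recent_flights')
```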
execute(sql_query, params=None, *args, **kwargs)
¶
Executes a SQL query and returns the result as a DataFrame.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
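A hedged sketch of both parameter styles, assuming a `flights` table with `year` and `status` columns; which placeholder style is accepted depends on the concrete source and its dialect:

```python
# Positional parameters with placeholder (?) syntax
df = source.execute('SELECT * FROM flights WHERE year = ?', params=[2024])

# Named parameters with :name syntax
df = source.execute(
    'SELECT * FROM flights WHERE year = :year AND status = :status',
    params={'year': 2024, 'status': 'active'},
)
```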
execute_async(sql_query, params=None, *args, **kwargs)
async
¶
Executes a SQL query asynchronously and returns the result as a DataFrame.
This default implementation runs the synchronous execute() method in a thread to avoid blocking the event loop. Subclasses can override this method to provide truly asynchronous implementations.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
get_async(table, **query)
async
¶
Return a table asynchronously; optionally filtered by the given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| query | dict | A dictionary containing all the query parameters | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing the queried table. |
get_schema(table=None, limit=None, shuffle=False)
¶
get_sql_expr(table)
¶
Returns the SQL expression corresponding to a particular table.
normalize_table(table)
¶
Allows implementing table name normalization to enable fuzzy matching of the table name for minor variations such as quoting differences.
DerivedSource
¶
Bases:
DerivedSource applies filtering and transforms to tables from other sources.
A DerivedSource references tables on other sources and optionally allows applying filters and transforms to the returned data which is then made available as a new (derived) table.
The DerivedSource has two modes:
Table Mode
When an explicit tables specification is provided full control
over the exact tables to filter and transform is available. This
is referred to as the 'table' mode.
In 'table' mode the tables can reference any table on any source using the reference syntax and declare filters and transforms to apply to that specific table, e.g. a table specification might look like this:

    {
        'derived_table': {
            'source': 'original_source',
            'table': 'original_table',
            'filters': [
                ...
            ],
            'transforms': [
                ...
            ]
        }
    }
Mirror mode
When a source is declared all tables on that Source are mirrored
and filtered and transformed according to the supplied filters
and transforms. This is referred to as 'mirror' mode.
In mirror mode the DerivedSource may reference an existing source directly, e.g.:

    {
        'type': 'derived',
        'source': 'original_source',
        'filters': [...],
        'transforms': [...],
    }
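For illustration, a sketch of mirror mode in Python, assuming the penguins CSV URL is reachable:

```python
from lumen.sources.base import DerivedSource, FileSource

# Hypothetical upstream source with a single table
original = FileSource(
    tables={'penguins': 'https://datasets.holoviz.org/penguins/v1/penguins.csv'}
)

# Mirror mode: every table on `original` is re-exposed, optionally
# passed through the declared filters and transforms.
derived = DerivedSource(source=original, filters=[], transforms=[])
print(derived.get_tables())  # ['penguins']
```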
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
filters = param.List(doc='\n A list of filters to apply to all tables of this source.')
class-attribute
instance-attribute
¶
source = param.ClassSelector(class_=Source, doc='\n A source to mirror the tables on.')
class-attribute
instance-attribute
¶
source_type = 'derived'
class-attribute
¶
tables = param.Dict(default={}, doc='\n The dictionary of tables and associated filters and transforms.')
class-attribute
instance-attribute
¶
transforms = param.List(doc='\n A list of transforms to apply to all tables of this source.')
class-attribute
instance-attribute
¶
clear_cache()
¶
get(table, **query)
¶
get_tables()
¶
FileSource
¶
Bases:
FileSource loads CSV, Excel and Parquet files using pandas and dask read_* functions.
The FileSource can declare a list or dictionary of local or
remote files which are then loaded using either pandas.read_* or
dask.dataframe.read_* functions depending on whether use_dask
is enabled.
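A short sketch mirroring the tables specification documented below; the file path and URL are placeholders:

```python
from lumen.sources.base import FileSource

source = FileSource(
    tables={
        'local': '/home/user/local_file.csv',
        'remote': 'https://test.com/test.csv',
    },
    use_dask=False,  # return plain pandas DataFrames
)
df = source.get('local')
```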
dask = param.Boolean(default=False, doc='\n Whether to return a Dask dataframe.')
class-attribute
instance-attribute
¶
kwargs = param.Dict(doc='\n Keyword arguments to the pandas/dask loading function.')
class-attribute
instance-attribute
¶
source_type = 'file'
class-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc="\n List or dictionary of tables to load. If a list is supplied the\n names are computed from the filenames, otherwise the keys are\n the names. The values must filepaths or URLs to the data:\n\n ```\n {\n 'local' : '/home/user/local_file.csv',\n 'remote': 'https://test.com/test.csv'\n }\n ```\n\n if the filepath does not have a declared extension an extension\n may be provided in a list or tuple, e.g.:\n\n ```\n {'table': ['http://test.com/api', 'json']}\n ```\n ")
class-attribute
instance-attribute
¶
use_dask = param.Boolean(default=True, doc='\n Whether to use dask to load files.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_tables()
¶
InMemorySource
¶
JSONSource
¶
Bases:
The JSONSource is very similar to the FileSource but loads JSON files.
Both local and remote JSON files can be fetched by declaring them
as a list or dictionaries of tables.
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
chunk_size = param.Integer(default=0, doc='\n Number of items to load per chunk if a template variable\n is provided.')
class-attribute
instance-attribute
¶
source_type = 'json'
class-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc="\n List or dictionary of tables to load. If a list is supplied the\n names are computed from the filenames, otherwise the keys are\n the names. The values must filepaths or URLs to the data:\n\n ```\n {\n 'local' : '/home/user/local_file.csv',\n 'remote': 'https://test.com/test.csv'\n }\n ```\n ")
class-attribute
instance-attribute
¶
JoinedSource
¶
Bases:
JoinedSource performs a join on tables from one or more sources.
A JoinedSource applies a join on two or more sources returning
new table(s) with data from all sources. It iterates over the
tables specification and merges the specified tables from the
declared sources on the supplied index.
In this way multiple tables from multiple sources can be merged. Individual tables from sources that should not be joined may also be surfaced by declaring a single source and table in the specification.
As a simple example we may have sources A and B, which contain tables 'foo' and 'bar' respectively. We now want to merge these tables on column 'a' in table 'foo' with column 'b' in table 'bar':

    {'new_table': [
        {'source': 'A', 'table': 'foo', 'index': 'a'},
        {'source': 'B', 'table': 'bar', 'index': 'b'}
    ]}
The joined source will now publish the "new_table" with all columns from tables "foo" and "bar" except for the index column from table "bar", which was merged with the index column "a" from table "foo".
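For illustration, the same join expressed in Python; `source_a` and `source_b` stand in for already-instantiated Sources providing the 'foo' and 'bar' tables:

```python
from lumen.sources.base import JoinedSource

joined = JoinedSource(
    sources={'A': source_a, 'B': source_b},
    tables={'new_table': [
        {'source': 'A', 'table': 'foo', 'index': 'a'},
        {'source': 'B', 'table': 'bar', 'index': 'b'},
    ]},
)
df = joined.get('new_table')
```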
panel
property
¶
source_type = 'join'
class-attribute
¶
sources = param.ClassSelector(class_=(list, dict), doc='\n A dictionary of sources indexed by their assigned name.')
class-attribute
instance-attribute
¶
tables = param.Dict(default={}, doc='\n A dictionary with the names of the joined sources as keys\n and a specification of the source, table and index to merge\n on.\n\n ```\n {"new_table": [\n {\'source\': <source_name>,\n \'table\': <table_name>,\n \'index\': <index_name>\n },\n {\'source\': <source_name>,\n \'table\': <table_name>,\n \'index\': <index_name>\n },\n ...\n ]}\n ```\n ')
class-attribute
instance-attribute
¶
clear_cache()
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
PanelSessionSource
¶
Bases:
"
PanelSessionSource queries the session_info endpoint of a Panel application.
Panel applications with --rest-session-info enabled can be queried about session statistics. This source makes this data available to Lumen for monitoring.
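A hedged sketch of monitoring a locally served app; the URL is a placeholder and assumes the app was served with session info enabled (e.g. `panel serve app.py --rest-session-info --session-history -1`):

```python
from lumen.sources.base import PanelSessionSource

source = PanelSessionSource(urls=['http://localhost:5006'])
tables = source.get_tables()
df = source.get(tables[0])
```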
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
endpoint = param.String(default='rest/session_info')
class-attribute
instance-attribute
¶
source_type = 'session_info'
class-attribute
¶
timeout = param.Parameter(default=5)
class-attribute
instance-attribute
¶
urls = param.List(doc='URL of the websites to monitor.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
RESTSource
¶
Bases:
RESTSource allows querying REST endpoints conforming to the Lumen REST specification.
The url must offer two endpoints, the /data endpoint must
return data in a records format while the /schema endpoint must
return a valid Lumen JSON schema.
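An illustrative sketch; the endpoint URL is a placeholder and must expose the /data and /schema endpoints described above:

```python
from lumen.sources.base import RESTSource

source = RESTSource(url='https://api.example.com/lumen')
schema = source.get_schema()             # fetched from <url>/schema
df = source.get(source.get_tables()[0])  # records from <url>/data
```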
source_type = 'rest'
class-attribute
¶
url = param.String(doc='URL of the REST endpoint to monitor.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_async(table, **query)
async
¶
Return a table asynchronously; optionally filtered by the given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| query | dict | A dictionary containing all the query parameters | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | A pandas DataFrame containing the queried table. |
get_schema(table=None, limit=None, shuffle=False)
¶
Source
¶
Bases:
Source components allow querying all kinds of data.
A Source can return one or more tables queried using the
.get_tables method, a description of the data returned by each
table in the form of a JSON schema accessible via the .get_schema
method and lastly a .get method that allows filtering the data.
The Source base class also implements both in-memory and disk
caching which can be enabled if a cache_dir is provided. Data
cached to disk is stored as parquet files.
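As a sketch of enabling the disk cache on any concrete Source (here a FileSource with a placeholder URL):

```python
from lumen.sources.base import FileSource

source = FileSource(
    tables={'remote': 'https://test.com/test.csv'},
    cache_dir='cache',  # cached data is written here as parquet files
)
df = source.get('remote')  # first call fetches the data and caches it
df = source.get('remote')  # subsequent identical queries hit the cache
```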
cache_data = param.Boolean(default=True, doc='\n Whether to cache actual data.')
class-attribute
instance-attribute
¶
cache_dir = param.String(default=None, doc='\n Whether to enable local cache and write file to disk.')
class-attribute
instance-attribute
¶
cache_metadata = param.Boolean(default=True, doc='\n Whether to cache metadata.')
class-attribute
instance-attribute
¶
cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
cache_schema = param.Boolean(default=True, doc='\n Whether to cache table schemas.')
class-attribute
instance-attribute
¶
cache_with_dask = param.Boolean(default=True, doc='\n Whether to read and write cache files with dask if available.')
class-attribute
instance-attribute
¶
metadata = param.Dict(default={}, doc='\n Optional metadata about the source tables. Should follow the format:\n {"table_name": {"description": ..., "columns": {"column_name": "..."}}}')
class-attribute
instance-attribute
¶
metadata_func = param.Callable(default=None, doc='\n Function to implement custom metadata lookup for tables.\n Given a list of tables it should return a dictionary of the form:\n\n {\n <table>: {"description": ..., "columns": {"column_name": "..."}}\n }\n\n May be used to override the default _get_table_metadata\n implementation of the Source.')
class-attribute
instance-attribute
¶
panel
property
¶
A Source can return a Panel object which displays information about the Source or controls how the Source queries data.
root = param.ClassSelector(class_=Path, precedence=(-1), doc='\n Root folder of the cache_dir, default is config.root')
class-attribute
instance-attribute
¶
shared = param.Boolean(default=False, doc='\n Whether the Source can be shared across all instances of the\n dashboard. If set to `True` the Source will be loaded on\n initial server load.')
class-attribute
instance-attribute
¶
source_type = None
class-attribute
¶
clear_cache(*events)
¶
Clears any cached data.
from_spec(spec)
classmethod
¶
Creates a Source object from a specification. If a Source specification references other sources these may be supplied in the sources dictionary and be referenced by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| spec | dict or str | Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. | required |

Returns:

| Type | Description |
|---|---|
| Source | Resolved and instantiated Source object |
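For illustration, a minimal specification resolving to a FileSource (source_type 'file'); the URL is a placeholder:

```python
from lumen.sources.base import Source

spec = {
    'type': 'file',
    'tables': {'remote': 'https://test.com/test.csv'},
}
source = Source.from_spec(spec)
```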
get(table, **query)
¶
Return a table; optionally filtered by the given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| query | dict | A dictionary containing all the query parameters | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing the queried table. |
get_async(table, **query)
async
¶
Return a table asynchronously; optionally filtered by the given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| query | dict | A dictionary containing all the query parameters | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing the queried table. |
get_metadata(table)
¶
Returns metadata for one, multiple or all tables provided by the source.
The metadata for a table is structured as:

    {
        "description": ...,
        "columns": {"column_name": ...}
    }

If a list of tables or no table is provided the metadata is nested one additional level:

    {
        "table_name": {
            "description": ...,
            "columns": {"column_name": ...}
        }
    }

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str, list or None | The name of the table(s) to return metadata for. If None returns metadata for all available tables. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| metadata | dict | Dictionary of metadata indexed by table (if no table or a list of tables was provided) or the metadata for the individual table. |
get_schema(table=None, limit=None, shuffle=False)
¶
Returns JSON schema describing the tables returned by the Source.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str or None | The name of the table to return the schema for. If None returns schema for all available tables. | None |
| limit | int or None | Limits the number of rows considered for the schema calculation | None |

Returns:

| Type | Description |
|---|---|
| dict | JSON schema(s) for one or all the tables. |
get_tables()
¶
Returns the list of tables available on this source.
Returns:

| Type | Description |
|---|---|
| list | The list of available tables on this source. |
validate(spec, context=None)
classmethod
¶
WebsiteSource
¶
Bases:
WebsiteSource queries whether a website responds with a 400 status code.
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
source_type = 'live'
class-attribute
¶
urls = param.List(doc='URLs of the websites to monitor.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
cached(method, locks=None)
¶
Adds caching to a Source.get query.
Returns:

| Type | Description |
|---|---|
| | Returns method wrapped in caching functionality. |
cached_metadata(method, locks=None)
¶
cached_schema(method, locks=None)
¶
bigquery
¶
BigQuerySource
¶
Bases:
BigQuerySource provides access to a Google BigQuery project.
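A hedged sketch; the project ID and location are placeholders, and authentication is assumed to be handled by the underlying Google Cloud client (e.g. application default credentials):

```python
from lumen.sources.bigquery import BigQuerySource

source = BigQuerySource(project_id='my-project', location='us-central1')
# Table names take the form f"{project_id}.{dataset_id}.{table_id}"
tables = source.get_tables()
```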
datasets = param.List(default=None, doc='List of datasets to include')
class-attribute
instance-attribute
¶
dialect = 'bigquery'
class-attribute
instance-attribute
¶
filter_in_sql = param.Boolean(default=True, doc='Whether to apply filters in SQL or in-memory.')
class-attribute
instance-attribute
¶
location = param.String(doc='Location where the project resides.')
class-attribute
instance-attribute
¶
project_id = param.String(doc="The Google Cloud's project ID.")
class-attribute
instance-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc='\n A list of tables or a dictionary mapping from table name to a SQL query.')
class-attribute
instance-attribute
¶
close()
¶
Close the BigQuery client connections, releasing associated resources.
This method should be called when the source is no longer needed to prevent connection leaks and properly clean up resources.
create_sql_expr_source(tables, params=None, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
Arguments
- `tables` (dict[str, str]): Mapping from table name to SQL expression.
- `params` (dict[str, list | dict] | None): Optional mapping from table name to parameters. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax.
- `kwargs` (any): Additional keyword arguments.
Returns:

| Name | Type | Description |
|---|---|---|
| source | | |
execute(sql_query, params=None, *args, **kwargs)
¶
Executes a SQL query and returns the result as a DataFrame.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
execute_async(sql_query, params=None, *args, **kwargs)
async
¶
Executes a SQL query asynchronously and returns the result as a DataFrame.
This implementation runs queries asynchronously using BigQuery's job API.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
get(table, **query)
¶
get_async(table, **query)
async
¶
Retrieve a table from BigQuery asynchronously with optional filtering.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The table name to query | required |
| **query | dict | Query parameters and filters to apply | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | The filtered table data as a pandas DataFrame |
get_schema(table=None, limit=None, shuffle=False)
¶
Determine the schema of the given table.
This method overrides the inherited get_schema from the base class Source.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str or None | The name of the table. Must be in the form f"{project_id}.{dataset_id}.{table_id}" or reference a table in the tables dictionary. | None |
| limit | int or None | The maximum number of rows to sample from the table. | None |
| shuffle | bool | Whether to shuffle the rows of the table. | False |

Returns:

| Type | Description |
|---|---|
| dict | JSON schema(s) for one or all the tables. |
get_sql_expr(table)
¶
get_tables()
¶
Get a list of available tables for the project.
Returns:

| Type | Description |
|---|---|
| list[str] | Table names are composed of f"{project_id}.{dataset_id}.{table_id}". |
duckdb
¶
DuckDBSource
¶
Bases:
DuckDBSource provides a simple wrapper around the DuckDB SQL connector.
To specify tables to be queried provide a list or dictionary of
tables. A SQL expression to fetch the data from the table will
then be generated using the sql_expr, e.g. if we specify a local
table flights.db the default sql_expr SELECT * FROM {table} will
expand that to SELECT * FROM flights.db. If you want to specify
a full SQL expression as a table you must change the sql_expr to
'{table}' ensuring no further templating is applied.
Note that certain functionality in DuckDB requires modules to be loaded before making a query. These can be specified using the initializers parameter, providing the ability to define DuckDB statements to be run when initializing the connection.
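An illustrative sketch assuming a local `flights.db` database; the table contents and the httpfs initializers are placeholders:

```python
from lumen.sources.duckdb import DuckDBSource

source = DuckDBSource(
    uri='flights.db',
    # Full SQL expressions as tables; sql_expr='{table}' ensures no
    # further templating is applied to them.
    tables={'delayed': 'SELECT * FROM flights WHERE delay > 0'},
    sql_expr='{table}',
    initializers=['INSTALL httpfs;', 'LOAD httpfs;'],
)
df = source.get('delayed')
```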
connection
property
¶
dialect = 'duckdb'
class-attribute
instance-attribute
¶
ephemeral = param.Boolean(default=False, doc='\n Whether the data is ephemeral, i.e. manually inserted into the\n DuckDB table or derived from real data.')
class-attribute
instance-attribute
¶
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.')
class-attribute
instance-attribute
¶
initializers = param.List(default=[], doc='\n SQL statements to run to initialize the connection.')
class-attribute
instance-attribute
¶
load_schema = param.Boolean(default=True, doc='Whether to load the schema')
class-attribute
instance-attribute
¶
mirrors = param.Dict(default={}, doc='\n Mirrors the tables into the DuckDB database. The mirrors\n should define a mapping from the table names to the source of\n the mirror which may be defined as a Pipeline or a tuple of\n the source and the table.')
class-attribute
instance-attribute
¶
source_type = 'duckdb'
class-attribute
instance-attribute
¶
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression to execute.')
class-attribute
instance-attribute
¶
table_params = param.Dict(default={}, doc='\n Dictionary mapping table names to lists of SQL parameters.\n Parameters are used with placeholders (?) in SQL expressions.')
class-attribute
instance-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables.')
class-attribute
instance-attribute
¶
uri = param.String(doc='The URI of the DuckDB database')
class-attribute
instance-attribute
¶
close()
¶
Close the DuckDB connection, releasing associated resources.
This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.
create_sql_expr_source(tables, materialize=True, params=None, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
Arguments
- `tables` (dict[str, str]): Mapping from table name to SQL expression.
- `materialize` (bool): Whether to materialize new tables.
- `params` (dict[str, list | dict] | None): Optional mapping from table name to parameters. A list supplies positional parameters for ? placeholders; a dict supplies named parameters for $param_name placeholders.
- `kwargs` (any): Additional keyword arguments.
Returns:

| Name | Type | Description |
|---|---|---|
| source | | |
execute(sql_query, params=None, *args, **kwargs)
¶
from_df(tables, **kwargs)
classmethod
¶
Creates an ephemeral, in-memory DuckDBSource containing the supplied dataframe.
Arguments
- `tables` (dict[str, pandas.DataFrame]): A dictionary mapping from table names to DataFrames.
- `kwargs` (any): Additional keyword arguments for the source.
Returns:

| Name | Type | Description |
|---|---|---|
| source | | |
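A short usage sketch with a synthetic DataFrame:

```python
import pandas as pd
from lumen.sources.duckdb import DuckDBSource

df = pd.DataFrame({'x': [0, 1, 2], 'y': ['a', 'b', 'c']})
source = DuckDBSource.from_df({'points': df})
print(source.get('points'))
```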
from_spec(spec)
classmethod
¶
get(table, **query)
¶
get_tables()
¶
normalize_table(table)
¶
to_spec(context=None)
¶
intake
¶
IntakeBaseSource
¶
Bases:
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
load_schema = param.Boolean(default=True, doc='\n Whether to load the schema')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
IntakeSource
¶
Bases:
An IntakeSource loads data from an Intake catalog.
Intake is a lightweight set of tools for loading and sharing data in data science projects using convenient catalog specifications.
The IntakeSource can be given a dictionary catalog specification
OR a URI pointing to a catalog.yaml file on disk.
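For illustration, both construction styles; the catalog path, driver and URL are placeholders following standard Intake catalog conventions:

```python
from lumen.sources.intake import IntakeSource

# From a catalog file on disk
source = IntakeSource(uri='catalog.yaml')

# Or from an inlined catalog specification
source = IntakeSource(catalog={
    'sources': {
        'example': {
            'driver': 'csv',
            'args': {'urlpath': 'https://test.com/test.csv'},
        }
    }
})
```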
cat = intake.open_catalog(self.uri)
instance-attribute
¶
catalog = param.Dict(doc='An inlined Catalog specification.')
class-attribute
instance-attribute
¶
dask = param.Boolean(default=False, doc='\n Whether to return a dask DataFrame.')
class-attribute
instance-attribute
¶
source_type = 'intake'
class-attribute
instance-attribute
¶
uri = param.String(doc='URI of the catalog file.')
class-attribute
instance-attribute
¶
intake_dremio
¶
IntakeBaseDremioSource
¶
Bases:
Base class with common parameters for Dremio sources.
cert = param.String(default='Path to certificate file')
class-attribute
instance-attribute
¶
dask = param.Boolean(default=False, doc='\n Whether to return a dask DataFrame.')
class-attribute
instance-attribute
¶
dialect = 'dremio'
class-attribute
instance-attribute
¶
password = param.String(default=None, doc='Dremio password or token')
class-attribute
instance-attribute
¶
tls = param.Boolean(default=False, doc='Enable encryption')
class-attribute
instance-attribute
¶
uri = param.String(doc='URI of the Dremio server.')
class-attribute
instance-attribute
¶
username = param.String(default=None, doc='Dremio username')
class-attribute
instance-attribute
¶
create_sql_expr_source(tables, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
normalize_table(table)
¶
IntakeDremioSQLSource
¶
Bases:
IntakeDremioSQLSource allows querying a subset of Dremio catalog tables and views via custom SQL expressions.
Given the uri of the Dremio server and credentials to authenticate with the Dremio instance, only the tables specified in the tables parameter can be queried, unlike the IntakeDremioSource, which exposes all available tables.
Requires the intake-dremio package to be installed.
cat = {table: (DremioSource(sql_expr=sql_expr, cert=(self.cert), uri=(self.uri), tls=(self.tls), username=(self.username), password=(self.password))) for table, sql_expr in (self.tables.items())}
instance-attribute
¶
source_type = 'intake_dremio_sql'
class-attribute
instance-attribute
¶
tables = param.Dict(default={}, doc='\n Mapping of table names to desired SQL expressions')
class-attribute
instance-attribute
¶
IntakeDremioSource
¶
Bases:
IntakeDremioSource allows querying Dremio catalog tables and views.
When provided with the uri of the Dremio server and credentials
to authenticate with the Dremio instance all available tables can
be queried via this Source.
Requires the intake-dremio package to be installed.
intake_sql
¶
IntakeBaseSQLSource
¶
Bases: ,
cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
filter_in_sql = param.Boolean(default=True, doc='')
class-attribute
instance-attribute
¶
get(table, **query)
¶
Applies SQL transforms, creating a new temporary catalog on the fly and querying the database.
get_schema(table=None, limit=None, shuffle=False)
¶
get_sql_expr(table)
¶
IntakeSQLSource
¶
Bases: ,
IntakeSQLSource extends the IntakeSource with support for SQL data.
In addition to the standard intake support for reading catalogs
the IntakeSQLSource computes the schema by querying the database
instead of loading all the data into memory and allows for
SQLTransform to be applied when querying the SQL database.
source_type = 'intake_sql'
class-attribute
instance-attribute
¶
prometheus
¶
PrometheusSource
¶
Bases:
PrometheusSource allows querying Prometheus PromQL endpoints.
The PrometheusSource is configured to return timeseries about CPU,
memory and network usage as well as restarts for a list of
Kubernetes pods specified by ID.
ae5_source = param.Parameter(doc='\n An AE5Source instance to use for querying.')
class-attribute
instance-attribute
¶
cache_per_query = param.Boolean(default=False, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
¶
ids = param.List(default=[], doc='\n List of pod IDs to query.')
class-attribute
instance-attribute
¶
metrics = param.List(default=['memory_usage', 'cpu_usage', 'restarts', 'network_receive_bytes', 'network_transmit_bytes'], doc='Names of metric queries to execute')
class-attribute
instance-attribute
¶
panel
property
¶
period = param.String(default='3h', doc="\n Period to query over specified as a string. Supports:\n\n - Week: '1w'\n - Day: '1d'\n - Hour: '1h'\n - Minute: '1m'\n - Second: '1s'\n ")
class-attribute
instance-attribute
¶
promql_api = param.String(doc='\n Name of the AE5 deployment exposing the Prometheus API')
class-attribute
instance-attribute
¶
samples = param.Integer(default=200, doc='\n Number of samples in the selected period to query. May be\n overridden by explicit step value.')
class-attribute
instance-attribute
¶
shared = param.Boolean(default=False, readonly=True, doc='\n PrometheusSource cannot be shared because it has per-user state.')
class-attribute
instance-attribute
¶
source_type = 'prometheus'
class-attribute
instance-attribute
¶
step = param.String(doc='\n Step value to use in PromQL query_range query.')
class-attribute
instance-attribute
¶
get(table, **query)
¶
get_schema(table=None, limit=None, shuffle=False)
¶
get_tables()
¶
snowflake
¶
SnowflakeSource
¶
Bases:
SnowflakeSource uses the snowflake-connector-python library to load data from Snowflake.
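A hedged sketch of password-based authentication; every identifier below is a placeholder:

```python
from lumen.sources.snowflake import SnowflakeSource

source = SnowflakeSource(
    account='my-account',
    user='my-user',
    password='secret',
    authenticator='snowflake',
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
)
tables = source.get_tables()
```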
account = param.String(default=None, doc='\n The account identifier to connect to.')
class-attribute
instance-attribute
¶
authenticator = param.Selector(default=None, objects=['externalbrowser', 'oauth', 'snowflake', 'username_password_mfa', 'SNOWFLAKE_JWT'], doc='\n The authentication approach to use.', allow_None=True)
class-attribute
instance-attribute
¶
conn_kwargs = param.Dict(default={}, doc='\n Additional connection parameters to pass to the Snowflake connector.')
class-attribute
instance-attribute
¶
database = param.String(default=None, doc='\n The database to connect to.')
class-attribute
instance-attribute
¶
dialect = 'snowflake'
class-attribute
instance-attribute
¶
excluded_tables = param.List(default=[], doc='\n List of table names that should be excluded from the results.\n The items can be fully qualified (database.schema.table), partially\n qualified (schema.table), simply table names, or wildcards\n (e.g. database.schema.*).')
class-attribute
instance-attribute
¶
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.')
class-attribute
instance-attribute
¶
host = param.String(default=None, doc='\n The host to authenticate with.')
class-attribute
instance-attribute
¶
paramstyle = param.Selector(default='qmark', objects=['qmark', 'numeric', 'format', 'pyformat'], doc='\n The paramstyle to use for SQL queries.')
class-attribute
instance-attribute
¶
password = param.String(default=None, doc='\n The password to authenticate with (if authenticator is set to "snowflake").')
class-attribute
instance-attribute
¶
private_key = param.ClassSelector(default=None, class_=(str, bytes, Path), doc='\n The path or contents of the private key file.')
class-attribute
instance-attribute
¶
private_key_password = param.String(default=None, doc='\n The password to decrypt the private key file.')
class-attribute
instance-attribute
¶
schema = param.String(default=None, doc='\n The database schema to load data from.')
class-attribute
instance-attribute
¶
schema_timeout_seconds = param.Integer(default=600, doc='\n Timeout in seconds for schema retrieval. If None, no timeout is applied.')
class-attribute
instance-attribute
¶
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression to execute.')
class-attribute
instance-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables.')
class-attribute
instance-attribute
¶
token = param.String(default=None, doc='\n The OAuth token if authenticator is set to "oauth".')
class-attribute
instance-attribute
¶
user = param.String(default=None, doc='\n The user to authenticate as.')
class-attribute
instance-attribute
¶
warehouse = param.String(default=None, doc='\n The warehouse to connect to.')
class-attribute
instance-attribute
¶
close()
¶
Close the Snowflake connection and cursor, releasing associated resources.
This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.
create_sql_expr_source(tables, params=None, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
Arguments
- `tables` (dict[str, str]): Mapping from table name to SQL expression.
- `params` (dict[str, list | dict] | None): Optional mapping from table name to parameters. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters (for pyformat/format paramstyle).
- `kwargs` (any): Additional keyword arguments.
Returns:

| Name | Type | Description |
|---|---|---|
| source | | |
execute(sql_query, params=None, *args, **kwargs)
¶
execute_async(sql_query, params=None, *args, **kwargs)
async
¶
Execute a Snowflake SQL query asynchronously and return the result as a DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sql_query | str | The SQL query to execute | required |
| params | list, dict or None | Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax (qmark paramstyle); a dict supplies named parameters (for pyformat/format paramstyle); None means no parameters. | None |
| *args | | Additional positional arguments to pass to the query | () |
| **kwargs | | Keyword arguments to pass to the query | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | The query result as a pandas DataFrame with supported dtypes |
get(table, **query)
¶
get_async(table, **query)
async
¶
Retrieve data from a table asynchronously using the same query logic as get().
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| **query | dict | Query parameters and filters to apply | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | The query result as a pandas DataFrame |
get_schema(table=None, limit=None, shuffle=False)
¶
Returns JSON schema describing the tables returned by the Source.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str or None | The name of the table to return the schema for. If None returns schema for all available tables. | None |
| limit | int or None | Limits the number of rows considered for the schema calculation | None |
| shuffle | bool | Whether to shuffle the rows when sampling | False |

Returns:

| Type | Description |
|---|---|
| dict | JSON schema(s) for one or all the tables. |
get_tables()
¶
resolve_private_key()
¶
Converts a PEM encoded private key into a DER binary key.
Returns:

| Type | Description |
|---|---|
| bytes or None | DER encoded key if private_key has been provided otherwise returns None. |

Raises:

| Type | Description |
|---|---|
| | If the private key is not in PEM format. |
sqlalchemy
¶
DataFrame = pd.DataFrame
module-attribute
¶
SQLAlchemySource
¶
Bases:
SQLAlchemySource uses SQLAlchemy to connect to various SQL databases.
Supports both synchronous and asynchronous database drivers through SQLAlchemy's engine system. Can connect to PostgreSQL, MySQL, SQLite, Oracle, MSSQL, and more.
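Two illustrative construction styles; the connection details are placeholders:

```python
from lumen.sources.sqlalchemy import SQLAlchemySource

# Using a full SQLAlchemy URL
source = SQLAlchemySource(
    url='postgresql+psycopg2://user:pass@localhost:5432/db',
    tables=['flights'],
)

# Or using the individual URL components
source = SQLAlchemySource(drivername='sqlite', database=':memory:')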
async_drivers = {'postgresql+asyncpg', 'mysql+asyncmy', 'mysql+aiomysql', 'sqlite+aiosqlite', 'oracle+oracledb_async'}
class-attribute
instance-attribute
¶
connect_args = param.Dict(default={}, doc="\n Additional keyword arguments passed to the DBAPI's connect() method.")
class-attribute
instance-attribute
¶
database = param.String(default=None, doc='\n The database name to connect to.')
class-attribute
instance-attribute
¶
dialect
property
¶
Detect and return the database dialect name.
drivername = param.String(default=None, doc="\n The driver name (e.g., 'postgresql+psycopg2', 'mysql+pymysql', 'sqlite').")
class-attribute
instance-attribute
¶
engine_kwargs = param.Dict(default={}, doc='\n Additional keyword arguments passed to create_engine().')
class-attribute
instance-attribute
¶
excluded_tables = param.List(default=[], doc="\n List of table name patterns to exclude from the results.\n Supports wildcards (e.g., 'schema.table*', 'temp_*').")
class-attribute
instance-attribute
¶
filter_in_sql = param.Boolean(default=True, doc='\n Whether to apply filters in SQL or in-memory.')
class-attribute
instance-attribute
¶
host = param.String(default=None, doc='\n The database host address.')
class-attribute
instance-attribute
¶
password = param.String(default=None, doc='\n The password for database authentication.')
class-attribute
instance-attribute
¶
port = param.Integer(default=None, doc='\n The database port number.')
class-attribute
instance-attribute
¶
query_params = param.Dict(default=None, doc='\n Additional query parameters for the connection URL.')
class-attribute
instance-attribute
¶
schema = param.String(default=None, doc='\n The default schema to use for queries.')
class-attribute
instance-attribute
¶
schema_timeout_seconds = param.Integer(default=600, doc='\n Timeout in seconds for schema retrieval operations.')
class-attribute
instance-attribute
¶
source_type = 'sqlalchemy'
class-attribute
instance-attribute
¶
sql_expr = param.String(default='SELECT * FROM {table}', doc='\n The SQL expression template to execute.')
class-attribute
instance-attribute
¶
tables = param.ClassSelector(class_=(list, dict), doc='\n List or dictionary of tables to expose.')
class-attribute
instance-attribute
¶
url = param.String(default=None, doc="\n SQLAlchemy database URL string (e.g., 'postgresql://user:pass@host:port/db').")
class-attribute
instance-attribute
¶
username = param.String(default=None, doc='\n The username for database authentication.')
class-attribute
instance-attribute
¶
close()
¶
Close the database connection and dispose of the engine.
This method should be called when the source is no longer needed to prevent connection leaks and properly clean up server-side resources.
create_sql_expr_source(tables, params=None, **kwargs)
¶
Creates a new SQL Source given a set of table names and corresponding SQL expressions.
Arguments
- `tables` (dict[str, str]): Mapping from table name to SQL expression.
- `params` (dict[str, list | dict] | None): Optional mapping from table name to parameters. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax.
- `kwargs` (any): Additional keyword arguments.
Returns:

| Name | Type | Description |
|---|---|---|
| source | | |
execute(sql_query, params=None, *args, **kwargs)
¶
Executes a SQL query and returns the result as a DataFrame.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
execute_async(sql_query, params=None, *args, **kwargs)
async
¶
Executes a SQL query asynchronously and returns the result as a DataFrame.
This default implementation runs the synchronous execute() method in a thread to avoid blocking the event loop. Subclasses can override this method to provide truly asynchronous implementations.
Arguments
- `sql_query` (str): The SQL query to execute.
- `params` (list | dict | None): Parameters to use in the SQL query. A list supplies positional parameters for placeholder (?) syntax; a dict supplies named parameters for :name, %(name)s, etc. syntax; None means no parameters.
- `args` (list): Additional positional arguments to pass to the SQL query.
- `kwargs` (dict): Keyword arguments to pass to the SQL query.
Returns:

| Type | Description |
|---|---|
| DataFrame | The result as a pandas DataFrame |
get(table, **query)
¶
Retrieve data from a table with optional filtering.
get_async(table, **query)
async
¶
Return a table asynchronously; optionally filtered by the given query.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to query | required |
| query | dict | A dictionary containing all the query parameters | {} |

Returns:

| Type | Description |
|---|---|
| DataFrame | A DataFrame containing the queried table. |
get_schema(table=None, limit=None, shuffle=False)
¶
Returns JSON schema describing the tables returned by the Source.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table | str or None | The name of the table to return the schema for. If None returns schema for all available tables. | None |
| limit | int or None | Limits the number of rows considered for the schema calculation | None |
| shuffle | bool | Whether to shuffle rows when sampling | False |

Returns:

| Type | Description |
|---|---|
| dict | JSON schema(s) for one or all the tables. |
get_tables()
¶
Return the list of available tables.