
Pipeline

lumen.pipeline

VARIABLE_RE = re.compile('\\$variables\\.([a-zA-Z_]\\w*)') module-attribute
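VARIABLE_RE matches `$variables.<name>` references and captures the variable name. The sketch below recompiles the same pattern as a raw string with the standard `re` module to show what it extracts. The helper `find_variable_refs` and the sample query are illustrative only and are not part of lumen.

```python
import re

# Same pattern as lumen.pipeline.VARIABLE_RE, written as a raw string.
VARIABLE_RE = re.compile(r'\$variables\.([a-zA-Z_]\w*)')

def find_variable_refs(text: str) -> list[str]:
    """Return the names of all $variables.<name> references (hypothetical helper)."""
    # findall returns the captured group, i.e. just the variable name.
    return VARIABLE_RE.findall(text)

refs = find_variable_refs(
    "SELECT * FROM t WHERE x > $variables.threshold AND y = $variables.mode_2"
)
# refs == ['threshold', 'mode_2']
```

Names must start with a letter or underscore, so a string such as `$variables.1abc` yields no match.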

state = _session_state() module-attribute

Component

Bases: Parameterized

Base class for all Lumen component types, including Source, Filter, Transform, Variable and View types. Components must implement serialization to, and deserialization from, a specification dictionary via the from_spec and to_spec protocol. Additionally, they should implement validation.
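Components round-trip through plain dictionaries: from_spec builds an instance from a spec, to_spec exports one, and validate checks a spec beforehand. Below is a minimal sketch of that protocol, assuming a hypothetical `MiniFilter` dataclass in place of lumen's param-based classes. Its validation only rejects unknown keys, which is far narrower than what real components check.

```python
from dataclasses import dataclass, asdict, fields
from typing import Any

@dataclass
class MiniFilter:
    """Hypothetical stand-in for a Component; not lumen's Filter class."""
    field: str
    label: str = ''

    @classmethod
    def validate(cls, spec: dict[str, Any]) -> dict[str, Any]:
        # Reject keys that do not correspond to a declared attribute.
        allowed = {f.name for f in fields(cls)}
        unknown = set(spec) - allowed
        if unknown:
            raise ValueError(f"Unknown keys for {cls.__name__}: {sorted(unknown)}")
        return spec

    @classmethod
    def from_spec(cls, spec: dict[str, Any]) -> 'MiniFilter':
        # Validate first, then instantiate from the parameter values.
        return cls(**cls.validate(spec))

    def to_spec(self) -> dict[str, Any]:
        # Export only values that differ from their defaults to keep the spec minimal.
        defaults = {f.name: f.default for f in fields(self)}
        return {k: v for k, v in asdict(self).items() if v != defaults[k]}

spec = {'field': 'species', 'label': 'Species'}
roundtrip = MiniFilter.from_spec(spec).to_spec()
```

Omitting default values from the exported spec is a design choice of this sketch. It keeps specifications short while still reconstructing an identical object.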

refs property

from_spec(spec) classmethod

Creates a Component instance from a specification.

Parameters:

Name Type Description Default
spec dict or str

Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary.

required

Returns:

Type Description
Resolved and instantiated Component object

to_spec(context=None)

Exports the full specification to reconstruct this component.

Parameters:

Name Type Description Default
context Dict[str, Any]

Context contains the specification of all previously serialized components, e.g. to allow resolving of references.

None

Returns:

Type Description
dict

Declarative specification of this component.

validate(spec, context=None) classmethod

Validates the component specification given the validation context.

Arguments

spec: dict | str
    The specification for the component being validated (or a reference to the component).
context: dict
    Validation context containing the specification of all previously validated components, e.g. to allow resolving of references.

Returns:

Type Description
Validated specification.

DataFrame

Bases: DataFrame

DataFrame parameter that resolves data on access.

Filter

Bases: MultiTypeComponent

Filter components supply the filter values used by Source components to query data.

field = param.String(doc='The field being filtered.') class-attribute instance-attribute

filter_type = None class-attribute

label = param.String(doc='A label for the Filter.') class-attribute instance-attribute

panel property

Returns:

Type Description
Viewable or None

A Panel Viewable object representing the filter.

query property

Returns:

Type Description
object

The current filter query which will be used by the Source to filter the data.

schema = param.Dict(doc='\n The JSON schema provided by the Source declaring information\n about the data to be filtered.') class-attribute instance-attribute

shared = param.Boolean(default=False, doc='\n Whether the filter is shared across all layouts.') class-attribute instance-attribute

sync_with_url = param.Boolean(default=True, doc='\n Whether to sync the filter state with the URL parameters.') class-attribute instance-attribute

table = param.String(default=None, doc='\n The table being filtered. If None applies to all tables.') class-attribute instance-attribute

value = param.Parameter(doc='The current filter value.') class-attribute instance-attribute

from_spec(spec, source_schema, source_filters=None) classmethod

Resolves a Filter specification given the schema of the Source (and optionally the table) it will be filtering on.

Parameters:

Name Type Description Default
spec dict[str, Any] | str

Specification declared as a dictionary of parameter values.

required
source_schema dict[str, dict[str, Any]]

A dictionary containing the JSON schema of the Source to be filtered on.

required
source_filters dict[str, Filter] | None

A dictionary of filters associated with the Source

None

Returns:

Type Description
The resolved Filter object.

to_spec(context=None)

validate(spec, context=None) classmethod

FilterTransform

Bases: Transform

The FilterTransform implements the filtering behavior of Filter components.

The filter conditions must be declared as a list of tuples, each containing the name of the column to be filtered and one of the following:

  • scalar: A scalar value will be matched using equality operators
  • tuple: A tuple value specifies a numeric or date range.
  • list: A list value specifies a set of categories to match against.
  • list(tuple): A list of tuples specifies a list of ranges.
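The four condition kinds can be illustrated without pandas. The sketch below applies them to a list of row dictionaries. `matches` and `apply_conditions` are hypothetical helpers, and treating ranges as inclusive on both ends is an assumption rather than documented FilterTransform behavior.

```python
from typing import Any

def matches(value: Any, condition: Any) -> bool:
    """Hypothetical matcher mirroring the four documented condition kinds."""
    if isinstance(condition, tuple):
        # tuple: (start, end) range, assumed inclusive on both ends
        start, end = condition
        return start <= value <= end
    if isinstance(condition, list):
        if condition and all(isinstance(c, tuple) for c in condition):
            # list(tuple): the value must fall inside any one of the ranges
            return any(matches(value, c) for c in condition)
        # list: categorical membership
        return value in condition
    # scalar: equality
    return value == condition

def apply_conditions(rows: list[dict], conditions: list[tuple]) -> list[dict]:
    """Keep only rows that satisfy every (column, condition) pair."""
    return [row for row in rows
            if all(matches(row[col], cond) for col, cond in conditions)]

rows = [
    {'species': 'a', 'mass': 1.0},
    {'species': 'b', 'mass': 5.0},
    {'species': 'c', 'mass': 9.0},
]
in_range = apply_conditions(rows, [('mass', (2, 9))])
# in_range == [{'species': 'b', 'mass': 5.0}, {'species': 'c', 'mass': 9.0}]
```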

conditions = param.List(doc='\n List of filter conditions expressed as tuples of the column\n name and the filter value.') class-attribute instance-attribute

apply(df)

ParamFilter

Bases: Filter

ParamFilter reflects the value of a parameter declared on a View.

The ParamFilter can be used to implement cross-filtering between different views.

filter_type = 'param' class-attribute

parameter = param.ClassSelector(default=None, class_=(param.Parameter, str), doc='\n Reference to a Parameter on an existing View.') class-attribute instance-attribute

Pipeline

Bases: Viewer, Component

Pipeline encapsulates filters and transformations applied to a lumen.sources.base.Source table.

A Pipeline ingests data from a lumen.sources.base.Source table or another Pipeline, applying the declared lumen.filters.base.Filter, lumen.transforms.base.Transform and lumen.transforms.sql.SQLTransform definitions. It can be used to drive one or more visual outputs or leveraged as a standalone component to encapsulate multiple data processing steps.

auto_update = param.Boolean(default=True, constant=True, doc='\n Whether changes in filters, transforms and references automatically\n trigger updates in the data or whether an update has to be triggered\n manually using the update event or the update button in the UI.') class-attribute instance-attribute

control_panel property

data = DataFrame(doc='The current data on this source.') class-attribute instance-attribute

filters = param.List(item_type=Filter, doc='\n A list of Filters to apply to the source data.') class-attribute instance-attribute

pipeline = param.ClassSelector(class_=None, doc='\n Optionally a pipeline may be chained to another pipeline.') class-attribute instance-attribute

refs property

schema = param.Dict(doc='The schema of the input data.') class-attribute instance-attribute

source = param.ClassSelector(class_=Source, doc='\n The Source this pipeline is fed by.') class-attribute instance-attribute

sql_transforms = param.List(item_type=SQLTransform, doc='\n A list of SQLTransforms to apply to the source data.') class-attribute instance-attribute

table = param.String(doc='\n The name of the table driving this pipeline.') class-attribute instance-attribute

transforms = param.List(item_type=Transform, doc='\n A list of Transforms to apply to the source data.') class-attribute instance-attribute

update = param.Event(label='Apply update', doc='\n Update event trigger (if manual update is set).') class-attribute instance-attribute

add_filter(filt, field=None, **kwargs)

Add a filter to the pipeline.

Arguments

filt: Filter | Type[Filter]
    The filter instance or filter type to add.
field: str | None
    The field to filter on (required to instantiate a Filter type).

add_transform(transform, **kwargs)

Add a (SQL)Transform to the pipeline.

Arguments

transform: Transform
    The Transform (or SQLTransform) instance to add.

chain(filters=None, transforms=None, sql_transforms=None, **kwargs)

Chains additional filtering, transform and sql_transform operations onto an existing pipeline. Note that if one or more sql_transforms are provided, the existing table will be mirrored into a DuckDB database.

Arguments

filters: List[Filter] | None
    Additional filters to apply on top of the existing pipeline.
transforms: List[Transform] | None
    Additional transforms to apply on top of the existing pipeline.
sql_transforms: List[SQLTransform] | None
    Additional SQL transforms to apply on top of the existing pipeline.

Returns:

Type Description
Pipeline
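To illustrate what chaining means, the sketch below models a pipeline as a hypothetical `MiniPipeline` whose data, filter predicates and transform functions are plain Python objects. It is not lumen's implementation. It only shows the idea that chain returns a new pipeline holding just the additional steps and pointing back at the original through its `pipeline` attribute, mirroring the documented pipeline parameter.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

Rows = list[dict]

@dataclass
class MiniPipeline:
    """Hypothetical illustration of chaining; not lumen's Pipeline."""
    data: Rows = field(default_factory=list)   # stands in for the Source table
    filters: list[Callable[[dict], bool]] = field(default_factory=list)
    transforms: list[Callable[[Rows], Rows]] = field(default_factory=list)
    pipeline: Optional['MiniPipeline'] = None   # upstream pipeline, if chained

    def compute(self) -> Rows:
        # A chained pipeline starts from the upstream pipeline's output.
        rows = self.pipeline.compute() if self.pipeline else list(self.data)
        rows = [r for r in rows if all(f(r) for f in self.filters)]
        for t in self.transforms:
            rows = t(rows)
        return rows

    def chain(self, filters=None, transforms=None) -> 'MiniPipeline':
        # The new pipeline holds only the *additional* steps and points back at self.
        return MiniPipeline(filters=list(filters or []),
                            transforms=list(transforms or []),
                            pipeline=self)

base = MiniPipeline(data=[{'x': 1}, {'x': 2}, {'x': 3}],
                    filters=[lambda r: r['x'] > 1])
child = base.chain(transforms=[lambda rows: [{'x': r['x'] * 10} for r in rows]])
```

Because `child` recomputes from `base`, any change to the upstream pipeline's filters is reflected downstream without rebuilding the chain.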

clone(**params)

Create a new instance of the pipeline with optionally overridden parameter values.

from_spec(spec, source=None, source_filters=None) classmethod

Creates a Pipeline from a specification.

Parameters:

Name Type Description Default
spec dict or str

Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary.

required

Returns:

Type Description
Resolved and instantiated Pipeline object

get_schema()

Generates a JSON schema for the current data held by the Pipeline.

Returns:

Name Type Description
schema dict[str, any]

JSON schema for each column in the current data.

precache(queries)

Populates the cache of the lumen.sources.base.Source with the provided queries.

Queries can be provided in two formats:

  • A dictionary containing 'filters' and 'variables' dictionaries, each containing lists of values to compute a cross-product for, e.g.

    { 'filters': { '<filter>': ['a', 'b', 'c', ...], ... }, 'variables': { '<variable>': [0, 2, 4, ...], ... } }

  • A list containing dictionaries of explicit values for each filter and variable, e.g.

    [{ 'filters': {'<filter>': 'a'}, 'variables': {'<variable>': 0} }, { 'filters': {'<filter>': 'a'}, 'variables': {'<variable>': 1} }, ... ]
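The dictionary format is shorthand for the cross-product of all listed values. The sketch below expands it into the explicit list format with `itertools.product`. `expand_query_grid` is a hypothetical helper. The module also exposes `expand_queries(values, groups=('filters', 'variables'))`, but this sketch is not claimed to match its implementation.

```python
from itertools import product

def expand_query_grid(values: dict, groups=('filters', 'variables')) -> list[dict]:
    """Expand {'filters': {name: [..]}, 'variables': {name: [..]}} into explicit queries."""
    keys, choices = [], []
    for group in groups:
        for name, options in values.get(group, {}).items():
            keys.append((group, name))   # remember where each value belongs
            choices.append(options)
    queries = []
    # product yields one tuple per combination, in the order the keys were collected.
    for combo in product(*choices):
        query = {group: {} for group in groups}
        for (group, name), value in zip(keys, combo):
            query[group][name] = value
        queries.append(query)
    return queries

qs = expand_query_grid({'filters': {'species': ['a', 'b']},
                        'variables': {'n': [0, 2]}})
# 2 filter values x 2 variable values -> 4 explicit queries
```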

to_spec(context=None)

Exports the full specification to reconstruct this component.

Parameters:

Name Type Description Default
context dict[str, Any] | None

Context contains the specification of all previously serialized components, e.g. to allow resolving of references.

None

Returns:

Type Description
Declarative specification of this component.

traverse(type)

Returns all Filter or Transform objects in a potentially chained pipeline.

validate(spec, context=None) classmethod

SQLTransform

Bases: Transform

Base class for SQL transforms using sqlglot.

comments = param.Boolean(default=False, doc='Whether to include comments in the output SQL') class-attribute instance-attribute

error_level = param.ClassSelector(class_=(sqlglot.ErrorLevel), default=(sqlglot.ErrorLevel.RAISE), doc='Error level for parsing') class-attribute instance-attribute

identify = param.Boolean(default=False, doc='\n Delimit all identifiers, e.g. turn `FROM database.table` into `FROM "database"."table"`.\n This is useful for dialects that don\'t support unquoted identifiers.') class-attribute instance-attribute

optimize = param.Boolean(default=False, doc="\n Whether to optimize the generated SQL query; may produce invalid results, especially with\n duckdb's read_* functions.") class-attribute instance-attribute

pretty = param.Boolean(default=False, doc='Prettify output SQL, i.e. add newlines and indentation') class-attribute instance-attribute

read = param.String(default=None, doc='Source dialect for parsing; if None, automatically detects') class-attribute instance-attribute

unsupported_level = param.ClassSelector(class_=(sqlglot.ErrorLevel), default=(sqlglot.ErrorLevel.WARN), doc='When using `to_sql`, how to handle unsupported dialect features.') class-attribute instance-attribute

write = param.String(default=None, doc='Target dialect for output; if None, defaults to read dialect') class-attribute instance-attribute

apply(sql_in)

Given an SQL statement, manipulate it, and return a new SQL statement.

Parameters:

Name Type Description Default
sql_in str

The initial SQL query to be manipulated.

required

Returns:

Type Description
string

New SQL query derived from the above query.

apply_to(sql_in, **kwargs) classmethod

Instantiates the transform from the given keyword arguments and calls its apply method on the input SQL statement.

Parameters:

Name Type Description Default
sql_in str
required

Returns:

Type Description
SQL statement after application of transformation.
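The apply / apply_to split can be illustrated with a hypothetical `LimitSQL` transform. Real SQLTransform subclasses parse and regenerate SQL with sqlglot (parse_sql / to_sql). This sketch manipulates the SQL string directly so it runs with the standard library only. apply_to simply instantiates the class from keyword arguments and calls apply.

```python
class LimitSQL:
    """Hypothetical string-based SQL transform; not a lumen SQLTransform."""

    def __init__(self, limit: int = 1000):
        self.limit = limit

    def apply(self, sql_in: str) -> str:
        # Wrap the incoming statement as a subquery (dropping a trailing ';')
        # and cap the number of returned rows.
        inner = sql_in.rstrip().rstrip(';')
        return f"SELECT * FROM ({inner}) LIMIT {self.limit}"

    @classmethod
    def apply_to(cls, sql_in: str, **kwargs) -> str:
        # Keyword arguments define the transform; apply it in a single call.
        return cls(**kwargs).apply(sql_in)

limited = LimitSQL.apply_to("SELECT a FROM t;", limit=5)
# limited == "SELECT * FROM (SELECT a FROM t) LIMIT 5"
```

Wrapping the query as a subquery avoids having to inspect it, at the cost of producing less readable SQL than an AST-based rewrite would.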

parse_sql(sql_in)

Parse SQL string into sqlglot AST.

Parameters:

Name Type Description Default
sql_in str

SQL string to parse

required

Returns:

Type Description
Expression

Parsed SQL expression

to_sql(expression)

Convert sqlglot expression back to SQL string.

Parameters:

Name Type Description Default
expression Expression

Expression to convert to SQL

required

Returns:

Type Description
string

SQL string representation

Source

Bases: MultiTypeComponent

Source components allow querying all kinds of data.

A Source can provide one or more tables, listed by the .get_tables method; a description of the data in each table in the form of a JSON schema, accessible via the .get_schema method; and lastly a .get method that allows querying and filtering the data.

The Source base class also implements both in-memory and disk caching which can be enabled if a cache_dir is provided. Data cached to disk is stored as parquet files.
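The in-memory side of caching can be sketched as a dictionary keyed on the table name and query. `CachingSource` and its `loader` callable are hypothetical. Disk caching to parquet files and the cache_* switches documented below are not modelled.

```python
from typing import Any, Callable

class CachingSource:
    """Hypothetical in-memory cache keyed on table and query; not lumen's Source."""

    def __init__(self, loader: Callable[..., Any]):
        self._loader = loader                   # performs the actual (expensive) query
        self._cache: dict[tuple, Any] = {}

    @staticmethod
    def _key(table: str, query: dict[str, Any]) -> tuple:
        # Sort items so keyword order does not matter; repr() lets unhashable
        # values such as lists take part in the key.
        return (table, tuple(sorted((k, repr(v)) for k, v in query.items())))

    def get(self, table: str, **query) -> Any:
        key = self._key(table, query)
        if key not in self._cache:
            self._cache[key] = self._loader(table, **query)
        return self._cache[key]

    def clear_cache(self) -> None:
        self._cache.clear()

calls = []

def loader(table, **query):
    calls.append((table, query))   # record every real query
    return [table, sorted(query)]

src = CachingSource(loader)
src.get('t', a=1, b=[1, 2])
src.get('t', b=[1, 2], a=1)         # same query, served from the cache
```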

cache_data = param.Boolean(default=True, doc='\n Whether to cache actual data.') class-attribute instance-attribute

cache_dir = param.String(default=None, doc='\n Whether to enable local cache and write file to disk.') class-attribute instance-attribute

cache_metadata = param.Boolean(default=True, doc='\n Whether to cache metadata.') class-attribute instance-attribute

cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.') class-attribute instance-attribute

cache_schema = param.Boolean(default=True, doc='\n Whether to cache table schemas.') class-attribute instance-attribute

cache_with_dask = param.Boolean(default=True, doc='\n Whether to read and write cache files with dask if available.') class-attribute instance-attribute

metadata = param.Dict(default={}, doc='\n Optional metadata about the source tables. Should follow the format:\n {"table_name": {"description": ..., "columns": {"column_name": "..."}}}') class-attribute instance-attribute

metadata_func = param.Callable(default=None, doc='\n Function to implement custom metadata lookup for tables.\n Given a list of tables it should return a dictionary of the form:\n\n {\n <table>: {"description": ..., "columns": {"column_name": "..."}}\n }\n\n May be used to override the default _get_table_metadata\n implementation of the Source.') class-attribute instance-attribute

panel property

A Source can return a Panel object which displays information about the Source or controls how the Source queries data.

root = param.ClassSelector(class_=Path, precedence=(-1), doc='\n Root folder of the cache_dir, default is config.root') class-attribute instance-attribute

shared = param.Boolean(default=False, doc='\n Whether the Source can be shared across all instances of the\n dashboard. If set to `True` the Source will be loaded on\n initial server load.') class-attribute instance-attribute

source_type = None class-attribute

clear_cache(*events)

Clears any cached data.

from_spec(spec) classmethod

Creates a Source object from a specification. If a Source specification references other sources these may be supplied in the sources dictionary and be referenced by name.

Parameters:

Name Type Description Default
spec dict or str

Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary.

required

Returns:

Type Description
Resolved and instantiated Source object

get(table, **query)

Return a table; optionally filtered by the given query.

Parameters:

Name Type Description Default
table str

The name of the table to query

required
query dict

A dictionary containing all the query parameters

{}

Returns:

Type Description
DataFrame

A DataFrame containing the queried table.

get_async(table, **query) async

Return a table asynchronously; optionally filtered by the given query.

Parameters:

Name Type Description Default
table str

The name of the table to query

required
query dict

A dictionary containing all the query parameters

{}

Returns:

Type Description
DataFrame

A DataFrame containing the queried table.
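This page does not document how get_async is implemented. A common way to expose a blocking loader asynchronously is to run it in a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below uses a hypothetical blocking `get` in place of Source.get.

```python
import asyncio
import time

def get(table: str, **query) -> list:
    """Hypothetical blocking query standing in for Source.get."""
    time.sleep(0.05)  # simulate I/O latency
    return [(table, sorted(query.items()))]

async def get_async(table: str, **query) -> list:
    # Offload the blocking call to a thread so the event loop stays responsive.
    return await asyncio.to_thread(get, table, **query)

async def main() -> list:
    # Several queries can be awaited concurrently.
    return await asyncio.gather(get_async('a', x=1), get_async('b'))

results = asyncio.run(main())
```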

get_metadata(table)

Returns metadata for one, multiple or all tables provided by the source.

The metadata for a table is structured as:

{ "description": ..., "columns": { <column>: { "description": ..., "data_type": ..., } }, **other_metadata }

If a list of tables or no table is provided, the metadata is nested one additional level:

{ "table_name": { "description": ..., "columns": { <column>: { "description": ..., "data_type": ..., } }, **other_metadata } }

Parameters:

Name Type Description Default
table str | list[str] | None

The name of the table (or list of tables) to return metadata for. If None, returns metadata for all available tables.

required

Returns:

Name Type Description
metadata dict

Dictionary of metadata indexed by table (if a list of tables or no table was provided), or the metadata of an individual table.

get_schema(table=None, limit=None, shuffle=False)

Returns JSON schema describing the tables returned by the Source.

Parameters:

Name Type Description Default
table str | None

The name of the table to return the schema for. If None, returns schemas for all available tables.

None
limit int | None

Limits the number of rows considered for the schema calculation

None

Returns:

Type Description
dict

JSON schema(s) for one or all the tables.

get_tables()

Returns the list of tables available on this source.

Returns:

Type Description
list

The list of available tables on this source.

validate(spec, context=None) classmethod

Transform

Bases: MultiTypeComponent

Transform components implement transforms of DataFrame objects.

control_panel property

controls = param.List(default=[], doc='\n Parameters that should be exposed as widgets in the UI.') class-attribute instance-attribute

transform_type = None class-attribute

apply(table)

Given a table transform it in some way and return it.

Parameters:

Name Type Description Default
table DataFrame

The queried table as a DataFrame.

required

Returns:

Type Description
DataFrame

A DataFrame containing the transformed data.

apply_to(table, **kwargs) classmethod

Instantiates the transform from the given keyword arguments and calls its apply method on the input table.

Parameters:

Name Type Description Default
table DataFrame
required

Returns:

Type Description
A DataFrame with the results of the transformation.

from_spec(spec) classmethod

Resolves a Transform specification.

Parameters:

Name Type Description Default
spec dict[str, Any] | str

Specification declared as a dictionary of parameter values.

required

Returns:

Type Description
The resolved Transform object.

ValidationError

Bases: ValueError

A ValidationError is raised when the specification of a component has missing required keys, an incorrect value or is otherwise malformed.
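A minimal sketch of validation that raises a ValidationError-style exception. ValidationError is redefined locally as a ValueError subclass, matching the documented base class, so the example runs without lumen. `REQUIRED_KEYS` and `validate_spec` are hypothetical. Because the exception subclasses ValueError, callers can catch it either way.

```python
class ValidationError(ValueError):
    """Local stand-in mirroring the documented base class (ValueError)."""

REQUIRED_KEYS = {'type', 'table'}  # hypothetical required keys

def validate_spec(spec) -> dict:
    # Malformed: not a dictionary at all.
    if not isinstance(spec, dict):
        raise ValidationError(f"Specification must be a dict, got {type(spec).__name__}.")
    # Missing required keys.
    missing = REQUIRED_KEYS - set(spec)
    if missing:
        raise ValidationError(f"Specification is missing required key(s): {sorted(missing)}.")
    return spec

try:
    validate_spec({'type': 'csv'})
except ValueError as e:  # also catches ValidationError
    message = str(e)
```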

WidgetFilter

Bases: BaseWidgetFilter

WidgetFilter generates a Widget from the table schema provided by a Source.

By default, the widget type is inferred from the data and from whether multi-value selection is enabled.

empty_select = param.Boolean(default=True, doc='\n Add an option to Select widgets to indicate no filtering.') class-attribute instance-attribute

filter_type = 'widget' class-attribute

max_options = param.Integer(default=500, doc='\n Maximum number of options to render.') class-attribute instance-attribute

multi = param.Boolean(default=True, doc='\n Whether to use a single-value or multi-value selection widget,\n e.g. for a numeric value this could be a regular slider or a\n range slider.') class-attribute instance-attribute

query property

widget = JSONSchema(schema=(self.schema), sizing_mode='stretch_width', multi=(self.multi), widgets=({self.field: wtype} if wtype else {}))._widgets[self.field] class-attribute instance-attribute

to_spec(context=None)

auto_filters(schema)

Automatically generates filter specifications from a schema.

Arguments

schema: A schema describing the types of various fields.

Returns:

Name Type Description
filter_specs A list of filter specifications.
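Which fields auto_filters selects is not documented here. The sketch below assumes one widget filter spec per field that has either a list of categories or a numeric range. It uses the standard JSON Schema keywords `enum`, `minimum` and `maximum`; the keys lumen actually emits may differ. `auto_filter_specs` is a hypothetical helper.

```python
def auto_filter_specs(schema: dict) -> list[dict]:
    """Hypothetical: derive one widget filter spec per filterable field."""
    specs = []
    for name, field_schema in schema.items():
        kind = field_schema.get('type')
        # Categorical strings need a list of allowed values to build a selector.
        categorical = kind == 'string' and 'enum' in field_schema
        # Numeric fields need known bounds to build a slider.
        ranged = kind in ('number', 'integer') and all(
            k in field_schema for k in ('minimum', 'maximum'))
        if categorical or ranged:
            specs.append({'type': 'widget', 'field': name})
    return specs

schema = {
    'species': {'type': 'string', 'enum': ['a', 'b']},
    'mass': {'type': 'number', 'minimum': 0, 'maximum': 9},
    'notes': {'type': 'string'},          # free text: no filter generated
}
specs = auto_filter_specs(schema)
```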

catch_and_notify(message=None)

Catch exception and notify user

A decorator which catches all exceptions raised by a function. When an error occurs, a Panel notification is sent to the dashboard with the message, and the error is logged together with the method it originated from.

Parameters:

Name Type Description Default
message str | None

The notification message, by default None. If None, the message defaults to "Error: {e}", where e is the exception message.

None
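The catch-log-notify pattern can be sketched with the standard library alone. In this sketch `catch_and_notify_sketch` is a hypothetical decorator factory, and the `notify` callable stands in for a Panel notification. Swallowing the exception and returning None is an assumption, and unlike the documented decorator this version must always be applied with parentheses.

```python
import functools
import logging
from typing import Callable, Optional

log = logging.getLogger(__name__)

def catch_and_notify_sketch(message: Optional[str] = None,
                            notify: Callable[[str], None] = print):
    """Hypothetical decorator factory; `notify` stands in for a Panel notification."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Log the traceback together with the name of the failing function.
                log.exception("Error in %s", func.__qualname__)
                # Fall back to "Error: {e}" when no explicit message was given.
                notify(message if message is not None else f"Error: {e}")
                return None  # assumption: the exception is swallowed
        return wrapper
    return decorator

notes = []

@catch_and_notify_sketch(notify=notes.append)
def divide(a, b):
    return a / b

divide(6, 3)   # returns 2.0; nothing is notified
divide(1, 0)   # logs the ZeroDivisionError and notifies 'Error: division by zero'
```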

expand_queries(values, groups=('filters', 'variables'))

get_dataframe_schema(df, columns=None)

Returns a JSON schema optionally filtered by a subset of the columns.

Parameters:

Name Type Description Default
df pandas.DataFrame or dask.dataframe.DataFrame

The DataFrame to describe with the schema

required
columns

List of columns to include in schema

None

Returns:

Type Description
dict

The JSON schema describing the DataFrame
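To show the kind of output involved, here is a standard-library sketch that infers a per-column schema from a list of row dictionaries rather than a DataFrame. `column_schema` and `rows_schema` are hypothetical helpers. The keywords (`type`, `minimum`, `maximum`, `enum`) are standard JSON Schema, and the exact keys lumen emits may differ.

```python
def column_schema(values: list) -> dict:
    """Hypothetical schema inference for one column."""
    non_null = [v for v in values if v is not None]
    # bool is a subclass of int, so test it first and exclude it afterwards.
    if non_null and all(isinstance(v, bool) for v in non_null):
        return {'type': 'boolean'}
    if non_null and all(isinstance(v, int) and not isinstance(v, bool) for v in non_null):
        return {'type': 'integer', 'minimum': min(non_null), 'maximum': max(non_null)}
    if non_null and all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in non_null):
        return {'type': 'number', 'minimum': min(non_null), 'maximum': max(non_null)}
    # Fall back to a categorical string column listing its distinct values.
    return {'type': 'string', 'enum': sorted({str(v) for v in non_null})}

def rows_schema(rows: list[dict], columns=None) -> dict:
    """Schema for each requested column (all columns of the first row by default)."""
    if columns is None:
        columns = list(rows[0]) if rows else []
    return {c: column_schema([r.get(c) for r in rows]) for c in columns}

rows = [{'a': 1, 'b': 'x', 'c': 1.5}, {'a': 3, 'b': 'y', 'c': 2}]
schema = rows_schema(rows)
```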

is_ref(value)

Whether the value is a reference.
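References elsewhere on this page take the form `$variables.<name>`. The sketch below assumes that a reference is any string beginning with `$`. `is_ref_sketch` is a hypothetical name, and this rule may not match lumen's exact check.

```python
def is_ref_sketch(value) -> bool:
    """Assumption: a reference is a string starting with '$', e.g. '$variables.threshold'."""
    return isinstance(value, str) and value.startswith('$')

checks = [is_ref_sketch('$variables.threshold'), is_ref_sketch('threshold'), is_ref_sketch(3)]
# checks == [True, False, False]
```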

match_suggestion_message(word, possibilities, msg='', n=3)