Pipeline
lumen.pipeline
VARIABLE_RE = re.compile('\\$variables\\.([a-zA-Z_]\\w*)')
module-attribute
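VARIABLE_RE matches references of the form `$variables.<name>` and captures the variable name. The short sketch below shows how such a pattern can find or reject variable references in a specification string. The pattern is rewritten as a raw string, which is equivalent to the escaped form shown above. The sample strings are made up for illustration.

```python
import re

# Same pattern as VARIABLE_RE above, written as a raw string.
VARIABLE_RE = re.compile(r"\$variables\.([a-zA-Z_]\w*)")

text = "Sales in $variables.region for $variables.year"  # made-up sample value
print(VARIABLE_RE.findall(text))  # ['region', 'year']

# Variable names must start with a letter or underscore, so this does not match.
print(VARIABLE_RE.fullmatch("$variables.1bad"))  # None
```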
state = _session_state()
module-attribute
Component
Bases:
Base class for all Lumen component types, including Source, Filter,
Transform, Variable and View types. Components must implement
serialization and deserialization into a specification dictionary
via the from_spec and to_spec protocol. Additionally, they
should implement validation.
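The from_spec/to_spec protocol amounts to a lossless round trip between an object and a plain dictionary, with validate checking a specification before it is resolved. The following is a minimal sketch of that contract. It uses a hypothetical `ExampleComponent` built on dataclasses rather than param. The class, its fields, the `"type"` key and `ValidationErrorSketch` are illustrative assumptions, not Lumen's implementation.

```python
from dataclasses import asdict, dataclass, fields


class ValidationErrorSketch(ValueError):
    """Hypothetical stand-in for an error raised on malformed specs."""


@dataclass
class ExampleComponent:
    field: str
    label: str = ""

    @classmethod
    def validate(cls, spec: dict, context=None) -> dict:
        # Reject unknown keys and missing required keys before instantiation.
        allowed = {f.name for f in fields(cls)} | {"type"}
        unknown = set(spec) - allowed
        if unknown:
            raise ValidationErrorSketch(f"Unknown keys: {sorted(unknown)}")
        if "field" not in spec:
            raise ValidationErrorSketch("Missing required key 'field'")
        return spec

    @classmethod
    def from_spec(cls, spec: dict) -> "ExampleComponent":
        # Validate a copy, drop the type marker and instantiate.
        spec = cls.validate(dict(spec))
        spec.pop("type", None)
        return cls(**spec)

    def to_spec(self, context=None) -> dict:
        # Export everything needed to reconstruct this object.
        return {"type": "example", **asdict(self)}


spec = {"type": "example", "field": "species", "label": "Species"}
component = ExampleComponent.from_spec(spec)
print(component.to_spec() == spec)  # True: the round trip is lossless
```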
refs
property
from_spec(spec)
classmethod
Creates a Component instance from a specification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | | Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. | required |
Returns:
| Type | Description |
|---|---|
| | Resolved and instantiated Component object. |
to_spec(context=None)
Exports the full specification to reconstruct this component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | | Context contains the specification of all previously serialized components, e.g. to allow resolving of references. | None |
Returns:
| Type | Description |
|---|---|
| | Declarative specification of this component. |
validate(spec, context=None)
classmethod
Validates the component specification given the validation context.
Arguments
- spec: dict | str
  The specification for the component being validated (or a reference to the component).
- context: dict
  The validation context, which contains the specification of all previously validated components, e.g. to allow resolving of references.
Returns:
| Type | Description |
|---|---|
| | Validated specification. |
DataFrame
Bases:
DataFrame parameter that resolves data on access.
Filter
Bases:
Filter components supply the filter values used by Source components to query data.
field = param.String(doc='The field being filtered.')
class-attribute
instance-attribute
filter_type = None
class-attribute
label = param.String(doc='A label for the Filter.')
class-attribute
instance-attribute
panel
property
Returns:
| Type | Description |
|---|---|
| | A Panel Viewable object representing the filter. |
query
property
Returns:
| Type | Description |
|---|---|
| | The current filter query which will be used by the Source to filter the data. |
schema = param.Dict(doc='\n The JSON schema provided by the Source declaring information\n about the data to be filtered.')
class-attribute
instance-attribute
shared = param.Boolean(default=False, doc='\n Whether the filter is shared across all layouts.')
class-attribute
instance-attribute
sync_with_url = param.Boolean(default=True, doc='\n Whether to sync the filter state with the URL parameters.')
class-attribute
instance-attribute
table = param.String(default=None, doc='\n The table being filtered. If None applies to all tables.')
class-attribute
instance-attribute
value = param.Parameter(doc='The current filter value.')
class-attribute
instance-attribute
from_spec(spec, source_schema, source_filters=None)
classmethod
Resolves a Filter specification given the schema of the Source (and optionally the table) it will be filtering on.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | | Specification declared as a dictionary of parameter values. | required |
| source_schema | | A dictionary containing the JSON schema of the Source to be filtered on. | required |
| source_filters | | A dictionary of filters associated with the Source. | None |
Returns:
| Type | Description |
|---|---|
| | The resolved Filter object. |
to_spec(context=None)
validate(spec, context=None)
classmethod
FilterTransform
Bases:
Filter transforms implement the filtering behavior of Filter components.
The filter conditions must be declared as a list of tuples, each containing
the name of the column to be filtered and one of the following:
- scalar: A scalar value will be matched using equality operators.
- tuple: A tuple value specifies a numeric or date range.
- list: A list value specifies a set of categories to match against.
- list(tuple): A list of tuples specifies a list of ranges.
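The four condition kinds can be illustrated with a minimal sketch. It assumes rows stored as plain dicts and inclusive range bounds (the inclusivity is an assumption). `matches` and `filter_rows` are hypothetical helpers, not FilterTransform's own code, which operates on DataFrames.

```python
from typing import Any, Iterable


def matches(value: Any, condition: Any) -> bool:
    if isinstance(condition, tuple):
        # tuple -> (low, high) range; bounds treated as inclusive here
        low, high = condition
        return low <= value <= high
    if isinstance(condition, list):
        if condition and all(isinstance(c, tuple) for c in condition):
            # list of tuples -> value must fall inside any one of the ranges
            return any(low <= value <= high for low, high in condition)
        # plain list -> categorical membership
        return value in condition
    # scalar -> equality
    return value == condition


def filter_rows(rows: Iterable[dict], conditions: list) -> list:
    # Keep a row only if every (column, condition) pair holds.
    return [row for row in rows if all(matches(row[col], cond) for col, cond in conditions)]


rows = [
    {"species": "a", "mass": 1.0},
    {"species": "b", "mass": 5.0},
    {"species": "c", "mass": 9.0},
]
print(filter_rows(rows, [("species", ["a", "c"]), ("mass", (0, 5))]))
# [{'species': 'a', 'mass': 1.0}]
```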
ParamFilter
Bases:
ParamFilter reflects the value of a parameter declared on a View.
The ParamFilter can be used to implement cross-filtering between
different views.
Pipeline
Bases:
Pipeline encapsulates filters and transformations applied to a
`lumen.sources.base.Source` table.
A Pipeline ingests data from a
`lumen.sources.base.Source` table or another Pipeline,
applying the declared `lumen.filters.base.Filter`,
`lumen.transforms.base.Transform` and
`lumen.transforms.sql.SQLTransform` definitions. It can be
used to drive one or more visual outputs or leveraged as a
standalone component to encapsulate multiple data processing
steps.
auto_update = param.Boolean(default=True, constant=True, doc='\n Whether changes in filters, transforms and references automatically\n trigger updates in the data or whether an update has to be triggered\n manually using the update event or the update button in the UI.')
class-attribute
instance-attribute
control_panel
property
data = DataFrame(doc='The current data on this source.')
class-attribute
instance-attribute
filters = param.List(item_type=Filter, doc='\n A list of Filters to apply to the source data.')
class-attribute
instance-attribute
pipeline = param.ClassSelector(class_=None, doc='\n Optionally a pipeline may be chained to another pipeline.')
class-attribute
instance-attribute
refs
property
schema = param.Dict(doc='The schema of the input data.')
class-attribute
instance-attribute
source = param.ClassSelector(class_=Source, doc='\n The Source this pipeline is fed by.')
class-attribute
instance-attribute
sql_transforms = param.List(item_type=SQLTransform, doc='\n A list of SQLTransforms to apply to the source data.')
class-attribute
instance-attribute
table = param.String(doc='\n The name of the table driving this pipeline.')
class-attribute
instance-attribute
transforms = param.List(item_type=Transform, doc='\n A list of Transforms to apply to the source data.')
class-attribute
instance-attribute
update = param.Event(label='Apply update', doc='\n Update event trigger (if manual update is set).')
class-attribute
instance-attribute
add_filter(filt, field=None, **kwargs)
Add a filter to the pipeline.
Arguments
- filt: Filter | Type[Filter]
  The filter instance or filter type to add.
- field: str | None
  The field to filter on (required when a Filter type is passed, so it can be instantiated).
add_transform(transform, **kwargs)
Add a (SQL)Transform to the pipeline.
Arguments
- transform: Transform
  The Transform instance to add.
chain(filters=None, transforms=None, sql_transforms=None, **kwargs)
Chains additional filtering, transform and sql_transform operations on an existing pipeline. Note that if one or more sql_transforms are provided, the existing table will be mirrored into a DuckDB database.
Arguments
- filters: List[Filter] | None
  Additional filters to apply on top of the existing pipeline.
- transforms: List[Transform] | None
  Additional transforms to apply on top of the existing pipeline.
- sql_transforms: List[SQLTransform] | None
  Additional SQL transforms to apply on top of the existing pipeline.
Returns:
| Type | Description |
|---|---|
| | |
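The following is a minimal sketch of how chaining and traversal can fit together. Here a chained pipeline is a new object that points at its upstream pipeline and pulls data through it. `MiniPipeline` is hypothetical, not Lumen's Pipeline class. It models filters as row predicates and transforms as functions over lists of rows. It also assumes that filters run before transforms within each pipeline.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Optional

RowPredicate = Callable[[dict], bool]   # hypothetical stand-in for a Filter
RowTransform = Callable[[list], list]   # hypothetical stand-in for a Transform


@dataclass
class MiniPipeline:
    """Illustrative chained pipeline; not Lumen's Pipeline class."""
    rows: list = field(default_factory=list)          # raw data, used by the root only
    filters: list[RowPredicate] = field(default_factory=list)
    transforms: list[RowTransform] = field(default_factory=list)
    pipeline: Optional[MiniPipeline] = None           # upstream pipeline, if chained

    @property
    def data(self) -> list:
        # Start from the upstream output when chained, otherwise from the raw rows.
        data = self.pipeline.data if self.pipeline is not None else list(self.rows)
        # Apply filters first, then transforms in declaration order.
        data = [row for row in data if all(f(row) for f in self.filters)]
        for transform in self.transforms:
            data = transform(data)
        return data

    def chain(self, filters=None, transforms=None) -> MiniPipeline:
        # Return a new pipeline fed by this one; self is not modified.
        return MiniPipeline(filters=list(filters or []),
                            transforms=list(transforms or []),
                            pipeline=self)

    def traverse(self, kind: str) -> list:
        # Collect "filters" or "transforms" from this pipeline and all upstream ones.
        upstream = self.pipeline.traverse(kind) if self.pipeline is not None else []
        return upstream + list(getattr(self, kind))


base = MiniPipeline(rows=[{"x": i} for i in range(10)],
                    filters=[lambda row: row["x"] % 2 == 0])
child = base.chain(transforms=[lambda rows: rows[:2]])
print(child.data)                      # [{'x': 0}, {'x': 2}]
print(len(child.traverse("filters")))  # 1
```

Returning a new object from `chain` keeps the upstream pipeline reusable, so several chained outputs can share one base.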
clone(**params)
Create a new instance of the pipeline with optionally overridden parameter values.
from_spec(spec, source=None, source_filters=None)
classmethod
Creates a Pipeline from a specification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | | Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. | required |
Returns:
| Type | Description |
|---|---|
| | Resolved and instantiated Pipeline object. |
get_schema()
Generates a JSON schema for the current data held by the Pipeline.
Returns:
| Name | Type | Description |
|---|---|---|
| schema | | JSON schema for each column in the current data. |
precache(queries)
Populates the cache of the `lumen.sources.base.Source` with the provided queries.
Queries can be provided in two formats:
- A dictionary containing 'filters' and 'variables' dictionaries, each containing lists of values to compute a cross-product for, e.g.
    {
        'filters': {
            <filter>: ['a', 'b', 'c', ...],
            ...
        },
        'variables': {
            <variable>: [0, 2, 4, ...],
            ...
        }
    }
- A list containing dictionaries of explicit values for each filter and variable, e.g.
    [
        {
            'filters': {<filter>: 'a'},
            'variables': {<variable>: 0}
        },
        {
            'filters': {<filter>: 'a'},
            'variables': {<variable>: 1}
        },
        ...
    ]
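The dictionary format is shorthand for the list format: every combination of filter and variable values becomes one explicit query. The following is a minimal sketch of that expansion over plain Python dicts. `expand_queries_sketch` is a hypothetical name. Lumen's own `expand_queries` helper may differ in details such as ordering.

```python
from itertools import product


def expand_queries_sketch(values: dict, groups=("filters", "variables")) -> list:
    """Hypothetical: expand {'filters': {k: [...]}, 'variables': {...}} into explicit queries."""
    # Flatten every (group, key) pair together with its list of candidate values.
    keys, options = [], []
    for group in groups:
        for key, vals in values.get(group, {}).items():
            keys.append((group, key))
            options.append(vals)
    queries = []
    # One explicit query per combination in the cross-product of all value lists.
    for combo in product(*options):
        query = {group: {} for group in groups}
        for (group, key), value in zip(keys, combo):
            query[group][key] = value
        queries.append(query)
    return queries


queries = expand_queries_sketch({
    "filters": {"species": ["a", "b"]},
    "variables": {"year": [0, 1]},
})
print(len(queries))  # 4
print(queries[1])    # {'filters': {'species': 'a'}, 'variables': {'year': 1}}
```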
to_spec(context=None)
Exports the full specification to reconstruct this component.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | | Context contains the specification of all previously serialized components, e.g. to allow resolving of references. | None |
Returns:
| Type | Description |
|---|---|
| | Declarative specification of this component. |
traverse(type)
Returns all Filter or Transform objects in a potentially chained pipeline.
validate(spec, context=None)
classmethod
SQLTransform
Bases:
Base class for SQL transforms using sqlglot.
comments = param.Boolean(default=False, doc='Whether to include comments in the output SQL')
class-attribute
instance-attribute
error_level = param.ClassSelector(class_=(sqlglot.ErrorLevel), default=(sqlglot.ErrorLevel.RAISE), doc='Error level for parsing')
class-attribute
instance-attribute
identify = param.Boolean(default=False, doc='\n Delimit all identifiers, e.g. turn `FROM database.table` into `FROM "database"."table"`.\n This is useful for dialects that don\'t support unquoted identifiers.')
class-attribute
instance-attribute
optimize = param.Boolean(default=False, doc="\n Whether to optimize the generated SQL query; may produce invalid results, especially with\n duckdb's read_* functions.")
class-attribute
instance-attribute
pretty = param.Boolean(default=False, doc='Prettify output SQL, i.e. add newlines and indentation')
class-attribute
instance-attribute
read = param.String(default=None, doc='Source dialect for parsing; if None, automatically detects')
class-attribute
instance-attribute
unsupported_level = param.ClassSelector(class_=(sqlglot.ErrorLevel), default=(sqlglot.ErrorLevel.WARN), doc='When using `to_sql`, how to handle unsupported dialect features.')
class-attribute
instance-attribute
write = param.String(default=None, doc='Target dialect for output; if None, defaults to read dialect')
class-attribute
instance-attribute
apply(sql_in)
Given an SQL statement, manipulate it, and return a new SQL statement.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql_in | | The initial SQL query to be manipulated. | required |
Returns:
| Type | Description |
|---|---|
| | New SQL query derived from the above query. |
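The apply contract is string in, string out. The sketch below uses a hypothetical `LimitSketch` transform that wraps the incoming query in a subquery with a LIMIT. For brevity it manipulates the SQL as text, whereas SQLTransform itself works on sqlglot expressions via parse_sql and to_sql. The standard-library sqlite3 module is used only to check that the rewritten statement runs.

```python
import sqlite3


class LimitSketch:
    """Hypothetical transform following the apply(sql_in) -> str contract."""

    def __init__(self, limit: int = 10):
        self.limit = limit

    def apply(self, sql_in: str) -> str:
        # Wrap the incoming query as a subquery instead of editing its text.
        inner = sql_in.strip().rstrip(";")
        return f"SELECT * FROM ({inner}) LIMIT {int(self.limit)}"


sql = LimitSketch(limit=2).apply("SELECT x FROM t WHERE x >= 1;")
print(sql)  # SELECT * FROM (SELECT x FROM t WHERE x >= 1) LIMIT 2

# Check that the rewritten statement executes against an in-memory SQLite table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])
rows = conn.execute(sql).fetchall()
print(len(rows))  # 2
```

Wrapping rather than editing keeps the transform independent of the inner query's structure. Some dialects, for example older PostgreSQL versions, require an alias on the subquery.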
apply_to(sql_in, **kwargs)
classmethod
Calls the apply method of a transform defined by the keyword arguments passed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql_in | | | required |
Returns:
| Type | Description |
|---|---|
| | SQL statement after application of the transformation. |
parse_sql(sql_in)
Parse SQL string into sqlglot AST.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql_in | | SQL string to parse. | required |
Returns:
| Type | Description |
|---|---|
| | Parsed SQL expression. |
to_sql(expression)
Convert sqlglot expression back to SQL string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| expression | | Expression to convert to SQL. | required |
Returns:
| Type | Description |
|---|---|
| | SQL string representation. |
Source
Bases:
Source components allow querying all kinds of data.
A Source can provide one or more tables, listed by the
.get_tables method, a description of the data returned by each
table in the form of a JSON schema accessible via the .get_schema
method and lastly a .get method that allows filtering the data.
The Source base class also implements in-memory and disk
caching; caching to disk is enabled by providing a cache_dir. Data
cached to disk is stored as parquet files.
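The following is a minimal sketch of per-query in-memory caching, roughly in the spirit of `cache_per_query=True`. Each distinct (table, query) pair is computed once and then served from a dictionary until the cache is cleared. `CachingSourceSketch` is hypothetical. It assumes scalar query values and omits the disk and parquet layer entirely.

```python
from typing import Any


class CachingSourceSketch:
    """Hypothetical Source-like class with a per-query in-memory cache."""

    def __init__(self, tables: dict):
        self._tables = tables   # table name -> list of row dicts
        self._cache = {}        # (table, normalized query) -> result rows
        self.misses = 0         # number of lookups that bypassed the cache

    def get_tables(self) -> list:
        return list(self._tables)

    def get(self, table: str, **query: Any) -> list:
        # Sort the query items so keyword order does not change the cache key.
        # Assumes hashable (scalar) query values.
        key = (table, tuple(sorted(query.items())))
        if key not in self._cache:
            self.misses += 1
            rows = self._tables[table]
            self._cache[key] = [
                row for row in rows if all(row.get(k) == v for k, v in query.items())
            ]
        return self._cache[key]

    def clear_cache(self) -> None:
        self._cache.clear()


source = CachingSourceSketch({"penguins": [{"island": "a"}, {"island": "b"}]})
source.get("penguins", island="a")
source.get("penguins", island="a")   # served from the cache
print(source.misses)                 # 1
source.clear_cache()
source.get("penguins", island="a")
print(source.misses)                 # 2
```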
cache_data = param.Boolean(default=True, doc='\n Whether to cache actual data.')
class-attribute
instance-attribute
cache_dir = param.String(default=None, doc='\n Whether to enable local cache and write file to disk.')
class-attribute
instance-attribute
cache_metadata = param.Boolean(default=True, doc='\n Whether to cache metadata.')
class-attribute
instance-attribute
cache_per_query = param.Boolean(default=True, doc='\n Whether to query the whole dataset or individual queries.')
class-attribute
instance-attribute
cache_schema = param.Boolean(default=True, doc='\n Whether to cache table schemas.')
class-attribute
instance-attribute
cache_with_dask = param.Boolean(default=True, doc='\n Whether to read and write cache files with dask if available.')
class-attribute
instance-attribute
metadata = param.Dict(default={}, doc='\n Optional metadata about the source tables. Should follow the format:\n {"table_name": {"description": ..., "columns": {"column_name": "..."}}}')
class-attribute
instance-attribute
metadata_func = param.Callable(default=None, doc='\n Function to implement custom metadata lookup for tables.\n Given a list of tables it should return a dictionary of the form:\n\n {\n <table>: {"description": ..., "columns": {"column_name": "..."}}\n }\n\n May be used to override the default _get_table_metadata\n implementation of the Source.')
class-attribute
instance-attribute
panel
property
A Source can return a Panel object which displays information about the Source or controls how the Source queries data.
root = param.ClassSelector(class_=Path, precedence=(-1), doc='\n Root folder of the cache_dir, default is config.root')
class-attribute
instance-attribute
shared = param.Boolean(default=False, doc='\n Whether the Source can be shared across all instances of the\n dashboard. If set to `True` the Source will be loaded on\n initial server load.')
class-attribute
instance-attribute
source_type = None
class-attribute
clear_cache(*events)
Clears any cached data.
from_spec(spec)
classmethod
Creates a Source object from a specification. If a Source specification references other sources, these may be supplied in the sources dictionary and referenced by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | | Specification declared as a dictionary of parameter values or a string referencing a source in the sources dictionary. | required |
Returns:
| Type | Description |
|---|---|
| | Resolved and instantiated Source object. |
get(table, **query)
Return a table, optionally filtered by the given query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | The name of the table to query. | required |
| query | | A dictionary containing all the query parameters. | {} |
Returns:
| Type | Description |
|---|---|
| | A DataFrame containing the queried table. |
get_async(table, **query)
async
Return a table asynchronously, optionally filtered by the given query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | The name of the table to query. | required |
| query | | A dictionary containing all the query parameters. | {} |
Returns:
| Type | Description |
|---|---|
| | A DataFrame containing the queried table. |
get_metadata(table)
Returns metadata for one, multiple or all tables provided by the source.
The metadata for a table is structured as:
    {
        "description": ...,
        "columns": {
            ...
        }
    }
If a list of tables or no table is provided, the metadata is nested one additional level:
    {
        "table_name": {
            "description": ...,
            "columns": {
                ...
            }
        },
        ...
    }
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | The name of the table (or list of tables) to return metadata for. If None, returns metadata for all available tables. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| metadata | | Dictionary of metadata indexed by table (if a list of tables or no table was provided), or the metadata of an individual table. |
get_schema(table=None, limit=None, shuffle=False)
Returns JSON schema describing the tables returned by the Source.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | The name of the table to return the schema for. If None, returns schema for all available tables. | None |
| limit | | Limits the number of rows considered for the schema calculation. | None |
Returns:
| Type | Description |
|---|---|
| | JSON schema(s) for one or all the tables. |
get_tables()
Returns the list of tables available on this source.
Returns:
| Type | Description |
|---|---|
| | The list of available tables on this source. |
validate(spec, context=None)
classmethod
Transform
Bases:
Transform components implement transforms of DataFrame objects.
control_panel
property
controls = param.List(default=[], doc='\n Parameters that should be exposed as widgets in the UI.')
class-attribute
instance-attribute
transform_type = None
class-attribute
apply(table)
Given a table, transform it in some way and return it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | The queried table as a DataFrame. | required |
Returns:
| Type | Description |
|---|---|
| | A DataFrame containing the transformed data. |
apply_to(table, **kwargs)
classmethod
Calls the apply method of a transform defined by the keyword arguments passed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | | | required |
Returns:
| Type | Description |
|---|---|
| | A DataFrame with the results of the transformation. |
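The apply/apply_to pair separates defining a transform (its parameters) from running it on a table. The following is a minimal sketch of that pattern, assuming a hypothetical `TransformSketch` base class whose `__init__` simply sets attributes in place of param's parameter handling. Rows stored as dicts stand in for a DataFrame to keep the sketch dependency-free. `SortSketch` is likewise illustrative, not a Lumen transform.

```python
class TransformSketch:
    """Hypothetical base class mirroring the apply/apply_to pattern."""

    def __init__(self, **params):
        # Stand-in for param: store each keyword argument as an attribute.
        for name, value in params.items():
            setattr(self, name, value)

    def apply(self, table):
        raise NotImplementedError

    @classmethod
    def apply_to(cls, table, **kwargs):
        # Build a transform from keyword arguments, then apply it in one call.
        return cls(**kwargs).apply(table)


class SortSketch(TransformSketch):
    """Hypothetical transform: sort rows by one column."""
    by = None
    ascending = True

    def apply(self, table):
        return sorted(table, key=lambda row: row[self.by], reverse=not self.ascending)


rows = [{"x": 2}, {"x": 3}, {"x": 1}]
print(SortSketch.apply_to(rows, by="x", ascending=False))
# [{'x': 3}, {'x': 2}, {'x': 1}]
```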
from_spec(spec)
classmethod
Resolves a Transform specification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spec | | Specification declared as a dictionary of parameter values. | required |
Returns:
| Type | Description |
|---|---|
| | The resolved Transform object. |
ValidationError
Bases:
A ValidationError is raised when the specification of a component has missing required keys, an incorrect value or is otherwise malformed.
WidgetFilter
Bases:
WidgetFilter generates a Widget from the table schema provided by a Source.
By default, the widget type is inferred from the data and from
whether multi-value selection is enabled.
empty_select = param.Boolean(default=True, doc='\n Add an option to Select widgets to indicate no filtering.')
class-attribute
instance-attribute
filter_type = 'widget'
class-attribute
max_options = param.Integer(default=500, doc='\n Maximum number of options to render.')
class-attribute
instance-attribute
multi = param.Boolean(default=True, doc='\n Whether to use a single-value or multi-value selection widget,\n e.g. for a numeric value this could be a regular slider or a\n range slider.')
class-attribute
instance-attribute
query
property
widget = JSONSchema(schema=(self.schema), sizing_mode='stretch_width', multi=(self.multi), widgets=({self.field: wtype} if wtype else {}))._widgets[self.field]
class-attribute
instance-attribute
to_spec(context=None)
auto_filters(schema)
Automatically generates filter specifications from a schema.
Arguments
- schema: A schema describing the types of various fields.
Returns:
| Name | Type | Description |
|---|---|---|
| filter_specs | | A list of filter specifications. |
catch_and_notify(message=None)
Catch an exception and notify the user.
A decorator which catches all exceptions raised by a function. When an error occurs, a Panel notification with the message is sent to the dashboard, and the error is logged along with the method it originated from.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | | The notification message, by default None. If None, the message defaults to "Error: {e}", where e is the exception message. | None |
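The following is a minimal sketch of the catch, notify and log behavior described above. `catch_and_notify_sketch` is hypothetical, and its `notify` argument is an assumed stand-in for the Panel notification call so the example runs without a dashboard. In this sketch the wrapped function returns None when an exception is caught.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def catch_and_notify_sketch(message=None, notify=print):
    """Hypothetical decorator factory; `notify` stands in for a Panel notification."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # Use the custom message, or fall back to "Error: {e}".
                notify(message or f"Error: {e}")
                # Log the traceback together with the function it came from.
                logger.exception("Error in %s", func.__qualname__)
                return None
        return wrapper
    return decorator


notifications = []


@catch_and_notify_sketch(notify=notifications.append)
def divide(a, b):
    return a / b


print(divide(1, 0))    # None
print(notifications)   # ['Error: division by zero']
```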
expand_queries(values, groups=('filters', 'variables'))
get_dataframe_schema(df, columns=None)
Returns a JSON schema, optionally filtered by a subset of the columns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | | The DataFrame to describe with the schema. | required |
| columns | | List of columns to include in the schema. | None |
Returns:
| Type | Description |
|---|---|
| | The JSON schema describing the DataFrame. |
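The following is a minimal sketch of deriving a JSON-schema-style description restricted to a subset of columns. It describes a table as an array of row objects, numeric columns by their observed range, and other columns by their distinct values. `records_schema_sketch` is hypothetical and works on a list of row dicts instead of a DataFrame. The exact keys Lumen emits may differ; the sketch uses the standard JSON Schema keywords `minimum`, `maximum` and `enum`.

```python
def records_schema_sketch(records: list, columns=None) -> dict:
    """Hypothetical: JSON-schema-style description of a list of row dicts."""
    if columns is None:
        columns = list(records[0]) if records else []
    properties = {}
    for name in columns:
        values = [row[name] for row in records if row.get(name) is not None]
        is_numeric = bool(values) and all(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in values
        )
        if is_numeric:
            # Numeric columns: record the observed range.
            properties[name] = {"type": "number", "minimum": min(values), "maximum": max(values)}
        else:
            # Everything else is treated as categorical; sorted for stable output.
            properties[name] = {"type": "string", "enum": sorted({str(v) for v in values})}
    # A table is described as an array of row objects.
    return {"type": "array", "items": {"type": "object", "properties": properties}}


records = [{"name": "a", "mass": 1.5}, {"name": "b", "mass": 3}]
schema = records_schema_sketch(records, columns=["mass"])
print(schema["items"]["properties"])
# {'mass': {'type': 'number', 'minimum': 1.5, 'maximum': 3}}
```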
is_ref(value)
Whether the value is a reference.