Set Up Manual Threading#
Enabling threading in Panel, as demonstrated in the automatic threading guide, provides a simple method to achieve concurrency. However, there are situations where greater control is necessary.
Below, we will demonstrate how to safely implement threads either per session or globally across multiple sessions.
Session Thread#
This section illustrates how to use a `Thread` to process tasks from a queue, with one thread started per session so that each session's tasks are handled independently.
We simulate task processing using `time.sleep`, but this could represent any long-running computation.
import datetime
import threading
import time
from typing import Callable
import param
import panel as pn
pn.extension()
class SessionTaskRunner(pn.viewable.Viewer):
    """Process queued tasks one at a time on a per-instance background thread.

    A daemon thread is started on construction. It pops tasks from an internal
    queue, passes each one to ``worker`` and publishes the outcome (the return
    value, or the exception that was raised) on ``value``. The thread is asked
    to stop when the session is destroyed or the instance is garbage collected.
    """

    value = param.Parameter(
        doc="The last result or exception", label="Last Result", constant=True
    )
    tasks: int = param.Integer(doc="Number of tasks in the queue", constant=True)
    status: str = param.String(
        default="The queue is empty", doc="Status message", constant=True
    )
    worker: Callable = param.Callable(
        allow_None=False, doc="Function that processes the task"
    )

    def __init__(self, **params):
        super().__init__(**params)
        self._queue = []
        self._stop_thread = False
        # Set whenever there is something for the thread to react to:
        # a newly appended task or a stop request.
        self._event = threading.Event()
        self._thread = threading.Thread(target=self._task_runner, daemon=True)
        self._thread.start()
        pn.state.on_session_destroyed(self._session_destroyed)

    def _log(self, message):
        """Print ``message`` prefixed with this instance's id."""
        print(f"{id(self)} - {message}")

    def _task_runner(self):
        """Thread target: drain the queue, then block until woken up again."""
        while not self._stop_thread:
            while self._queue:
                with param.edit_constant(self):
                    self.status = f"Processing: {len(self._queue)} items left."
                self._log(self.status)
                task = self._queue.pop(0)
                try:
                    result = self.worker(task)
                    with param.edit_constant(self):
                        self.value = result
                except Exception as ex:
                    # ``value`` is a constant parameter, so it must be set inside
                    # ``edit_constant``; a bare assignment would raise here and
                    # terminate the thread on the first failing task.
                    with param.edit_constant(self):
                        self.value = ex
                with param.edit_constant(self):
                    self.tasks = len(self._queue)
                    self.status = self.param.status.default
            # Clear before re-checking the queue: a task appended after the
            # clear sets the event again, so the wait below returns immediately.
            self._event.clear()
            if not self._queue and not self._stop_thread:
                self._log("Waiting for a task")
                self._event.wait()
        self._log("Finished Task Runner")

    def _stop_thread_func(self):
        """Ask the thread to exit and wake it up if it is waiting."""
        # ``_log`` already adds the instance id prefix.
        self._log("Stopping Task Runner")
        self._stop_thread = True
        self._event.set()

    def _session_destroyed(self, session_context):
        """Session-destroyed callback: stop the thread."""
        self._stop_thread_func()

    def __del__(self):
        # ``__init__`` may have raised before the event was created; in that
        # case there is no thread to stop.
        if "_event" in self.__dict__:
            self._stop_thread_func()

    def __panel__(self):
        return pn.Column(
            f"## Session TaskRunner {id(self)}",
            pn.pane.Str(self.param.status),
            pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)),
        )

    def append(self, task):
        """Appends a task to the queue"""
        self._queue.append(task)
        with param.edit_constant(self):
            self.tasks = len(self._queue)
        # Wake the thread in case it is blocked waiting for work.
        self._event.set()
We will now create a task runner and a callback that queues new tasks for processing when a button is clicked:
def example_worker(task):
    """Stand-in for a slow job: wait one second, then return the current time."""
    time.sleep(1)
    finished_at = datetime.datetime.now()
    return finished_at


task_runner = SessionTaskRunner(worker=example_worker)


def add_task(event):
    """Button callback: put one more task on the runner's queue."""
    task_runner.append("task")


button = pn.widgets.Button(
    name="Add Task",
    button_type="primary",
    on_click=add_task,
)
layout = pn.Column(button, task_runner)
layout.servable()
The application should look like:
Since processing occurs on a separate thread, the application remains responsive to further user interactions, such as queuing new tasks.
Note
To use threading efficiently:
- We terminate the thread upon session destruction to prevent it from consuming resources indefinitely.
- We use daemon threads (`daemon=True`) to allow the server to be stopped using CTRL+C.
- We employ the `Event.wait` method for efficient task-waiting, which is more resource-efficient than repeatedly sleeping and checking for new tasks using `time.sleep`.
Global Thread#
When we need to share data periodically across all sessions, it is often inefficient to fetch and process this data separately for each session.
Instead, we can utilize a single thread. When initiating global threads, it’s crucial to avoid starting them multiple times, especially in sessions or modules subject to the `--dev` flag. To circumvent this issue, we can globally share a worker or thread through the Panel cache (`pn.state.cache`).
Let’s create a `GlobalTaskRunner` that accepts a function (`worker`) and executes it repeatedly, pausing for `seconds` seconds between each execution.
This worker can be used to ingest data from a database, the web, or any server resource.
import datetime
import threading
import time
from typing import Callable
import param
import panel as pn
pn.extension()
class GlobalTaskRunner(pn.viewable.Viewer):
    """The GlobalTaskRunner creates a singleton instance for each key.

    Instances are registered in ``pn.state.cache`` under their ``key``.
    Constructing a runner with a key that is already registered returns the
    existing instance unchanged; the arguments of that later call are ignored.
    Each instance runs ``worker`` repeatedly on one daemon thread, pausing
    ``seconds`` between calls, and publishes the outcome on ``value`` or
    ``exception``.
    """

    value = param.Parameter(doc="The most recent result", label="Last Result", constant=True)
    exception: Exception = param.ClassSelector(
        class_=Exception,
        allow_None=True,
        doc="The most recent exception, if any",
        label="Last Exception",
        constant=True,
    )
    worker: Callable = param.Callable(
        allow_None=False, doc="Function that generates a result"
    )
    seconds: float = param.Number(
        default=1.0, doc="Interval between worker calls", bounds=(0.001, None)
    )
    key: str = param.String(allow_None=False, constant=True)

    _global_task_runner_key = "__global_task_runners__"

    def __new__(cls, key, **kwargs):
        task_runners = pn.state.cache[cls._global_task_runner_key] = pn.state.cache.get(
            cls._global_task_runner_key, {}
        )
        task_runner = task_runners.get(key)
        # Compare against None explicitly rather than relying on the
        # truthiness of a runner object.
        if task_runner is None:
            task_runner = super().__new__(cls)
            task_runners[key] = task_runner
        return task_runner

    def __init__(self, key: str, **params):
        # ``__new__`` returns the cached instance for a known key and Python
        # then calls ``__init__`` on it again. Without this guard every
        # construction would start one more thread for the same key.
        if self.__dict__.get("_thread") is not None:
            return
        super().__init__(key=key, **params)
        # Doubles as the stop flag and as an interruptible sleep.
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._task_runner, daemon=True)
        self._thread.start()
        self._log("Created")

    def _log(self, message):
        """Print ``message`` prefixed with this instance's id."""
        print(f"{id(self)} - {message}")

    def _task_runner(self):
        """Thread target: call ``worker`` in a loop until a stop is requested."""
        while not self._stop_event.is_set():
            try:
                result = self.worker()
                with param.edit_constant(self):
                    self.value = result
                    self.exception = None
            except Exception as ex:
                with param.edit_constant(self):
                    self.exception = ex
            if not self._stop_event.is_set():
                self._log("Sleeping")
                # Unlike ``time.sleep``, this returns as soon as ``remove()``
                # sets the event, so stopping does not wait out the interval.
                self._stop_event.wait(self.seconds)
        self._log("Task Runner Finished")

    def remove(self):
        """Stop this GlobalTaskRunner's thread and unregister it from the cache.

        Blocks until the thread has exited, i.e. until a ``worker`` call that
        is currently in progress has returned.
        """
        self._log("Removing")
        self._stop_event.set()
        self._thread.join()
        cache = pn.state.cache.get(self._global_task_runner_key, {})
        # Only drop the entry if it still refers to this instance, so a runner
        # registered later under the same key is left in place.
        if cache.get(self.key) is self:
            del cache[self.key]
        self._log("Removed")

    @classmethod
    def remove_all(cls):
        """Stop and unregister every GlobalTaskRunner held in the cache."""
        # Iterate over a copy: ``remove()`` deletes entries from the cache dict.
        for gtw in list(pn.state.cache.get(cls._global_task_runner_key, {}).values()):
            gtw.remove()
        pn.state.cache[cls._global_task_runner_key] = {}

    def __panel__(self):
        return pn.Column(
            f"## Global TaskRunner {id(self)}",
            self.param.seconds,
            pn.pane.Str(pn.rx("Last Result: {value}").format(value=self.param.value)),
            pn.pane.Str(
                pn.rx("Last Exception: {value}").format(value=self.param.exception)
            ),
        )
Let’s test this with a simple example worker that takes one second to generate a timestamp and is run again 0.33 seconds after each result.
def example_worker():
    """Stand-in for a slow job: wait one second, then return the current time."""
    time.sleep(1)
    produced_at = datetime.datetime.now()
    return produced_at


task_runner = GlobalTaskRunner(
    key="example-worker",
    worker=example_worker,
    seconds=0.33,
)

results = []


@pn.depends(task_runner.param.value)
def result_view(value):
    """Record each new runner value and report how many have been collected."""
    results.append(value)
    count = len(results)
    return f"{count} results produced during this session"


layout = pn.Column(task_runner, result_view)
layout.servable()
The application should look like:
Note
For efficient use of global threading:
- We employ the singleton principle (`__new__`) to create only one instance and thread per key.
- We use daemon threads (`daemon=True`) to ensure the server can be halted using CTRL+C.