Scaling with Dask#
This guide demonstrates how you can offload tasks to Dask to scale your apps to bigger datasets, bigger calculations and more users.
Panel supports async
and await
. This means you can easily offload large computations to your Dask cluster asynchronously and keep your app responsive while you await
the results. Please note that off loading the computations to the Dask cluster can add ~250msec of overhead and thus is not suitable for all kinds of use cases.
Installation#
Lets start by installing Panel, hvPlot and Dask Distributed.
pip install panel hvplot dask[distributed]
Start the Cluster#
For development, testing and many use cases a LocalCluster
is more than fine and will allow you to leverage all the CPUs on your machine. When you want to scale out to an entire cluster will you can switch to a non-local cluster. To avoid any issues when combining Panel and Dask we recommend starting the LocalCluster
separately from the Dask Client
and your Panel app.
# cluster.py
from dask.distributed import LocalCluster
DASK_SCHEDULER_PORT = 64719
DASK_SCHEDULER_ADDRESS = f"tcp://127.0.0.1:{DASK_SCHEDULER_PORT}"
N_WORKERS = 4
if __name__ == '__main__':
cluster = LocalCluster(scheduler_port=DASK_SCHEDULER_PORT, n_workers=N_WORKERS)
print(cluster.scheduler_address)
input()
and running
$ python cluster.py
tcp://127.0.0.1:64719
You can now open the Dask Dashboard at http://localhost:8787/status.
So far there is not a lot to see here:
The Dask Client
will serialize any tasks and send them to the Dask Cluster
for execution. This means that the Client
and Cluster
must able to import the same versions of all tasks and python package dependencies.
Dask Distributed#
Fibonacci Task Queue#
In this section we will define a Panel app to submit and monitor Fibonacci tasks.
Let’s start by defining the fibonacci tasks in a tasks.py
file:
# tasks.py
from datetime import datetime as dt
import numpy as np
def _fib(n):
if n < 2:
return n
else:
return _fib(n - 1) + _fib(n - 2)
def fibonacci(n):
start = dt.now()
print(start, "start", n)
result = _fib(n)
end = dt.now()
print(end, "end", (end-start).seconds, n, result)
return result
Lets now define the full app.py
file.
# app.py
from datetime import datetime as dt
from dask.distributed import Client
import panel as pn
from cluster import DASK_SCHEDULER_ADDRESS
from tasks import fibonacci
QUEUE = []
pn.extension("terminal", design="material", sizing_mode="stretch_width")
@pn.cache # We use caching to share the client across all users and sessions
async def get_client():
return await Client(
DASK_SCHEDULER_ADDRESS, asynchronous=True
)
n_input = pn.widgets.IntInput(value=0, width=100, sizing_mode="fixed", name="n")
submit_button = pn.widgets.Button(name="SUBMIT", button_type="primary", align="end")
terminal_widget = pn.widgets.Terminal(
height=200,
)
queue = pn.rx(QUEUE)
@pn.depends(submit_button, watch=True)
async def _handle_click(_):
n = n_input.value
n_input.value += 1
start = dt.now()
QUEUE.append(n)
queue.rx.value = QUEUE
client = await get_client()
fib_n = await client.submit(fibonacci, n)
end = dt.now()
QUEUE.pop(QUEUE.index(n))
queue.rx.value = QUEUE
duration = (end - start).seconds
terminal_widget.write(f"fibonacci({n})={fib_n} in {duration}sec\n")
pn.Column(
"# Fibonacci Tasks",
pn.Row(n_input, submit_button),
pn.rx("## Task queue: {}").format(queue),
"## Results",
terminal_widget,
).servable()
You can now run panel serve app.py
and the app will look like
Dask Dashboard Components#
It can be very useful to include some of the live Dask endpoints in your app. Its easy to do by embedding the specific urls in an iframe.
In the dashboard.py
file we define the DaskViewer
component that can be used to explore the individual dask plots.
# dashboard.py
import os
import param
import panel as pn
DASK_DASHBOARD_ADDRESS = os.getenv("DASK_DASHBOARD", "http://localhost:8787/status")
VIEWS = {
"aggregate-time-per-action": "individual-aggregate-time-per-action",
"bandwidth-types": "individual-bandwidth-types",
"bandwidth-workers": "individual-bandwidth-workers",
"cluster-memory": "individual-cluster-memory",
"compute-time-per-key": "individual-compute-time-per-key",
"cpu": "individual-cpu",
"exceptions": "individual-exceptions",
"gpu-memory": "individual-gpu-memory",
"gpu-utilization": "individual-gpu-utilization",
"graph": "individual-graph",
"groups": "individual-groups",
"memory-by-key": "individual-memory-by-key",
"nprocessing": "individual-nprocessing",
"occupancy": "individual-occupancy",
"profile-server": "individual-profile-server",
"profile": "individual-profile",
"progress": "individual-progress",
"scheduler-system": "individual-scheduler-system",
"task-stream": "individual-task-stream",
"workers-cpu-timeseries": "individual-workers-cpu-timeseries",
"workers-disk-timeseries": "individual-workers-disk-timeseries",
"workers-disk": "individual-workers-disk",
"workers-memory-timeseries": "individual-workers-memory-timeseries",
"workers-memory": "individual-workers-memory",
"workers-network-timeseries": "individual-workers-network-timeseries",
"workers-network": "individual-workers-network",
"workers": "individual-workers",
}
VIEWER_PARAMETERS = ["url", "path"]
def dask_dashboard_view(path="individual-cpu", url=DASK_DASHBOARD_ADDRESS):
url = url.replace("/status", "/") + path
return f"""<iframe src="{url}" frameBorder="0" style="height:100%;width:100%"></iframe>"""
class DaskViewer(pn.viewable.Viewer):
url = param.String(DASK_DASHBOARD_ADDRESS, doc="The url of the Dask status dashboard")
path = param.Selector(default="individual-cpu", objects=VIEWS, doc="the endpoint", label="View")
def __init__(self, size=20, **params):
viewer_params = {k:v for k, v in params.items() if k in VIEWER_PARAMETERS}
layout_params = {k:v for k, v in params.items() if k not in VIEWER_PARAMETERS}
super().__init__(**viewer_params)
view = pn.bind(dask_dashboard_view, self.param.path, self.param.url)
self._iframe = pn.pane.HTML(view, sizing_mode="stretch_both")
self._select = pn.widgets.Select.from_param(self.param.path, size=size, width=300, sizing_mode="fixed", margin=(20,5,10,5))
self._link = pn.panel(f"""<a href="{DASK_DASHBOARD_ADDRESS}" target="_blank">Dask Dashboard</a>""", height=50, margin=(0,20))
self._panel = pn.Column(pn.Row(self._iframe, self._select, sizing_mode="stretch_both"), self._link, **layout_params)
def __panel__(self):
return self._panel
if __name__.startswith("bokeh"):
pn.extension(sizing_mode="stretch_width")
DaskViewer(height=500, size=25).servable()
Try running panel serve dashboard.py
. If your Dask cluster is working, you will see something like