Build a Server Side Video Camera Application#
Welcome to our tutorial on building a server-side video camera application using HoloViz Panel! In this fun and engaging guide, we’ll walk you through the process of setting up a video stream from a camera connected to a server, not the user’s machine. This approach uses Python’s threading to handle real-time video processing without freezing the user interface.
Code
server_video_stream.py
import threading
import time
import cv2 as cv
import param
from PIL import Image
import panel as pn
class CannotOpenCamera(Exception):
"""Exception raised if the camera cannot be opened."""
class CannotReadCamera(Exception):
"""Exception raised if the camera cannot be read."""
class ServerVideoStream(pn.viewable.Viewer):
value = param.Parameter(doc="The current snapshot as a Pillow Image")
paused = param.Boolean(default=False, doc="Whether the video stream is paused")
fps = param.Number(10, doc="Frames per second", inclusive_bounds=(0, None))
camera_index = param.Integer(0, doc="The index of the active camera")
def __init__(self, **params):
super().__init__(**params)
self._cameras = {}
self._stop_thread = False
self._thread = threading.Thread(target=self._take_images)
self._thread.daemon = True
def start(self, camera_indices=None):
if camera_indices:
for index in camera_indices:
self.get_camera(index)
if not self._thread.is_alive():
self._thread.start()
def get_camera(self, index):
if index in self._cameras:
return self._cameras[index]
cap = cv.VideoCapture(index)
if not cap.isOpened():
raise CannotOpenCamera(f"Cannot open the camera {index}")
self._cameras[index] = cap
return cap
@staticmethod
def _cv2_to_pil(bgr_image):
rgb_image = cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB)
image = Image.fromarray(rgb_image)
return image
def _take_image(self):
camera = self.get_camera(self.camera_index)
ret, frame = camera.read()
if not ret:
raise CannotReadCamera("Ensure the camera exists and is not in use by other processes.")
else:
self.value = self._cv2_to_pil(frame)
def _take_images(self):
while not self._stop_thread:
start_time = time.time()
if not self.paused:
try:
self._take_image()
except Exception as ex:
print("Error: Could not capture image.")
print(ex)
if self.fps > 0:
interval = 1 / self.fps
elapsed_time = time.time() - start_time
sleep_time = max(0, interval - elapsed_time)
time.sleep(sleep_time)
def __del__(self):
self._stop_thread = True
if self._thread.is_alive():
self._thread.join()
for camera in self._cameras.values():
camera.release()
cv.destroyAllWindows()
def __panel__(self):
settings = pn.Column(
self.param.paused,
self.param.fps,
self.param.camera_index,
width=300,
)
image = pn.pane.Image(self.param.value, sizing_mode="stretch_both")
return pn.Row(settings, image)
server_video_stream = ServerVideoStream()
server_video_stream.start()
app.py
import panel as pn
from server_video_stream import server_video_stream
pn.extension()
server_video_stream.servable()
Let’s dive into the code and see how it all comes together.
Install the Dependencies#
To run the application, you’ll need several packages:
OpenCV (
opencv
): A library for computer vision tasks, here used to interface with the camera.Panel (
panel
): A high-level app and dashboarding solution for Python, used to create the web interface.Pillow (
pillow
): An imaging library, used here to convert images from OpenCV format to a format suitable for web display.
You can install these using conda or pip:
conda install -y -c conda-forge opencv panel pillow watchfiles
pip install opencv-python panel pillow watchfiles
Build the App#
File Breakdown#
This project consists of two Python files:
server_video_stream.py
- Contains the reusableServerVideoStream
component.app.py
- A simple script that utilizes theserver_video_stream
component.
Breakdown of server_video_stream.py
#
Importing Libraries and Handling Exceptions#
import threading
import time
import cv2 as cv
import param
from PIL import Image
import panel as pn
class CannotOpenCamera(Exception):
"""Exception raised if the camera cannot be opened."""
class CannotReadCamera(Exception):
"""Exception raised if the camera cannot be read."""
We begin by importing the necessary libraries:
threading: For running background tasks that do not block the main program.
time: To manage frame rates and delays.
cv2: The OpenCV library for capturing and processing video frames.
param: A component of the HoloViz ecosystem for declaring parameters.
Image from PIL: To convert images from OpenCV’s format to a web-friendly format.
panel: The Panel library for creating web interfaces.
We also define several custom exceptions to manage specific errors related to camera operations.
Defining the ServerVideoStream
Class#
class ServerVideoStream(pn.viewable.Viewer):
value = param.Parameter(doc="The current snapshot as a Pillow Image")
paused = param.Boolean(default=False, doc="Whether the video stream is paused")
fps = param.Number(10, doc="Frames per second", inclusive_bounds=(0, None))
camera_index = param.Integer(0, doc="The index of the active camera")
The ServerVideoStream
class extends pn.viewable.Viewer
, enabling its display in a Panel app. It includes parameters to control the stream:
value: Holds the current video frame.
paused: A toggle to pause or resume the video capture.
fps: Determines the frame rate of the video stream.
camera_index: Specifies which camera to use if multiple are available.
Initializing and Managing Cameras#
def __init__(self, **params):
super().__init__(**params)
self._cameras = {}
self._stop_thread = False
self._thread = threading.Thread(target=self._take_images)
self._thread.daemon = True
def start(self, camera_indices):
if camera_indices:
for index in camera_indices:
self.get_camera(index)
if not self._thread.is_alive():
self._thread.start()
The constructor initializes a thread for capturing images and managing the stream. The start
method activates the cameras and starts the thread if it isn’t already running.
Capturing and Displaying Images#
def get_camera(self, index):
if index in self._cameras:
return self._cameras[index]
cap = cv.VideoCapture(index)
if not cap.isOpened():
raise CannotOpenCamera(f"Cannot open the camera {index}")
self._cameras[index] = cap
return cap
@staticmethod
def _cv2_to_pil(bgr_image):
rgb_image = cv.cvtColor(bgr_image, cv.COLOR_BGR2RGB)
image = Image.fromarray(rgb_image)
return image
def _take_image(self):
camera = self.get_camera(self.camera_index)
ret, frame = camera.read()
if not ret:
raise CannotReadCamera("Ensure the camera exists and is not in use by other processes.")
else:
self.value = self._cv2_to_pil(frame)
def _take_images(self):
while not self._stop_thread:
start_time = time.time()
if not self.paused:
try:
self._take_image()
except Exception as ex:
print("Error: Could not capture image.")
print(ex)
if self.fps > 0:
interval = 1 / self.fps
elapsed_time = time.time() - start_time
sleep_time = max(0, interval - elapsed_time)
time.sleep(sleep_time)
def __del__(self):
self._stop_thread = True
if self._thread.is_alive():
self._thread.join()
for camera in self._cameras.values():
camera.release()
cv.destroyAllWindows()
The _take_images
method runs in a loop within a separate thread, capturing images at the specified fps unless paused. This setup ensures the app remains responsive by not blocking the main thread.
Display Setup#
def __panel__(self):
settings = pn.Column(
self.param.paused,
self.param.fps,
self.param.camera_index,
width=300,
)
image = pn.pane.Image(self.param.value, sizing_mode="stretch_both")
return pn.Row(settings, image)
The __panel__
method defines how the class is rendered in a web page. It creates a user interface with controls for the camera settings and displays the current video frame.
app.py
- Making It Servable#
import panel as pn
from server_video_stream import server_video_stream
pn.extension()
server_video_stream.servable()
This script initializes the Panel extension and makes the server_video_stream
instance available as a web app.
Try serving the app with
panel serve app.py
It should look like: