savant_rs.py.client
Source/Sink framework for development and QA purposes.
- class savant_rs.py.client.FrameSource
Interface for frame sources.
- abstractmethod build_frame() Tuple[VideoFrame, bytes]
Build a frame.
- Returns:
A tuple of a frame metadata and its content.
- abstractmethod with_update(update: VideoFrameUpdate) T
Apply an update to a frame.
- class savant_rs.py.client.JaegerLogProvider(endpoint: str, login: str | None = None, password: str | None = None)
Log provider for Jaeger.
- Parameters:
endpoint – Jaeger endpoint URL.
login – Jaeger login.
password – Jaeger password.
- class savant_rs.py.client.JpegSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)
Frame source for JPEG files.
- Parameters:
source_id – Source ID.
file – Path to a JPEG file or a file handle to a JPEG file opened as binary.
pts – Frame presentation timestamp.
framerate – Framerate (numerator, denominator).
updates – List of frame updates.
- class savant_rs.py.client.LogProvider
Interface for log providers.
- class savant_rs.py.client.PngSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)
Frame source for PNG files.
- Parameters:
source_id – Source ID.
file – Path to a PNG file or a file handle to a PNG file opened as binary.
pts – Frame presentation timestamp.
framerate – Framerate (numerator, denominator).
updates – List of frame updates.
- class savant_rs.py.client.SinkBuilder(socket: str | None = None, log_provider: LogProvider | None = None, idle_timeout: int | None = None, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, source_id: str | None = None, source_id_prefix: str | None = None)
Builder for Sink.
Usage example:
sink = ( SinkBuilder() .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc') .with_idle_timeout(60) .with_log_provider(JaegerLogProvider('http://localhost:16686')) # Note: healthcheck port should be configured in the module. .with_module_health_check_url('http://172.17.0.1:8888/status') .build() ) for result in sink: print(result.frame_meta) result.logs().pretty_print()
Usage example (async):
sink = ( SinkBuilder() .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc') .with_idle_timeout(60) .with_log_provider(JaegerLogProvider('http://localhost:16686')) .build_async() ) async for result in sink: print(result.frame_meta) result.logs().pretty_print()
- build() SinkRunner
Build Sink.
- build_async() AsyncSinkRunner
Build async Sink.
- with_idle_timeout(idle_timeout: int | None) SinkBuilder
Set idle timeout for Sink.
Sink will stop trying to receive a message from ZeroMQ socket when it did not receive a message for idle_timeout seconds.
- with_log_provider(log_provider: LogProvider) SinkBuilder
Set log provider for Sink.
- with_module_health_check_interval(interval: float) SinkBuilder
Set module health check interval for Sink.
Sink will check the module health every specified interval.
- with_module_health_check_timeout(timeout: float) SinkBuilder
Set module health check timeout for Sink.
Sink will wait for the module to be ready for the specified timeout.
- with_module_health_check_url(url: str) SinkBuilder
Set module health check url for Sink.
Sink will check the module health before receiving any messages.
- with_socket(socket: str) SinkBuilder
Set ZeroMQ socket for Sink.
- with_source_id(source_id: str | None) SinkBuilder
Set source id for Sink.
Sink will filter messages by source id.
- with_source_id_prefix(source_id_prefix: str | None) SinkBuilder
Set source id prefix for Sink.
Sink will filter messages by source id prefix.
Note: source_id and source_id_prefix are mutually exclusive. If both are set, source_id will be used.
- class savant_rs.py.client.SourceBuilder(socket: str | None = None, log_provider: LogProvider | None = None, retries: int = 3, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, telemetry_enabled: bool = True)
Builder for Source.
Usage example:
source = ( SourceBuilder() .with_log_provider(JaegerLogProvider('http://localhost:16686')) .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc') # Note: healthcheck port should be configured in the module. .with_module_health_check_url('http://172.17.0.1:8888/status') .build() ) result = source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg')) result.logs().pretty_print()
Usage example (async):
source = ( SourceBuilder() .with_log_provider(JaegerLogProvider('http://localhost:16686')) .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc') .build_async() ) result = await source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg')) result.logs().pretty_print()
- build() SourceRunner
Build Source.
- build_async() AsyncSourceRunner
Build async Source.
- with_log_provider(log_provider) SourceBuilder
Set log provider for Source.
- with_module_health_check_interval(interval: float) SourceBuilder
Set module health check interval for Source.
Source will check the module health every specified interval.
- with_module_health_check_timeout(timeout: float) SourceBuilder
Set module health check timeout for Source.
Source will wait for the module to be ready for the specified timeout.
- with_module_health_check_url(url: str) SourceBuilder
Set module health check url for Source.
Source will check the module health before receiving any messages.
- with_retries(retries: int) SourceBuilder
Set number of retries for Source.
Source retries to send message to ZeroMQ socket if it fails.
- with_socket(socket: str) SourceBuilder
Set ZeroMQ socket for Source.
- with_telemetry_disabled() SourceBuilder
Disable telemetry for Source.
- with_telemetry_enabled() SourceBuilder
Enable telemetry for Source.
Builders
- class savant_rs.py.client.builder.source.SourceBuilder(socket: str | None = None, log_provider: LogProvider | None = None, retries: int = 3, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, telemetry_enabled: bool = True)
Builder for Source.
Usage example:
source = ( SourceBuilder() .with_log_provider(JaegerLogProvider('http://localhost:16686')) .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc') # Note: healthcheck port should be configured in the module. .with_module_health_check_url('http://172.17.0.1:8888/status') .build() ) result = source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg')) result.logs().pretty_print()
Usage example (async):
source = ( SourceBuilder() .with_log_provider(JaegerLogProvider('http://localhost:16686')) .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc') .build_async() ) result = await source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg')) result.logs().pretty_print()
- build() SourceRunner
Build Source.
- build_async() AsyncSourceRunner
Build async Source.
- with_log_provider(log_provider) SourceBuilder
Set log provider for Source.
- with_module_health_check_interval(interval: float) SourceBuilder
Set module health check interval for Source.
Source will check the module health every specified interval.
- with_module_health_check_timeout(timeout: float) SourceBuilder
Set module health check timeout for Source.
Source will wait for the module to be ready for the specified timeout.
- with_module_health_check_url(url: str) SourceBuilder
Set module health check url for Source.
Source will check the module health before receiving any messages.
- with_retries(retries: int) SourceBuilder
Set number of retries for Source.
Source retries to send message to ZeroMQ socket if it fails.
- with_socket(socket: str) SourceBuilder
Set ZeroMQ socket for Source.
- with_telemetry_disabled() SourceBuilder
Disable telemetry for Source.
- with_telemetry_enabled() SourceBuilder
Enable telemetry for Source.
- class savant_rs.py.client.builder.sink.SinkBuilder(socket: str | None = None, log_provider: LogProvider | None = None, idle_timeout: int | None = None, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, source_id: str | None = None, source_id_prefix: str | None = None)
Builder for Sink.
Usage example:
sink = ( SinkBuilder() .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc') .with_idle_timeout(60) .with_log_provider(JaegerLogProvider('http://localhost:16686')) # Note: healthcheck port should be configured in the module. .with_module_health_check_url('http://172.17.0.1:8888/status') .build() ) for result in sink: print(result.frame_meta) result.logs().pretty_print()
Usage example (async):
sink = ( SinkBuilder() .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc') .with_idle_timeout(60) .with_log_provider(JaegerLogProvider('http://localhost:16686')) .build_async() ) async for result in sink: print(result.frame_meta) result.logs().pretty_print()
- build() SinkRunner
Build Sink.
- build_async() AsyncSinkRunner
Build async Sink.
- with_idle_timeout(idle_timeout: int | None) SinkBuilder
Set idle timeout for Sink.
Sink will stop trying to receive a message from ZeroMQ socket when it did not receive a message for idle_timeout seconds.
- with_log_provider(log_provider: LogProvider) SinkBuilder
Set log provider for Sink.
- with_module_health_check_interval(interval: float) SinkBuilder
Set module health check interval for Sink.
Sink will check the module health every specified interval.
- with_module_health_check_timeout(timeout: float) SinkBuilder
Set module health check timeout for Sink.
Sink will wait for the module to be ready for the specified timeout.
- with_module_health_check_url(url: str) SinkBuilder
Set module health check url for Sink.
Sink will check the module health before receiving any messages.
- with_socket(socket: str) SinkBuilder
Set ZeroMQ socket for Sink.
- with_source_id(source_id: str | None) SinkBuilder
Set source id for Sink.
Sink will filter messages by source id.
- with_source_id_prefix(source_id_prefix: str | None) SinkBuilder
Set source id prefix for Sink.
Sink will filter messages by source id prefix.
Note: source_id and source_id_prefix are mutually exclusive. If both are set, source_id will be used.
Frame sources
- class savant_rs.py.client.frame_source.FrameSource
Interface for frame sources.
- abstractmethod build_frame() Tuple[VideoFrame, bytes]
Build a frame.
- Returns:
A tuple of a frame metadata and its content.
- abstractmethod with_update(update: VideoFrameUpdate) T
Apply an update to a frame.
- class savant_rs.py.client.image_source.image_source.ImageSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)
Frame source for image files.
- Parameters:
source_id – Source ID.
file – Path to an image file or a file handle to an image file opened as binary.
pts – Frame presentation timestamp.
framerate – Framerate (numerator, denominator).
updates – List of frame updates.
- build_frame() Tuple[VideoFrame, bytes]
Build a frame.
- Returns:
A tuple of a frame metadata and its content.
- property duration: int
Frame duration.
- property filepath: str | PathLike | None
Path to a JPEG file.
- property framerate: Tuple[int, int]
Framerate.
- property pts: int
Frame presentation timestamp.
- property source_id: str
Source ID.
- property updates: List[VideoFrameUpdate]
List of frame updates.
- with_framerate(framerate: Tuple[int, int]) T
Set framerate.
- with_pts(pts: int) T
Set frame presentation timestamp.
- with_update(update: VideoFrameUpdate) T
Apply an update to a frame.
- class savant_rs.py.client.image_source.image_source.JpegSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)
Frame source for JPEG files.
- Parameters:
source_id – Source ID.
file – Path to a JPEG file or a file handle to a JPEG file opened as binary.
pts – Frame presentation timestamp.
framerate – Framerate (numerator, denominator).
updates – List of frame updates.
- class savant_rs.py.client.image_source.image_source.PngSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)
Frame source for PNG files.
- Parameters:
source_id – Source ID.
file – Path to a PNG file or a file handle to a PNG file opened as binary.
pts – Frame presentation timestamp.
framerate – Framerate (numerator, denominator).
updates – List of frame updates.
Image header parse utility.
- savant_rs.py.client.image_source.img_header_parse.get_image_size_codec(file: str | PathLike | BinaryIO) Tuple[int, int, str]
Get JPEG or PNG image width and height by parsing the file header.
- Parameters:
file – Path to an image file or a file handle to an image file opened as binary.
- Returns:
Image width, height and codec.
Runners
- class savant_rs.py.client.runner.source.AsyncSourceRunner(socket: str, log_provider: LogProvider | None, retries: int, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, telemetry_enabled: bool, send_hwm: int = 50, receive_timeout: int = 5000)
Sends messages to ZeroMQ socket asynchronously.
- async send(source: FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str], send_eos: bool = True) SourceResult
Send source data to ZeroMQ socket.
- Parameters:
source – Source of the frame to send. Can be an instance of FrameSource or a tuple of VideoFrame and content, or a batch of frames with topic name. NB: If the source is used to send a stream of batches, the topic name must remain the same for all batches to maintain their order.
send_eos – Whether to send EOS after sending the source.
- Returns:
Result of sending the source.
- async send_eos(source_id: str) SourceResult
Send EOS for a source to ZeroMQ socket.
- Parameters:
source_id – Source ID.
- Returns:
Result of sending EOS.
- async send_iter(sources: Iterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream] | AsyncIterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream], send_eos: bool = True) AsyncIterable[SourceResult]
Send multiple sources to ZeroMQ socket.
- Parameters:
sources – Frame sources to send.
send_eos – Whether to send EOS after sending sources.
- Returns:
Results of sending the sources.
- async send_shutdown(zmq_topic: str, auth: str) SourceResult
Send Shutdown message for a source to ZeroMQ socket.
- Parameters:
zmq_topic – ZeroMQ topic name.
auth – Authentication key.
- Returns:
Result of sending Shutdown.
- class savant_rs.py.client.runner.source.SourceResult(trace_id: str | None, log_provider: LogProvider | None, source_ids: Set[str], pts: int | None, status: str)
Result of sending a message to ZeroMQ socket.
- pts: int | None
PTS of the frame.
- source_ids: Set[str]
Source IDs. Single source ID for a frame, multiple source IDs for a batch.
- status: str
Status of sending the message.
- class savant_rs.py.client.runner.source.SourceRunner(socket: str, log_provider: LogProvider | None, retries: int, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, telemetry_enabled: bool, send_hwm: int = 50, receive_timeout: int = 5000)
Sends messages to ZeroMQ socket.
- send(source: FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str], send_eos: bool = True) SourceResult
Send source data to ZeroMQ socket.
- Parameters:
source – Source of the frame to send. Can be an instance of FrameSource or a tuple of VideoFrame and content, or a batch of frames with topic name. NB: If the source is used to send a stream of batches, the topic name must remain the same for all batches to maintain their order.
send_eos – Whether to send EOS after sending the source.
- Returns:
Result of sending the source.
- send_eos(source_id: str) SourceResult
Send EOS for a source to ZeroMQ socket.
- Parameters:
source_id – Source ID.
- Returns:
Result of sending EOS.
- send_iter(sources: Iterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream], send_eos: bool = True) Iterable[SourceResult]
Send multiple sources to ZeroMQ socket.
- Parameters:
sources – Frame sources to send.
send_eos – Whether to send EOS after sending sources.
- Returns:
Results of sending the sources.
- send_shutdown(zmq_topic: str, auth: str) SourceResult
Send Shutdown message for a source to ZeroMQ socket.
- Parameters:
zmq_topic – ZeroMQ topic name.
auth – Authentication key.
- Returns:
Result of sending Shutdown.
- class savant_rs.py.client.runner.sink.AsyncSinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)
Receives messages from ZeroMQ socket asynchronously.
- class savant_rs.py.client.runner.sink.BaseSinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)
- class savant_rs.py.client.runner.sink.SinkResult(trace_id: str | None, log_provider: LogProvider | None, frame_batch: VideoFrameBatch | None, frame_meta: VideoFrame | None, frame_content: bytes | None, eos: EndOfStream | None)
Result of receiving a message from ZeroMQ socket.
frame_batch, frame_meta+frame_content, and eos are mutually exclusive.
- eos: EndOfStream | None
End of stream.
- frame_batch: VideoFrameBatch | None
Video frame batch.
- frame_meta: VideoFrame | None
Video frame metadata.
- class savant_rs.py.client.runner.sink.SinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)
Receives messages from ZeroMQ socket.
- class savant_rs.py.client.runner.healthcheck.HealthCheck(url: str, interval: float, timeout: float)
Service to check the health of the module.
- Parameters:
url – URL of the health check endpoint.
interval – Interval between health checks in seconds.
timeout – Timeout for waiting the module to be ready in seconds.
- async async_check() str | None
- async async_wait_module_is_ready()
Wait until the module is ready. Async version.
- check() str | None
Check the health of the module.
- wait_module_is_ready()
Wait until the module is ready.
- class savant_rs.py.client.runner.log_result.LogResult(trace_id: str | None, log_provider: savant_rs.py.client.log_provider.LogProvider | None)
- log_provider: LogProvider | None
Log provider for to fetch the logs.
- trace_id: str | None
OpenTelemetry trace ID of the message.
Log providers
- class savant_rs.py.client.log_provider.LogEntry(timestamp: datetime, level: str, target: str, message: str, attributes: Dict[str, Any])
Single log entry.
- attributes: Dict[str, Any]
- level: str
- message: str
- pretty_format() str
Get pretty formatted string of log entry.
- target: str
- timestamp: datetime
- class savant_rs.py.client.log_provider.LogProvider
Interface for log providers.
- class savant_rs.py.client.log_provider.Logs(entries: List[LogEntry])
Collection of log entries for specific trace ID.
- pretty_format() str
Get pretty formatted string of logs.
- pretty_print()
Print pretty formatted logs.
- class savant_rs.py.client.log_provider.jaeger.JaegerLogProvider(endpoint: str, login: str | None = None, password: str | None = None)
Log provider for Jaeger.
- Parameters:
endpoint – Jaeger endpoint URL.
login – Jaeger login.
password – Jaeger password.
Utilities
Savant Client SDK utilities.
- savant_rs.py.client.utils.resize_preserving_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray
Resize image while preserving aspect ratio of the image contents.
- Parameters:
img – Image to resize.
target_size – Target size (img_w, img_h).
- Returns:
Resized image.
Image resizing utilities.
- savant_rs.py.client.utils.img_resize.pad_to_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray
Ensure that the image has the given aspect ratio by adding padding without resizing the image contents.
- Parameters:
img – Image to pad.
target_size – Target size (img_w, img_h).
- Returns:
Padded image.
- savant_rs.py.client.utils.img_resize.resize_preserving_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray
Resize image while preserving aspect ratio of the image contents.
- Parameters:
img – Image to resize.
target_size – Target size (img_w, img_h).
- Returns:
Resized image.