savant_rs.py.client

Source/Sink framework for development and QA purposes.

class savant_rs.py.client.FrameSource

Interface for frame sources.

abstractmethod build_frame() Tuple[VideoFrame, bytes]

Build a frame.

Returns:

A tuple of the frame metadata and the frame content.

abstractmethod with_update(update: VideoFrameUpdate) T

Apply an update to a frame.

class savant_rs.py.client.JaegerLogProvider(endpoint: str, login: str | None = None, password: str | None = None)

Log provider for Jaeger.

Parameters:
  • endpoint – Jaeger endpoint URL.

  • login – Jaeger login.

  • password – Jaeger password.

class savant_rs.py.client.JpegSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)

Frame source for JPEG files.

Parameters:
  • source_id – Source ID.

  • file – Path to a JPEG file or a file handle to a JPEG file opened as binary.

  • pts – Frame presentation timestamp.

  • framerate – Framerate (numerator, denominator).

  • updates – List of frame updates.

class savant_rs.py.client.LogProvider

Interface for log providers.

logs(trace_id: str) Logs

Fetch logs for the given trace ID.

class savant_rs.py.client.PngSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)

Frame source for PNG files.

Parameters:
  • source_id – Source ID.

  • file – Path to a PNG file or a file handle to a PNG file opened as binary.

  • pts – Frame presentation timestamp.

  • framerate – Framerate (numerator, denominator).

  • updates – List of frame updates.

class savant_rs.py.client.SinkBuilder(socket: str | None = None, log_provider: LogProvider | None = None, idle_timeout: int | None = None, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, source_id: str | None = None, source_id_prefix: str | None = None)

Builder for Sink.

Usage example:

from savant_rs.py.client import JaegerLogProvider, SinkBuilder

sink = (
    SinkBuilder()
    .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc')
    .with_idle_timeout(60)
    .with_log_provider(JaegerLogProvider('http://localhost:16686'))
    # Note: healthcheck port should be configured in the module.
    .with_module_health_check_url('http://172.17.0.1:8888/status')
    .build()
)
# Iterate over the results received from the module.
for result in sink:
    print(result.frame_meta)
    result.logs().pretty_print()

Usage example (async):

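import asyncio

from savant_rs.py.client import JaegerLogProvider, SinkBuilder

async def main():
    sink = (
        SinkBuilder()
        .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc')
        .with_idle_timeout(60)
        .with_log_provider(JaegerLogProvider('http://localhost:16686'))
        .build_async()
    )
    # "async for" is only valid inside a coroutine.
    async for result in sink:
        print(result.frame_meta)
        result.logs().pretty_print()

asyncio.run(main())

build() SinkRunner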

Build Sink.

build_async() AsyncSinkRunner

Build async Sink.

with_idle_timeout(idle_timeout: int | None) SinkBuilder

Set idle timeout for Sink.

Sink stops trying to receive messages from the ZeroMQ socket once no message has arrived for idle_timeout seconds.
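The idle timeout behaviour described above can be sketched as a plain receive loop. This is only an illustration, not the Sink implementation: receive_until_idle, the receive callable (assumed to return None when a poll yields nothing) and the injectable clock are hypothetical names, and treating idle_timeout=None as "never stop" is an assumption.

```python
import time
from typing import Callable, Iterator, Optional


def receive_until_idle(
    receive: Callable[[], Optional[bytes]],
    idle_timeout: Optional[float],
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[bytes]:
    """Yield messages until none has arrived for idle_timeout seconds."""
    last_message_at = clock()
    while True:
        # Hypothetical poll: returns None when nothing was received.
        message = receive()
        if message is not None:
            # Any received message resets the idle period.
            last_message_at = clock()
            yield message
        elif idle_timeout is not None and clock() - last_message_at >= idle_timeout:
            # Idle for too long: stop receiving.
            return
```

Passing the clock as a parameter keeps the loop testable without real waiting.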

with_log_provider(log_provider: LogProvider) SinkBuilder

Set log provider for Sink.

with_module_health_check_interval(interval: float) SinkBuilder

Set module health check interval for Sink.

Sink polls the module health at the specified interval, in seconds.

with_module_health_check_timeout(timeout: float) SinkBuilder

Set module health check timeout for Sink.

Sink waits up to the specified timeout, in seconds, for the module to become ready.

with_module_health_check_url(url: str) SinkBuilder

Set module health check URL for Sink.

Sink will check the module health before receiving any messages.

with_socket(socket: str) SinkBuilder

Set ZeroMQ socket for Sink.

with_source_id(source_id: str | None) SinkBuilder

Set source id for Sink.

Sink will filter messages by source id.

with_source_id_prefix(source_id_prefix: str | None) SinkBuilder

Set source id prefix for Sink.

Sink will filter messages by source id prefix.

Note: source_id and source_id_prefix are mutually exclusive. If both are set, source_id will be used.
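The precedence rule in the note can be illustrated with a small predicate. This is a sketch under assumptions, not the Sink's filtering code: the function name accepts is hypothetical, and it assumes that source_id is matched exactly while source_id_prefix is matched with a prefix test.

```python
from typing import Optional


def accepts(
    message_source_id: str,
    source_id: Optional[str] = None,
    source_id_prefix: Optional[str] = None,
) -> bool:
    """Decide whether a message passes the source filter."""
    if source_id is not None:
        # source_id wins when both filters are set (assumed exact match).
        return message_source_id == source_id
    if source_id_prefix is not None:
        return message_source_id.startswith(source_id_prefix)
    # No filter configured: accept every message.
    return True
```

For example, accepts('cam-2', source_id='cam-1', source_id_prefix='cam-') is False, because the prefix is ignored once source_id is set.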

class savant_rs.py.client.SourceBuilder(socket: str | None = None, log_provider: LogProvider | None = None, retries: int = 3, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, telemetry_enabled: bool = True)

Builder for Source.

Usage example:

from savant_rs.py.client import JaegerLogProvider, JpegSource, SourceBuilder

source = (
    SourceBuilder()
    .with_log_provider(JaegerLogProvider('http://localhost:16686'))
    .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc')
    # Note: healthcheck port should be configured in the module.
    .with_module_health_check_url('http://172.17.0.1:8888/status')
    .build()
)
# Send a single JPEG frame and print the logs collected for it.
result = source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg'))
result.logs().pretty_print()

Usage example (async):

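import asyncio

from savant_rs.py.client import JaegerLogProvider, JpegSource, SourceBuilder

async def main():
    source = (
        SourceBuilder()
        .with_log_provider(JaegerLogProvider('http://localhost:16686'))
        .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc')
        .build_async()
    )
    # "await" is only valid inside a coroutine.
    result = await source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg'))
    result.logs().pretty_print()

asyncio.run(main())

build() SourceRunner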

Build Source.

build_async() AsyncSourceRunner

Build async Source.

with_log_provider(log_provider) SourceBuilder

Set log provider for Source.

with_module_health_check_interval(interval: float) SourceBuilder

Set module health check interval for Source.

Source polls the module health at the specified interval, in seconds.

with_module_health_check_timeout(timeout: float) SourceBuilder

Set module health check timeout for Source.

Source waits up to the specified timeout, in seconds, for the module to become ready.

with_module_health_check_url(url: str) SourceBuilder

Set module health check URL for Source.

Source will check the module health before sending any messages.

with_retries(retries: int) SourceBuilder

Set number of retries for Source.

Source retries sending a message to the ZeroMQ socket if sending fails.
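The retry behaviour can be pictured as a simple loop around the send call. This is a minimal sketch, not the Source implementation: send_with_retries and the send callable are hypothetical, and it assumes that retries counts additional attempts after the first failure, which the text above does not specify.

```python
from typing import Callable, TypeVar

T = TypeVar('T')


def send_with_retries(send: Callable[[], T], retries: int) -> T:
    """Call send(); on failure, try again up to `retries` more times."""
    attempts_left = retries
    while True:
        try:
            return send()
        except Exception:
            if attempts_left <= 0:
                # Out of retries: propagate the last error to the caller.
                raise
            attempts_left -= 1
```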

with_socket(socket: str) SourceBuilder

Set ZeroMQ socket for Source.

with_telemetry_disabled() SourceBuilder

Disable telemetry for Source.

with_telemetry_enabled() SourceBuilder

Enable telemetry for Source.

Builders

class savant_rs.py.client.builder.source.SourceBuilder(socket: str | None = None, log_provider: LogProvider | None = None, retries: int = 3, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, telemetry_enabled: bool = True)

Builder for Source.

Usage example:

from savant_rs.py.client import JaegerLogProvider, JpegSource, SourceBuilder

source = (
    SourceBuilder()
    .with_log_provider(JaegerLogProvider('http://localhost:16686'))
    .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc')
    # Note: healthcheck port should be configured in the module.
    .with_module_health_check_url('http://172.17.0.1:8888/status')
    .build()
)
# Send a single JPEG frame and print the logs collected for it.
result = source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg'))
result.logs().pretty_print()

Usage example (async):

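import asyncio

from savant_rs.py.client import JaegerLogProvider, JpegSource, SourceBuilder

async def main():
    source = (
        SourceBuilder()
        .with_log_provider(JaegerLogProvider('http://localhost:16686'))
        .with_socket('req+connect:ipc:///tmp/zmq-sockets/input-video.ipc')
        .build_async()
    )
    # "await" is only valid inside a coroutine.
    result = await source(JpegSource('cam-1', 'data/AVG-TownCentre.jpeg'))
    result.logs().pretty_print()

asyncio.run(main())

build() SourceRunner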

Build Source.

build_async() AsyncSourceRunner

Build async Source.

with_log_provider(log_provider) SourceBuilder

Set log provider for Source.

with_module_health_check_interval(interval: float) SourceBuilder

Set module health check interval for Source.

Source polls the module health at the specified interval, in seconds.

with_module_health_check_timeout(timeout: float) SourceBuilder

Set module health check timeout for Source.

Source waits up to the specified timeout, in seconds, for the module to become ready.

with_module_health_check_url(url: str) SourceBuilder

Set module health check URL for Source.

Source will check the module health before sending any messages.

with_retries(retries: int) SourceBuilder

Set number of retries for Source.

Source retries sending a message to the ZeroMQ socket if sending fails.

with_socket(socket: str) SourceBuilder

Set ZeroMQ socket for Source.

with_telemetry_disabled() SourceBuilder

Disable telemetry for Source.

with_telemetry_enabled() SourceBuilder

Enable telemetry for Source.

class savant_rs.py.client.builder.sink.SinkBuilder(socket: str | None = None, log_provider: LogProvider | None = None, idle_timeout: int | None = None, module_health_check_url: str | None = None, module_health_check_timeout: float = 60, module_health_check_interval: float = 5, source_id: str | None = None, source_id_prefix: str | None = None)

Builder for Sink.

Usage example:

from savant_rs.py.client import JaegerLogProvider, SinkBuilder

sink = (
    SinkBuilder()
    .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc')
    .with_idle_timeout(60)
    .with_log_provider(JaegerLogProvider('http://localhost:16686'))
    # Note: healthcheck port should be configured in the module.
    .with_module_health_check_url('http://172.17.0.1:8888/status')
    .build()
)
# Iterate over the results received from the module.
for result in sink:
    print(result.frame_meta)
    result.logs().pretty_print()

Usage example (async):

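import asyncio

from savant_rs.py.client import JaegerLogProvider, SinkBuilder

async def main():
    sink = (
        SinkBuilder()
        .with_socket('rep+connect:ipc:///tmp/zmq-sockets/output-video.ipc')
        .with_idle_timeout(60)
        .with_log_provider(JaegerLogProvider('http://localhost:16686'))
        .build_async()
    )
    # "async for" is only valid inside a coroutine.
    async for result in sink:
        print(result.frame_meta)
        result.logs().pretty_print()

asyncio.run(main())

build() SinkRunner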

Build Sink.

build_async() AsyncSinkRunner

Build async Sink.

with_idle_timeout(idle_timeout: int | None) SinkBuilder

Set idle timeout for Sink.

Sink stops trying to receive messages from the ZeroMQ socket once no message has arrived for idle_timeout seconds.

with_log_provider(log_provider: LogProvider) SinkBuilder

Set log provider for Sink.

with_module_health_check_interval(interval: float) SinkBuilder

Set module health check interval for Sink.

Sink polls the module health at the specified interval, in seconds.

with_module_health_check_timeout(timeout: float) SinkBuilder

Set module health check timeout for Sink.

Sink waits up to the specified timeout, in seconds, for the module to become ready.

with_module_health_check_url(url: str) SinkBuilder

Set module health check URL for Sink.

Sink will check the module health before receiving any messages.

with_socket(socket: str) SinkBuilder

Set ZeroMQ socket for Sink.

with_source_id(source_id: str | None) SinkBuilder

Set source id for Sink.

Sink will filter messages by source id.

with_source_id_prefix(source_id_prefix: str | None) SinkBuilder

Set source id prefix for Sink.

Sink will filter messages by source id prefix.

Note: source_id and source_id_prefix are mutually exclusive. If both are set, source_id will be used.

Frame sources

class savant_rs.py.client.frame_source.FrameSource

Interface for frame sources.

abstractmethod build_frame() Tuple[VideoFrame, bytes]

Build a frame.

Returns:

A tuple of the frame metadata and the frame content.

abstractmethod with_update(update: VideoFrameUpdate) T

Apply an update to a frame.

class savant_rs.py.client.image_source.image_source.ImageSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)

Frame source for image files.

Parameters:
  • source_id – Source ID.

  • file – Path to an image file or a file handle to an image file opened as binary.

  • pts – Frame presentation timestamp.

  • framerate – Framerate (numerator, denominator).

  • updates – List of frame updates.

build_frame() Tuple[VideoFrame, bytes]

Build a frame.

Returns:

A tuple of the frame metadata and the frame content.

property duration: int

Frame duration.
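How a frame duration follows from the framerate can be shown with exact rational arithmetic. The unit of duration is not stated here, so this sketch assumes a time base of 1/1,000,000,000 second; frame_duration and its time_base parameter are hypothetical names used only for illustration.

```python
from fractions import Fraction
from typing import Tuple


def frame_duration(
    framerate: Tuple[int, int],
    time_base: Tuple[int, int] = (1, 10**9),  # assumed: nanosecond ticks
) -> int:
    """Duration of one frame, in time base ticks, rounded down."""
    numerator, denominator = framerate
    # One frame lasts denominator / numerator seconds.
    seconds_per_frame = Fraction(denominator, numerator)
    return int(seconds_per_frame / Fraction(*time_base))
```

With the default framerate of (30, 1) this gives 33333333 ticks per frame under the assumed time base.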

property filepath: str | PathLike | None

Path to the image file.

property framerate: Tuple[int, int]

Framerate.

property pts: int

Frame presentation timestamp.

property source_id: str

Source ID.

property updates: List[VideoFrameUpdate]

List of frame updates.

with_framerate(framerate: Tuple[int, int]) T

Set framerate.

with_pts(pts: int) T

Set frame presentation timestamp.

with_update(update: VideoFrameUpdate) T

Apply an update to a frame.

class savant_rs.py.client.image_source.image_source.JpegSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)

Frame source for JPEG files.

Parameters:
  • source_id – Source ID.

  • file – Path to a JPEG file or a file handle to a JPEG file opened as binary.

  • pts – Frame presentation timestamp.

  • framerate – Framerate (numerator, denominator).

  • updates – List of frame updates.

class savant_rs.py.client.image_source.image_source.PngSource(source_id: str, file: str | PathLike | BinaryIO, pts: int = 0, framerate: Tuple[int, int] = (30, 1), updates: List[VideoFrameUpdate] | None = None)

Frame source for PNG files.

Parameters:
  • source_id – Source ID.

  • file – Path to a PNG file or a file handle to a PNG file opened as binary.

  • pts – Frame presentation timestamp.

  • framerate – Framerate (numerator, denominator).

  • updates – List of frame updates.

Image header parsing utility.

savant_rs.py.client.image_source.img_header_parse.get_image_size_codec(file: str | PathLike | BinaryIO) Tuple[int, int, str]

Get JPEG or PNG image width and height by parsing the file header.

Parameters:

file – Path to an image file or a file handle to an image file opened as binary.

Returns:

Image width, height and codec.
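Reading the size from the file header avoids decoding the whole image. The sketch below shows the idea for PNG only, using the fixed layout of the PNG signature and IHDR chunk; it is not the code of get_image_size_codec, and png_size is a hypothetical helper. JPEG needs a scan over its marker segments and is left out.

```python
import struct
from typing import BinaryIO, Tuple

PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'


def png_size(file: BinaryIO) -> Tuple[int, int]:
    """Return (width, height) read from the PNG IHDR chunk."""
    # Layout: 8-byte signature, 4-byte chunk length, 4-byte chunk type
    # ('IHDR'), then width and height as big-endian 32-bit integers.
    header = file.read(24)
    if len(header) < 24 or header[:8] != PNG_SIGNATURE or header[12:16] != b'IHDR':
        raise ValueError('not a PNG file')
    width, height = struct.unpack('>II', header[16:24])
    return width, height
```

The function accepts any binary file object, for example the result of open(path, 'rb') or an io.BytesIO buffer.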

Runners

class savant_rs.py.client.runner.source.AsyncSourceRunner(socket: str, log_provider: LogProvider | None, retries: int, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, telemetry_enabled: bool, send_hwm: int = 50, receive_timeout: int = 5000)

Sends messages to ZeroMQ socket asynchronously.

async send(source: FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str], send_eos: bool = True) SourceResult

Send source data to ZeroMQ socket.

Parameters:
  • source – Source of the frame to send. Can be an instance of FrameSource or a tuple of VideoFrame and content, or a batch of frames with topic name. NB: If the source is used to send a stream of batches, the topic name must remain the same for all batches to maintain their order.

  • send_eos – Whether to send EOS after sending the source.

Returns:

Result of sending the source.

async send_eos(source_id: str) SourceResult

Send EOS for a source to ZeroMQ socket.

Parameters:

source_id – Source ID.

Returns:

Result of sending EOS.

async send_iter(sources: Iterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream] | AsyncIterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream], send_eos: bool = True) AsyncIterable[SourceResult]

Send multiple sources to ZeroMQ socket.

Parameters:
  • sources – Frame sources to send.

  • send_eos – Whether to send EOS after sending sources.

Returns:

Results of sending the sources.

async send_shutdown(zmq_topic: str, auth: str) SourceResult

Send Shutdown message for a source to ZeroMQ socket.

Parameters:
  • zmq_topic – ZeroMQ topic name.

  • auth – Authentication key.

Returns:

Result of sending Shutdown.

class savant_rs.py.client.runner.source.SourceResult(trace_id: str | None, log_provider: LogProvider | None, source_ids: Set[str], pts: int | None, status: str)

Result of sending a message to ZeroMQ socket.

pts: int | None

PTS of the frame.

source_ids: Set[str]

Source IDs. Single source ID for a frame, multiple source IDs for a batch.

status: str

Status of sending the message.

class savant_rs.py.client.runner.source.SourceRunner(socket: str, log_provider: LogProvider | None, retries: int, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, telemetry_enabled: bool, send_hwm: int = 50, receive_timeout: int = 5000)

Sends messages to ZeroMQ socket.

send(source: FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str], send_eos: bool = True) SourceResult

Send source data to ZeroMQ socket.

Parameters:
  • source – Source of the frame to send. Can be an instance of FrameSource or a tuple of VideoFrame and content, or a batch of frames with topic name. NB: If the source is used to send a stream of batches, the topic name must remain the same for all batches to maintain their order.

  • send_eos – Whether to send EOS after sending the source.

Returns:

Result of sending the source.

send_eos(source_id: str) SourceResult

Send EOS for a source to ZeroMQ socket.

Parameters:

source_id – Source ID.

Returns:

Result of sending EOS.

send_iter(sources: Iterable[FrameSource | Tuple[VideoFrame, bytes] | Tuple[VideoFrameBatch, str] | EndOfStream], send_eos: bool = True) Iterable[SourceResult]

Send multiple sources to ZeroMQ socket.

Parameters:
  • sources – Frame sources to send.

  • send_eos – Whether to send EOS after sending sources.

Returns:

Results of sending the sources.

send_shutdown(zmq_topic: str, auth: str) SourceResult

Send Shutdown message for a source to ZeroMQ socket.

Parameters:
  • zmq_topic – ZeroMQ topic name.

  • auth – Authentication key.

Returns:

Result of sending Shutdown.

class savant_rs.py.client.runner.sink.AsyncSinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)

Receives messages from ZeroMQ socket asynchronously.

class savant_rs.py.client.runner.sink.BaseSinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)

class savant_rs.py.client.runner.sink.SinkResult(trace_id: str | None, log_provider: LogProvider | None, frame_batch: VideoFrameBatch | None, frame_meta: VideoFrame | None, frame_content: bytes | None, eos: EndOfStream | None)

Result of receiving a message from ZeroMQ socket.

frame_batch, frame_meta+frame_content, and eos are mutually exclusive.

eos: EndOfStream | None

End of stream.

frame_batch: VideoFrameBatch | None

Video frame batch.

frame_content: bytes | None

Video frame content.

frame_meta: VideoFrame | None

Video frame metadata.
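Because frame_batch, frame_meta with frame_content, and eos are mutually exclusive, a consumer usually branches on whichever field is set. The sketch below uses a stand-in dataclass, FakeSinkResult, instead of the real SinkResult so that it runs without the library; the classify helper and the order of the checks are illustrative assumptions.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeSinkResult:
    """Stand-in with the same field names as SinkResult."""
    frame_batch: Optional[Any] = None
    frame_meta: Optional[Any] = None
    frame_content: Optional[bytes] = None
    eos: Optional[Any] = None


def classify(result: FakeSinkResult) -> str:
    """Name the kind of payload carried by a result."""
    if result.eos is not None:
        return 'eos'
    if result.frame_batch is not None:
        return 'batch'
    if result.frame_meta is not None:
        # frame_content accompanies frame_meta and may still be None.
        return 'frame'
    return 'empty'
```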

class savant_rs.py.client.runner.sink.SinkRunner(socket: str, log_provider: LogProvider | None, idle_timeout: int | None, module_health_check_url: str | None, module_health_check_timeout: float, module_health_check_interval: float, receive_timeout: int = 1000, receive_hwm: int = 50, source_id: str | None = None, source_id_prefix: str | None = None)

Receives messages from ZeroMQ socket.

class savant_rs.py.client.runner.healthcheck.HealthCheck(url: str, interval: float, timeout: float)

Service to check the health of the module.

Parameters:
  • url – URL of the health check endpoint.

  • interval – Interval between health checks in seconds.

  • timeout – Timeout, in seconds, for waiting until the module is ready.

async async_check() str | None

Check the health of the module. Async version.

async async_wait_module_is_ready()

Wait until the module is ready. Async version.

check() str | None

Check the health of the module.

wait_module_is_ready()

Wait until the module is ready.
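Waiting for readiness amounts to polling the health check at the given interval until the timeout expires. The following is a sketch, not the HealthCheck implementation: wait_until_ready, the injectable check, sleep and clock callables, and the 'running' ready status are all assumptions made for illustration.

```python
import time
from typing import Callable, Optional


def wait_until_ready(
    check: Callable[[], Optional[str]],
    interval: float,
    timeout: float,
    ready_status: str = 'running',  # assumed status value, not confirmed
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll check() every `interval` seconds until it reports readiness."""
    deadline = clock() + timeout
    while True:
        status = check()
        if status == ready_status:
            return status
        if clock() >= deadline:
            raise TimeoutError(f'module not ready after {timeout} seconds')
        # Not ready yet: wait one interval before polling again.
        sleep(interval)
```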

class savant_rs.py.client.runner.log_result.LogResult(trace_id: str | None, log_provider: LogProvider | None)

log_provider: LogProvider | None

Log provider used to fetch the logs.

logs() Logs

Fetch logs from log provider for this result.

trace_id: str | None

OpenTelemetry trace ID of the message.

Log providers

class savant_rs.py.client.log_provider.LogEntry(timestamp: datetime, level: str, target: str, message: str, attributes: Dict[str, Any])

Single log entry.

attributes: Dict[str, Any]

level: str

message: str

pretty_format() str

Get pretty formatted string of log entry.

target: str

timestamp: datetime

class savant_rs.py.client.log_provider.LogProvider

Interface for log providers.

logs(trace_id: str) Logs

Fetch logs for the given trace ID.

class savant_rs.py.client.log_provider.Logs(entries: List[LogEntry])

Collection of log entries for a specific trace ID.

property entries: List[LogEntry]

Get list of log entries.

pretty_format() str

Get pretty formatted string of logs.

pretty_print()

Print pretty formatted logs.

class savant_rs.py.client.log_provider.jaeger.JaegerLogProvider(endpoint: str, login: str | None = None, password: str | None = None)

Log provider for Jaeger.

Parameters:
  • endpoint – Jaeger endpoint URL.

  • login – Jaeger login.

  • password – Jaeger password.

Utilities

Savant Client SDK utilities.

savant_rs.py.client.utils.resize_preserving_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray

Resize image while preserving aspect ratio of the image contents.

Parameters:
  • img – Image to resize.

  • target_size – Target size (img_w, img_h).

Returns:

Resized image.

Image resizing utilities.

savant_rs.py.client.utils.img_resize.pad_to_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray

Ensure that the image has the given aspect ratio by adding padding without resizing the image contents.

Parameters:
  • img – Image to pad.

  • target_size – Target size (img_w, img_h).

Returns:

Padded image.
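The geometry behind padding to an aspect ratio can be worked out without touching pixel data. This sketch only computes sizes and is not the pad_to_aspect implementation: padded_size and padding are hypothetical helpers, and centering the image on the padded canvas is an assumption.

```python
from typing import Tuple


def padded_size(src_size: Tuple[int, int], target_size: Tuple[int, int]) -> Tuple[int, int]:
    """Smallest (width, height) with the target aspect that contains the image."""
    src_w, src_h = src_size
    target_w, target_h = target_size
    # Compare aspect ratios by cross-multiplication to stay in integers.
    if src_w * target_h < src_h * target_w:
        # Image is too narrow: widen the canvas (ceiling division).
        return -(-src_h * target_w // target_h), src_h
    # Image is too wide or already matches: grow the height if needed.
    return src_w, -(-src_w * target_h // target_w)


def padding(src_size: Tuple[int, int], target_size: Tuple[int, int]) -> Tuple[int, int, int, int]:
    """Padding as (left, top, right, bottom), assuming a centered image."""
    (src_w, src_h), (pad_w, pad_h) = src_size, padded_size(src_size, target_size)
    left, top = (pad_w - src_w) // 2, (pad_h - src_h) // 2
    return left, top, pad_w - src_w - left, pad_h - src_h - top
```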

savant_rs.py.client.utils.img_resize.resize_preserving_aspect(img: ndarray, target_size: Tuple[int, int]) ndarray

Resize image while preserving aspect ratio of the image contents.

Parameters:
  • img – Image to resize.

  • target_size – Target size (img_w, img_h).

Returns:

Resized image.
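Preserving the aspect ratio means scaling both dimensions by the same factor, chosen so the contents fit inside the target size. The sketch below computes only the size of the scaled contents; it is not the resize_preserving_aspect implementation, fit_size is a hypothetical helper, and whether the real function also pads the result to the exact target size is not stated here.

```python
from typing import Tuple


def fit_size(src_size: Tuple[int, int], target_size: Tuple[int, int]) -> Tuple[int, int]:
    """Largest (width, height) with the source aspect that fits the target."""
    src_w, src_h = src_size
    target_w, target_h = target_size
    # Compare aspect ratios by cross-multiplication to avoid float error.
    if src_w * target_h >= src_h * target_w:
        # Source is relatively wider: width is the limiting dimension.
        new_w = target_w
        new_h = max(1, round(src_h * target_w / src_w))
    else:
        # Source is relatively taller: height is the limiting dimension.
        new_h = target_h
        new_w = max(1, round(src_w * target_h / src_h))
    return new_w, new_h
```

A 1920x1080 frame fitted into 640x640 keeps its 16:9 shape and becomes 640x360.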