Configuration
The Meta Merge service is configured using a JSON configuration file that defines ingress sources, egress destination, Python handler initialization, merge callbacks, and queue settings.
Configuration File Structure
The configuration file has three main sections:
{
  "ingress": [ /* Ingress configurations */ ],
  "egress": { /* Egress configuration */ },
  "common": { /* Common service settings */ }
}
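As a rough pre-flight check, the top-level layout can be verified before the file is handed to the service. This is a minimal sketch, not part of Meta Merge: the function name `check_top_level_sections` is hypothetical, and it assumes all three sections must be present. The `/* ... */` placeholders in the skeleton are not valid JSON and have to be replaced with real values first.

```python
import json

REQUIRED_SECTIONS = ("ingress", "egress", "common")


def check_top_level_sections(text: str) -> dict:
    """Parse a configuration document and verify its top-level layout."""
    config = json.loads(text)
    missing = [name for name in REQUIRED_SECTIONS if name not in config]
    if missing:
        raise ValueError(f"missing sections: {', '.join(missing)}")
    # "ingress" is a list of sources; "egress" and "common" are objects.
    if not isinstance(config["ingress"], list):
        raise ValueError("'ingress' must be a JSON array")
    for name in ("egress", "common"):
        if not isinstance(config[name], dict):
            raise ValueError(f"'{name}' must be a JSON object")
    return config


sample = '{"ingress": [], "egress": {}, "common": {}}'
config = check_top_level_sections(sample)
print(sorted(config))
```

The service performs its own validation; a check like this only catches structural mistakes earlier, for example in a deployment script.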
Ingress Configuration
The ingress section defines input sources for the Meta Merge service. Each ingress configuration specifies a ZeroMQ socket for receiving messages, an optional Python handler, and an EOS (end-of-stream) policy.
{
  "ingress": [
    {
      "name": "ingress1",
      "socket": {
        "url": "router+bind:tcp://127.0.0.1:6667",
        "options": {
          "receive_timeout": {"secs": 1, "nanos": 0},
          "receive_hwm": 1000,
          "topic_prefix_spec": "none",
          "source_cache_size": 1000,
          "fix_ipc_permissions": 511,
          "inflight_ops": 100
        }
      },
      "eos_policy": "allow"
    },
    {
      "name": "ingress2",
      "socket": {
        "url": "router+bind:tcp://127.0.0.1:6668"
      },
      "eos_policy": "deny"
    }
  ]
}
Ingress Parameters:
- name (string): Unique identifier for the ingress source
- socket (object): ZeroMQ socket configuration
  - url (string): ZeroMQ socket URL (supports the router, rep, and sub patterns)
  - options (object, optional): Socket-specific options
    - receive_timeout (duration): Timeout for receive operations
    - receive_hwm (integer): High water mark for incoming messages
    - topic_prefix_spec (string): Topic prefix specification
    - source_cache_size (integer): Size of source identifier cache
    - fix_ipc_permissions (integer): IPC socket permission mask (the example value 511 is 777 in octal)
    - inflight_ops (integer): Maximum concurrent operations
- handler (string, optional): Name of Python handler function for processing incoming messages
- eos_policy (string, optional): Controls whether EOS messages from this ingress are forwarded. Values: "allow" or "deny"
Important
Set eos_policy to "allow" on at most one ingress. If more than one ingress has eos_policy set to "allow", the service rejects the configuration, because downstream services could otherwise receive multiple EOS messages for the same source. Set it to "deny" on all other ingress streams that carry copies of the same source.
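The rule above can be checked ahead of time with a few lines of Python. This is a sketch under stated assumptions, not the service's own validation: the helper names `eos_forwarders` and `check_eos_policy` are hypothetical, and entries that omit eos_policy are simply not counted, because the default value is not stated here.

```python
def eos_forwarders(config: dict) -> list[str]:
    """Return the names of ingress entries whose eos_policy is "allow"."""
    return [
        ingress["name"]
        for ingress in config.get("ingress", [])
        if ingress.get("eos_policy") == "allow"
    ]


def check_eos_policy(config: dict) -> None:
    """Raise ValueError if more than one ingress forwards EOS messages."""
    forwarders = eos_forwarders(config)
    if len(forwarders) > 1:
        raise ValueError(
            'eos_policy "allow" is set on several ingresses: '
            + ", ".join(forwarders)
        )


config = {
    "ingress": [
        {"name": "ingress1", "eos_policy": "allow"},
        {"name": "ingress2", "eos_policy": "deny"},
    ]
}
check_eos_policy(config)  # passes: only ingress1 forwards EOS
```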
Egress Configuration
The egress section defines the single output destination for merged frames. Unlike the Router service, Meta Merge has a single egress.
{
  "egress": {
    "socket": {
      "url": "dealer+bind:tcp://127.0.0.1:3333"
    }
  }
}
Egress Parameters:
- socket (object): ZeroMQ socket configuration
  - url (string): ZeroMQ socket URL for outgoing messages
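The socket URLs in these examples, such as router+bind:tcp://127.0.0.1:6667 and dealer+bind:tcp://127.0.0.1:3333, appear to follow the shape `<socket type>+<mode>:<endpoint>`. That shape is inferred from the examples only and is not a format specification. The sketch below splits such a string into its parts; `SocketUrl` and `split_socket_url` are hypothetical names used for illustration.

```python
from typing import NamedTuple


class SocketUrl(NamedTuple):
    socket_type: str  # e.g. "router" or "dealer"
    mode: str         # e.g. "bind"
    endpoint: str     # e.g. "tcp://127.0.0.1:6667"


def split_socket_url(url: str) -> SocketUrl:
    """Split '<socket type>+<mode>:<endpoint>' into its three parts."""
    # Only the first ':' separates the prefix from the endpoint;
    # the endpoint itself contains further colons.
    prefix, colon, endpoint = url.partition(":")
    socket_type, plus, mode = prefix.partition("+")
    if not (colon and plus and socket_type and mode and endpoint):
        raise ValueError(f"unexpected socket URL shape: {url!r}")
    return SocketUrl(socket_type, mode, endpoint)


print(split_socket_url("router+bind:tcp://127.0.0.1:6667"))
print(split_socket_url("dealer+bind:tcp://127.0.0.1:3333"))
```

A helper like this is handy for listing which ports a configuration binds, for instance when writing firewall or container port mappings.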
Common Configuration
The common section contains service-wide settings including Python handler initialization, callbacks configuration, queue settings, and idle behavior.
{
  "common": {
    "init": {
      "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
      "module_name": "module",
      "function_name": "init",
      "args": [
        {
          "params": {
            "home_dir": "${HOME}",
            "user_name": "${USER}"
          }
        }
      ]
    },
    "callbacks": {
      "on_merge": "merge_handler",
      "on_head_expire": "head_expired_handler",
      "on_head_ready": "head_ready_handler",
      "on_late_arrival": "late_arrival_handler",
      "on_unsupported_message": "unsupported_message_handler",
      "on_send": "send_handler"
    },
    "idle_sleep": {"secs": 0, "nanos": 1000},
    "queue": {
      "max_duration": {"secs": 5, "nanos": 0}
    }
  }
}
Init Parameters
- init (object): Python handler initialization
  - python_root (string): Path to Python modules directory
  - module_name (string): Python module name to import
  - function_name (string): Initialization function name
  - args (array, optional): Arguments passed to the initialization function
Callbacks Parameters
- callbacks (object): Python callback handler names
  - on_merge (string, required): Handler called when a frame copy arrives for merging
  - on_head_expire (string, required): Handler called when the head frame expires
  - on_head_ready (string, required): Handler called when the head frame is marked ready
  - on_late_arrival (string, required): Handler called when a frame arrives after the queue head has advanced
  - on_unsupported_message (string, optional): Handler called for non-video, non-EOS messages
  - on_send (string, optional): Handler called before sending a message; it can override the topic
Queue Parameters
- queue (object, optional): Merge queue configuration
  - max_duration (duration): Maximum time a frame can wait in the queue before it expires. Default: 5 seconds
Other Parameters
- idle_sleep (duration, optional): Sleep duration when no messages are available. Default: 1ms
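Duration values are written as objects with "secs" and "nanos" fields. The sketch below converts such an object to a Python timedelta to make the units easier to check; `duration_to_timedelta` is a hypothetical helper, not part of the service. Note that 1 ms corresponds to "nanos": 1000000, while "nanos": 1000 is one microsecond.

```python
from datetime import timedelta


def duration_to_timedelta(value: dict) -> timedelta:
    """Convert a {"secs": ..., "nanos": ...} object to a timedelta."""
    # timedelta has microsecond resolution, so nanoseconds are divided by 1000;
    # sub-microsecond remainders are rounded.
    return timedelta(seconds=value["secs"], microseconds=value["nanos"] / 1000)


print(duration_to_timedelta({"secs": 5, "nanos": 0}))          # 0:00:05
print(duration_to_timedelta({"secs": 0, "nanos": 1_000_000}))  # 0:00:00.001000
print(duration_to_timedelta({"secs": 0, "nanos": 1000}))       # 0:00:00.000001
```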
Environment Variable Substitution
Configuration values support environment variable substitution using the format ${VAR_NAME}, or ${VAR_NAME:-default_value} to supply a fallback value:
{
  "common": {
    "init": {
      "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
      "args": [
        {
          "params": {
            "home_dir": "${HOME}",
            "user_name": "${USER}"
          }
        }
      ]
    }
  }
}
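To preview what a configuration value expands to, the substitution can be imitated in Python. This is only an illustration of shell-like semantics and not the service's implementation: it assumes the default applies when the variable is unset or empty, and it raises an error for an unset variable without a default, which may differ from what Meta Merge does. The name `substitute_env` is hypothetical.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(text: str, env=None) -> str:
    """Expand ${NAME} and ${NAME:-default} placeholders in a string."""
    env = os.environ if env is None else env

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if value:  # set and non-empty
            return value
        if default is not None:
            return default
        if value is None:
            raise KeyError(f"environment variable {name} is not set")
        return value  # set but empty, no default given

    return _PLACEHOLDER.sub(replace, text)


print(substitute_env("${PYTHON_MODULE_ROOT:-/opt/python}", {}))
print(substitute_env("${HOME}", {"HOME": "/home/user"}))
```

Passing an explicit dictionary instead of the real environment keeps the preview reproducible.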
Python Handler Development
Python handlers are callable objects registered during initialization. Below is a complete example of a Python handler module for Meta Merge:
from typing import Any, Optional

from savant_rs import register_handler, version
from savant_rs.logging import log, LogLevel
from savant_rs.primitives import VideoFrame
from savant_rs.utils.serialization import Message


class EgressItem:
    """Type hint interface for the EgressItem passed to callbacks.

    Attributes:
        video_frame: The video frame being merged.
        state: A dictionary for accumulating merge state across callbacks.
        data: List of binary data payloads.
        labels: List of string labels.
    """

    @property
    def video_frame(self) -> VideoFrame: ...

    @video_frame.setter
    def video_frame(self, video_frame: VideoFrame): ...

    @property
    def state(self) -> dict[str, Any]: ...

    @state.setter
    def state(self, state: dict[str, Any]): ...

    @property
    def data(self) -> list[bytes]: ...

    @data.setter
    def data(self, data: list[bytes]): ...

    @property
    def labels(self) -> list[str]: ...

    @labels.setter
    def labels(self, labels: list[str]): ...


class MergeHandler:
    def __call__(
        self,
        ingress_name: str,
        topic: str,
        current_state: EgressItem,
        incoming_state: Optional[EgressItem],
    ) -> bool:
        """Called when a frame copy arrives for merging.

        :param ingress_name: Name of the ingress that received the message.
        :param topic: ZMQ topic of the message.
        :param current_state: Current state of the egress item in the queue.
        :param incoming_state: Incoming copy of the frame, or None for the
            first arrival (which automatically becomes current_state).
        :return: True if the merge is complete and the frame should be sent.
        """
        if incoming_state is not None:
            # Merge attributes from the incoming frame
            pass
        return False


class HeadExpiredHandler:
    def __call__(self, state: EgressItem) -> Optional[Message]:
        """Called when the head of the queue has expired (exceeded max_duration).

        :param state: The expired egress item.
        :return: A Message to send downstream, or None to drop the frame.
        """
        return None


class HeadReadyHandler:
    def __call__(self, state: EgressItem) -> Optional[Message]:
        """Called when the head of the queue is marked as ready.

        :param state: The ready egress item.
        :return: A Message to send downstream, or None to drop the frame.
        """
        return None


class LateArrivalHandler:
    def __call__(self, state: EgressItem):
        """Called when a frame arrives after the queue head has advanced.

        :param state: The late-arriving egress item.
        """
        pass


class UnsupportedMessageHandler:
    def __call__(
        self,
        ingress_name: str,
        topic: str,
        message: Message,
        data: list[bytes],
    ):
        """Called for messages that are neither video frames nor EOS.

        :param ingress_name: Name of the ingress.
        :param topic: ZMQ topic of the message.
        :param message: The unsupported message object.
        :param data: Data payloads.
        """
        pass


class SendHandler:
    def __call__(
        self,
        message: Message,
        message_state: Optional[dict[Any, Any]],
        data: list[bytes],
        labels: list[str],
    ) -> Optional[str]:
        """Called before sending a message to the egress.

        :param message: The message to send.
        :param message_state: The state dictionary accumulated during merging.
        :param data: Data payloads.
        :param labels: Labels.
        :return: Optional topic string to override the default (source ID),
            or None to use the default.
        """
        return None


def init(params: Any):
    """Initialization function called once at service startup.

    Register all callback handlers here.
    """
    register_handler("merge_handler", MergeHandler())
    register_handler("head_expired_handler", HeadExpiredHandler())
    register_handler("head_ready_handler", HeadReadyHandler())
    register_handler("late_arrival_handler", LateArrivalHandler())
    register_handler("unsupported_message_handler", UnsupportedMessageHandler())
    register_handler("send_handler", SendHandler())
    return True
The EgressItem Object
The EgressItem object passed to callbacks provides the following properties:
- video_frame (VideoFrame): The video frame being merged. Supports getting and setting.
- state (dict): A dictionary for accumulating merge state across callbacks. Persist counters, flags, or any other state between on_merge calls here.
- data (list[bytes]): Binary data payloads associated with the frame.
- labels (list[str]): String labels associated with the frame.
All properties support both getting and setting, so handlers can modify the frame, state, data, and labels in-place.
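As an illustration of accumulating state between on_merge calls, the sketch below counts how many copies of a frame have arrived and reports the merge as complete once an expected number is reached. It is a minimal example under stated assumptions: `FakeEgressItem` is a hypothetical stand-in so the snippet runs without savant_rs, `CountingMergeHandler` and the `copies_seen` and `sources` keys are invented for this example, and the handler assigns the dictionary back through the setter rather than relying on in-place mutation of the returned object.

```python
from typing import Any, Optional


class FakeEgressItem:
    """Hypothetical stand-in that exposes only the `state` property."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}


class CountingMergeHandler:
    """Reports the merge as complete after `expected_copies` arrivals."""

    def __init__(self, expected_copies: int) -> None:
        self.expected_copies = expected_copies

    def __call__(
        self,
        ingress_name: str,
        topic: str,
        current_state: Any,
        incoming_state: Optional[Any],
    ) -> bool:
        # Copy the state, update it, and write it back through the setter.
        state = dict(current_state.state)
        state["copies_seen"] = state.get("copies_seen", 0) + 1
        state["sources"] = list(state.get("sources", [])) + [ingress_name]
        current_state.state = state
        # incoming_state is None on the first arrival; a real handler would
        # merge attributes from incoming_state.video_frame here otherwise.
        return state["copies_seen"] >= self.expected_copies


handler = CountingMergeHandler(expected_copies=2)
item = FakeEgressItem()
print(handler("detection_pipeline", "cam-1", item, None))
print(handler("classification_pipeline", "cam-1", item, FakeEgressItem()))
print(item.state)
```

In a real module the handler instance would be registered in init with register_handler under the name referenced by on_merge.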
Complete Configuration Example
Below is a complete configuration example with two ingress streams and all callback handlers:
{
  "ingress": [
    {
      "name": "detection_pipeline",
      "socket": {
        "url": "router+bind:tcp://0.0.0.0:6667",
        "options": {
          "receive_timeout": {"secs": 1, "nanos": 0},
          "receive_hwm": 1000,
          "topic_prefix_spec": "none",
          "source_cache_size": 1000,
          "inflight_ops": 100
        }
      },
      "eos_policy": "allow"
    },
    {
      "name": "classification_pipeline",
      "socket": {
        "url": "router+bind:tcp://0.0.0.0:6668",
        "options": {
          "receive_timeout": {"secs": 1, "nanos": 0},
          "receive_hwm": 1000,
          "topic_prefix_spec": "none",
          "source_cache_size": 1000,
          "inflight_ops": 100
        }
      },
      "eos_policy": "deny"
    }
  ],
  "egress": {
    "socket": {
      "url": "dealer+bind:tcp://0.0.0.0:3333"
    }
  },
  "common": {
    "init": {
      "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
      "module_name": "module",
      "function_name": "init",
      "args": null
    },
    "callbacks": {
      "on_merge": "merge_handler",
      "on_head_expire": "head_expired_handler",
      "on_head_ready": "head_ready_handler",
      "on_late_arrival": "late_arrival_handler",
      "on_unsupported_message": "unsupported_message_handler",
      "on_send": "send_handler"
    },
    "idle_sleep": {"secs": 0, "nanos": 1000000},
    "queue": {
      "max_duration": {"secs": 5, "nanos": 0}
    }
  }
}
This configuration:
- Accepts frames from two pipelines: detection_pipeline on port 6667 and classification_pipeline on port 6668.
- Only the detection_pipeline ingress forwards EOS messages (eos_policy: "allow"), while the classification_pipeline ingress suppresses them (eos_policy: "deny").
- Merged frames are sent to the egress on port 3333.
- Python handlers are loaded from the /opt/python directory (configurable via the PYTHON_MODULE_ROOT environment variable).
- Frames that are not marked ready within 5 seconds are expired.
- The service sleeps for 1ms when idle (no incoming messages).