Configuration

The Meta Merge service is configured using a JSON configuration file that defines ingress sources, egress destination, Python handler initialization, merge callbacks, and queue settings.

Configuration File Structure

The configuration file has three main sections:

{
    "ingress": [ /* Ingress configurations */ ],
    "egress": { /* Egress configuration */ },
    "common": { /* Common service settings */ }
}

Ingress Configuration

The ingress section defines input sources for the Meta Merge service. Each ingress configuration specifies a ZeroMQ socket for receiving messages, an optional Python handler, and an EOS (end-of-stream) policy.

{
    "ingress": [
        {
            "name": "ingress1",
            "socket": {
                "url": "router+bind:tcp://127.0.0.1:6667",
                "options": {
                    "receive_timeout": {"secs": 1, "nanos": 0},
                    "receive_hwm": 1000,
                    "topic_prefix_spec": "none",
                    "source_cache_size": 1000,
                    "fix_ipc_permissions": 511,
                    "inflight_ops": 100
                }
            },
            "eos_policy": "allow"
        },
        {
            "name": "ingress2",
            "socket": {
                "url": "router+bind:tcp://127.0.0.1:6668"
            },
            "eos_policy": "deny"
        }
    ]
}

Ingress Parameters:

  • name (string): Unique identifier for the ingress source

  • socket (object): ZeroMQ socket configuration
    • url (string): ZeroMQ socket URL (supports router, rep, sub patterns)

    • options (object, optional): Socket-specific options
      • receive_timeout (duration): Timeout for receive operations

      • receive_hwm (integer): High water mark for incoming messages

      • topic_prefix_spec (string): Topic prefix specification

      • source_cache_size (integer): Size of source identifier cache

      • fix_ipc_permissions (integer): IPC socket permission mask

      • inflight_ops (integer): Maximum concurrent operations

  • handler (string, optional): Name of Python handler function for processing incoming messages

  • eos_policy (string, optional): Controls whether EOS messages from this ingress are forwarded. Values: "allow" or "deny"

Important

The eos_policy with value "allow" must be set on at most one ingress. If multiple ingress streams have eos_policy set to "allow", the service will reject the configuration because it could lead to multiple EOS messages being delivered to downstream services. Set it to "deny" on all other ingress streams that carry copies of the same source.

Egress Configuration

The egress section defines the single output destination for merged frames. Unlike the Router service, Meta Merge has a single egress.

{
    "egress": {
        "socket": {
            "url": "dealer+bind:tcp://127.0.0.1:3333"
        }
    }
}

Egress Parameters:

  • socket (object): ZeroMQ socket configuration
    • url (string): ZeroMQ socket URL for outgoing messages

Common Configuration

The common section contains service-wide settings including Python handler initialization, callbacks configuration, queue settings, and idle behavior.

{
    "common": {
        "init": {
            "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
            "module_name": "module",
            "function_name": "init",
            "args": [
                {
                    "params": {
                        "home_dir": "${HOME}",
                        "user_name": "${USER}"
                    }
                }
            ]
        },
        "callbacks": {
            "on_merge": "merge_handler",
            "on_head_expire": "head_expired_handler",
            "on_head_ready": "head_ready_handler",
            "on_late_arrival": "late_arrival_handler",
            "on_unsupported_message": "unsupported_message_handler",
            "on_send": "send_handler"
        },
        "idle_sleep": {"secs": 0, "nanos": 1000},
        "queue": {
            "max_duration": {"secs": 5, "nanos": 0}
        }
    }
}

Init Parameters

  • init (object): Python handler initialization
    • python_root (string): Path to Python modules directory

    • module_name (string): Python module name to import

    • function_name (string): Initialization function name

    • args (object, optional): Arguments passed to initialization function

Callbacks Parameters

  • callbacks (object): Python callback handler names
    • on_merge (string, required): Handler called when a frame copy arrives for merging

    • on_head_expire (string, required): Handler called when the head frame expires

    • on_head_ready (string, required): Handler called when the head frame is marked ready

    • on_late_arrival (string, required): Handler called when a frame arrives after the queue head

    • on_unsupported_message (string, optional): Handler called for non-video, non-EOS messages

    • on_send (string, optional): Handler called before sending a message, can override the topic

Queue Parameters

  • queue (object, optional): Merge queue configuration
    • max_duration (duration): Maximum time a frame can wait in the queue before being expired. Default: 5 seconds

Other Parameters

  • idle_sleep (duration, optional): Sleep duration when no messages are available. Default: 1ms

Environment Variable Substitution

Configuration values support environment variable substitution using the format ${VAR_NAME:-default_value}:

{
    "common": {
        "init": {
            "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
            "args": [
                {
                    "params": {
                        "home_dir": "${HOME}",
                        "user_name": "${USER}"
                    }
                }
            ]
        }
    }
}

Python Handler Development

Python handlers are callable objects registered during initialization. Below is a complete example of a Python handler module for Meta Merge:

from typing import Any, Optional
from savant_rs import register_handler, version
from savant_rs.logging import log, LogLevel
from savant_rs.primitives import VideoFrame
from savant_rs.utils.serialization import Message


class EgressItem:
    """Type hint interface for the EgressItem passed to callbacks.

    Attributes:
        video_frame: The video frame being merged.
        state: A dictionary for accumulating merge state across callbacks.
        data: List of binary data payloads.
        labels: List of string labels.
    """
    @property
    def video_frame(self) -> VideoFrame: ...
    @video_frame.setter
    def video_frame(self, video_frame: VideoFrame): ...
    @property
    def state(self) -> dict[str, Any]: ...
    @state.setter
    def state(self, state: dict[str, Any]): ...
    @property
    def data(self) -> list[bytes]: ...
    @data.setter
    def data(self, data: list[bytes]): ...
    @property
    def labels(self) -> list[str]: ...
    @labels.setter
    def labels(self, labels: list[str]): ...


class MergeHandler:
    def __call__(
        self,
        ingress_name: str,
        topic: str,
        current_state: EgressItem,
        incoming_state: Optional[EgressItem],
    ) -> bool:
        """Called when a frame copy arrives for merging.

        :param ingress_name: Name of the ingress that received the message.
        :param topic: ZMQ topic of the message.
        :param current_state: Current state of the egress item in the queue.
        :param incoming_state: Incoming copy of the frame, or None for the
            first arrival (which automatically becomes current_state).
        :return: True if the merge is complete and the frame should be sent.
        """
        if incoming_state is not None:
            # Merge attributes from the incoming frame
            pass
        return False


class HeadExpiredHandler:
    def __call__(self, state: EgressItem) -> Optional[Message]:
        """Called when the head of the queue has expired (exceeded max_duration).

        :param state: The expired egress item.
        :return: A Message to send downstream, or None to drop the frame.
        """
        return None


class HeadReadyHandler:
    def __call__(self, state: EgressItem) -> Optional[Message]:
        """Called when the head of the queue is marked as ready.

        :param state: The ready egress item.
        :return: A Message to send downstream, or None to drop the frame.
        """
        return None


class LateArrivalHandler:
    def __call__(self, state: EgressItem):
        """Called when a frame arrives after the queue head has advanced.

        :param state: The late-arriving egress item.
        """
        pass


class UnsupportedMessageHandler:
    def __call__(
        self,
        ingress_name: str,
        topic: str,
        message: Message,
        data: list[bytes],
    ):
        """Called for messages that are neither video frames nor EOS.

        :param ingress_name: Name of the ingress.
        :param topic: ZMQ topic of the message.
        :param message: The unsupported message object.
        :param data: Data payloads.
        """
        pass


class SendHandler:
    def __call__(
        self,
        message: Message,
        message_state: Optional[dict[Any, Any]],
        data: list[bytes],
        labels: list[str],
    ) -> Optional[str]:
        """Called before sending a message to the egress.

        :param message: The message to send.
        :param message_state: The state dictionary accumulated during merging.
        :param data: Data payloads.
        :param labels: Labels.
        :return: Optional topic string to override the default (source ID),
            or None to use the default.
        """
        return None


def init(params: Any):
    """Initialization function called once at service startup.

    Register all callback handlers here.
    """
    register_handler("merge_handler", MergeHandler())
    register_handler("head_expired_handler", HeadExpiredHandler())
    register_handler("head_ready_handler", HeadReadyHandler())
    register_handler("late_arrival_handler", LateArrivalHandler())
    register_handler("unsupported_message_handler", UnsupportedMessageHandler())
    register_handler("send_handler", SendHandler())
    return True

The EgressItem Object

The EgressItem object passed to callbacks provides the following properties:

  • video_frame (VideoFrame): The video frame being merged. Supports getting and setting.

  • state (dict): A dictionary for accumulating merge state across callbacks. Persist counters, flags, or any other state between on_merge calls here.

  • data (list[bytes]): Binary data payloads associated with the frame.

  • labels (list[str]): String labels associated with the frame.

All properties support both getting and setting, so handlers can modify the frame, state, data, and labels in-place.

Complete Configuration Example

Below is a complete configuration example with two ingress streams and all callback handlers:

{
    "ingress": [
        {
            "name": "detection_pipeline",
            "socket": {
                "url": "router+bind:tcp://0.0.0.0:6667",
                "options": {
                    "receive_timeout": {"secs": 1, "nanos": 0},
                    "receive_hwm": 1000,
                    "topic_prefix_spec": "none",
                    "source_cache_size": 1000,
                    "inflight_ops": 100
                }
            },
            "eos_policy": "allow"
        },
        {
            "name": "classification_pipeline",
            "socket": {
                "url": "router+bind:tcp://0.0.0.0:6668",
                "options": {
                    "receive_timeout": {"secs": 1, "nanos": 0},
                    "receive_hwm": 1000,
                    "topic_prefix_spec": "none",
                    "source_cache_size": 1000,
                    "inflight_ops": 100
                }
            },
            "eos_policy": "deny"
        }
    ],
    "egress": {
        "socket": {
            "url": "dealer+bind:tcp://0.0.0.0:3333"
        }
    },
    "common": {
        "init": {
            "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
            "module_name": "module",
            "function_name": "init",
            "args": null
        },
        "callbacks": {
            "on_merge": "merge_handler",
            "on_head_expire": "head_expired_handler",
            "on_head_ready": "head_ready_handler",
            "on_late_arrival": "late_arrival_handler",
            "on_unsupported_message": "unsupported_message_handler",
            "on_send": "send_handler"
        },
        "idle_sleep": {"secs": 0, "nanos": 1000000},
        "queue": {
            "max_duration": {"secs": 5, "nanos": 0}
        }
    }
}

This configuration:

  1. Accepts frames from two pipelines: detection_pipeline on port 6667 and classification_pipeline on port 6668.

  2. Only the detection_pipeline ingress forwards EOS messages (eos_policy: "allow"), while the classification_pipeline suppresses them (eos_policy: "deny").

  3. Merged frames are sent to the egress on port 3333.

  4. Python handlers are loaded from the /opt/python directory (configurable via the PYTHON_MODULE_ROOT environment variable).

  5. Frames that are not marked ready within 5 seconds are expired.

  6. The service sleeps for 1ms when idle (no incoming messages).