Configuration

The Router service is configured using a JSON configuration file that defines ingress sources, egress destinations, routing rules, and Python handler specifications.

Configuration File Structure

The configuration file has three main sections:

{
    "ingres": [ /* Ingress configurations */ ],
    "egress": [ /* Egress configurations */ ],
    "common": { /* Common service settings */ }
}

Ingress Configuration

The ingres section defines input sources for the Router service. Each ingress configuration specifies a ZeroMQ socket for receiving messages and an optional Python handler for processing.

{
    "ingres": [
        {
            "name": "ingress1",
            "socket": {
                "url": "router+bind:tcp://127.0.0.1:6667",
                "options": {
                    "receive_timeout": {"secs": 1, "nanos": 0},
                    "receive_hwm": 1000,
                    "topic_prefix_spec": "none",
                    "source_cache_size": 1000,
                    "fix_ipc_permissions": 511,
                    "inflight_ops": 100
                }
            },
            "handler": "ingress_handler"
        }
    ]
}

Ingress Parameters:

  • name (string): Unique identifier for the ingress source

  • socket (object): ZeroMQ socket configuration
    • url (string): ZeroMQ socket URL (supports router, rep, sub patterns)

    • options (object): Socket-specific options
      • receive_timeout (duration): Timeout for receive operations

      • receive_hwm (integer): High water mark for incoming messages

      • topic_prefix_spec (string): Topic prefix specification

      • source_cache_size (integer): Size of source identifier cache

      • fix_ipc_permissions (integer): IPC socket permission mask

      • inflight_ops (integer): Maximum concurrent operations

  • handler (string, optional): Name of Python handler function for processing incoming messages

Egress Configuration

The egress section defines output destinations for routed messages. Each egress can have matching conditions, source/topic mappers, and backpressure control.

{
    "egress": [
        {
            "name": "egress1",
            "socket": {
                "url": "dealer+bind:tcp://127.0.0.1:3333"
            },
            "high_watermark": 0.9,
            "matcher": "[label1] & [label2]",
            "source_mapper": "egress_source_handler",
            "topic_mapper": "egress_topic_handler"
        },
        {
            "name": "egress2",
            "socket": {
                "url": "dealer+bind:tcp://127.0.0.1:3334"
            },
            "high_watermark": 0.9,
            "matcher": "[label1] & ([label3] | [label2])"
        }
    ]
}

Egress Parameters:

  • name (string): Unique identifier for the egress destination

  • socket (object): ZeroMQ socket configuration
    • url (string): ZeroMQ socket URL for outgoing messages

  • high_watermark (float, optional): Backpressure threshold (0.0-1.0), default: 0.9

  • matcher (string, optional): Boolean expression for message label matching

  • source_mapper (string, optional): Name of Python handler for source ID transformation

  • topic_mapper (string, optional): Name of Python handler for topic transformation

Matching Expressions

The matcher field supports boolean logic with message labels:

  • [label_name]: Check if label exists

  • &: Logical AND

  • |: Logical OR

  • (): Grouping for precedence

Examples:

  • "[vehicle]": Messages with “vehicle” label

  • "[person] & [alert]": Messages with both “person” and “alert” labels

  • "[vehicle] | [person]": Messages with either “vehicle” or “person” labels

  • "[urgent] & ([person] | [vehicle])": Complex conditions with grouping

Common Configuration

The common section contains service-wide settings including Python handler initialization and performance tuning.

{
    "common": {
        "init": {
            "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
            "module_name": "handlers",
            "function_name": "init",
            "args": {
                "config_param": "value"
            }
        },
        "name_cache": {
            "ttl": {"secs": 10, "nanos": 0},
            "size": 1000
        },
        "idle_sleep": {"secs": 0, "nanos": 1000000}
    }
}

Common Parameters:

  • init (object): Python handler initialization
    • python_root (string): Path to Python modules directory

    • module_name (string): Python module name to import

    • function_name (string): Initialization function name

    • args (object, optional): Arguments passed to initialization function

  • name_cache (object, optional): Label caching configuration
    • ttl (duration): Time-to-live for cache entries, default: 1 second

    • size (integer): Maximum cache size, default: 1000

  • idle_sleep (duration, optional): Sleep duration when no messages, default: 1ms

Environment Variable Substitution

Configuration values support environment variable substitution using the format ${VAR_NAME:-default_value}:

{
    "common": {
        "init": {
            "python_root": "${PYTHON_MODULE_ROOT:-/opt/python}",
            "args": {
                "home_dir": "${HOME}",
                "user_name": "${USER}"
            }
        }
    }
}

Python Handler Development

Python handlers are functions that process messages at different stages of routing. Create a Python module with handler classes:

from savant_rs import register_handler
from savant_rs.utils.serialization import Message

class IngressHandler:
    def __call__(self, message_id: int, ingress_name: str,
                 topic: str, message: Message):
        # Process incoming message
        if topic == "detection":
            message.labels = ["processed", "vehicle"]
        return message

class EgressSourceHandler:
    def __call__(self, message_id: int, egress_name: str,
                 source: str, labels: list[str]):
        # Transform source ID based on routing logic
        return f"transformed_{source}"

class EgressTopicHandler:
    def __call__(self, message_id: int, egress_name: str,
                 topic: str, labels: list[str]):
        # Transform topic for specific egress
        return f"processed_{topic}"

def init(params):
    """Called once during service initialization"""
    register_handler("ingress_handler", IngressHandler())
    register_handler("egress_source_handler", EgressSourceHandler())
    register_handler("egress_topic_handler", EgressTopicHandler())
    return True