Configuration
The Buffer NG service reads its settings from a JSON configuration file. The configuration is divided into three sections: ingress, egress, and common.
Configuration Structure
The configuration file has the following structure:
{
  "ingress": {
    "socket": {
      "url": "tcp://source:5555",
      "options": { ... }
    }
  },
  "egress": {
    "socket": {
      "url": "tcp://sink:5556",
      "options": { ... }
    }
  },
  "common": {
    "message_handler_init": { ... },
    "telemetry": { ... },
    "buffer": { ... },
    "idle_sleep": { ... }
  }
}
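As a quick sanity check on this layout, the following minimal Python sketch loads a configuration document and confirms that the three top-level sections are present. The helper name `load_config` is hypothetical, and treating all three sections as required is an assumption made for illustration; the service's own validation may differ.

```python
import json

# Top-level sections named in the structure above.
TOP_LEVEL_SECTIONS = ("ingress", "egress", "common")


def load_config(text: str) -> dict:
    """Parse a configuration document and check its top-level sections.

    Assumption: all three sections are treated as required here;
    the service itself may apply different rules.
    """
    config = json.loads(text)
    missing = [name for name in TOP_LEVEL_SECTIONS if name not in config]
    if missing:
        raise ValueError(f"missing sections: {', '.join(missing)}")
    return config


example = """
{
  "ingress": {"socket": {"url": "tcp://source:5555"}},
  "egress": {"socket": {"url": "tcp://sink:5556"}},
  "common": {}
}
"""
config = load_config(example)
print(config["ingress"]["socket"]["url"])
```

A check like this only covers the outer shape; it does not validate individual options.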
Ingress Configuration
The ingress section defines how the service receives messages from upstream sources.
Socket Configuration
"ingress": {
  "socket": {
    "url": "router+bind:tcp://127.0.0.1:6666",
    "options": {
      "receive_timeout": {
        "secs": 1,
        "nanos": 0
      },
      "receive_hwm": 1000,
      "topic_prefix_spec": "none",
      "source_cache_size": 1000,
      "fix_ipc_permissions": 511,
      "inflight_ops": 100
    }
  }
}
Parameters:
url
: ZeroMQ socket URL for receiving messages. Example patterns:
- router+bind:tcp://host:port - Router socket binding to TCP.
- dealer+connect:tcp://host:port - Dealer socket connecting to TCP.
- sub+bind:tcp://host:port - Subscriber socket binding to TCP.
- sub+connect:ipc:///path/to/socket - Subscriber socket connecting to IPC.
receive_timeout
: Timeout for receiving messages (in seconds and nanoseconds).
receive_hwm
: High water mark for receiving messages (queue size limit).
topic_prefix_spec
: Topic prefix specification ("none" for no prefix).
source_cache_size
: Size of the source cache for connection management.
fix_ipc_permissions
: IPC socket permissions (Unix-style mode bits, written in decimal because JSON has no octal literals; the example value 511 is octal 777).
inflight_ops
: Number of in-flight operations for connection management.
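Because fix_ipc_permissions is described as a Unix-style permission value while the JSON example holds the number 511, it can help to convert between the two notations. The sketch below uses only Python built-ins; the helper names `mode_to_octal` and `octal_to_mode` are hypothetical.

```python
def mode_to_octal(mode: int) -> str:
    """Render a decimal permission value in octal notation."""
    return oct(mode)


def octal_to_mode(octal_digits: str) -> int:
    """Convert octal digits such as "660" to the decimal number JSON needs."""
    return int(octal_digits, 8)


print(mode_to_octal(511))    # 0o777
print(octal_to_mode("660"))  # 432
```

For instance, restricting the socket to owner and group read/write (octal 660) would mean writing 432 in the configuration file, assuming the service accepts any valid mode value here.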
Egress Configuration
The egress section defines how the service sends messages to downstream destinations.
Socket Configuration
"egress": {
  "socket": {
    "url": "dealer+bind:tcp://127.0.0.1:6667",
    "options": {
      "send_timeout": {
        "secs": 1,
        "nanos": 0
      },
      "send_retries": 3,
      "receive_timeout": {
        "secs": 1,
        "nanos": 0
      },
      "receive_retries": 3,
      "send_hwm": 1000,
      "receive_hwm": 1000,
      "inflight_ops": 100
    }
  }
}
Parameters:
url
: ZeroMQ socket URL for sending messages. Example patterns:
- dealer+bind:tcp://host:port - Dealer socket binding to TCP.
- push+connect:tcp://host:port - Push socket connecting to TCP.
- pub+bind:tcp://host:port - Publisher socket binding to TCP.
- pub+connect:ipc:///path/to/socket - Publisher socket connecting to IPC.
send_timeout
: Timeout for sending messages (in seconds and nanoseconds).
send_retries
: Number of retries for failed send operations.
receive_timeout
: Timeout for receiving acknowledgments (in seconds and nanoseconds).
receive_retries
: Number of retries for failed receive operations.
send_hwm
: High water mark for sending messages (queue size limit).
receive_hwm
: High water mark for receiving acknowledgments (queue size limit).
inflight_ops
: Number of in-flight operations for connection management.
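The example URL patterns for both ingress and egress appear to share the shape `<socket type>+<bind|connect>:<endpoint>`. The sketch below splits a URL along those lines. This grammar is inferred from the examples only, and `SocketUrl` and `parse_socket_url` are hypothetical names; the service may accept other forms.

```python
from typing import NamedTuple


class SocketUrl(NamedTuple):
    socket_type: str  # e.g. "router", "dealer", "sub", "pub", "push"
    mode: str         # "bind" or "connect"
    endpoint: str     # e.g. "tcp://127.0.0.1:6666"


def parse_socket_url(url: str) -> SocketUrl:
    """Split '<type>+<mode>:<endpoint>' into its three parts.

    Assumption: the grammar is inferred from the documented examples.
    """
    prefix, colon, endpoint = url.partition(":")
    socket_type, plus, mode = prefix.partition("+")
    if not colon or not plus or mode not in ("bind", "connect"):
        raise ValueError(f"unrecognised socket URL: {url!r}")
    return SocketUrl(socket_type, mode, endpoint)


print(parse_socket_url("router+bind:tcp://127.0.0.1:6666"))
print(parse_socket_url("pub+connect:ipc:///path/to/socket"))
```

Splitting on the first colon keeps the transport scheme (`tcp://`, `ipc://`) attached to the endpoint.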
Common Configuration
The common section contains settings that apply to the entire service.
Buffer Configuration
"buffer": {
  "path": "/tmp/buffer",
  "max_length": 1000000,
  "full_threshold_percentage": 90,
  "reset_on_start": true
}
Parameters:
path
: Path to the RocksDB database directory for storing buffered messages.
max_length
: Maximum number of messages that can be stored in the buffer.
full_threshold_percentage
: Percentage threshold (0-100) at which the buffer is considered “full” for monitoring purposes.
reset_on_start
: Whether to clear the buffer when the service starts (true) or preserve existing data (false).
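To make the threshold concrete, the sketch below converts the percentage into a message count. It assumes the percentage is measured against max_length, which the parameter descriptions suggest but do not state outright; `full_threshold_messages` is a hypothetical helper.

```python
def full_threshold_messages(max_length: int, full_threshold_percentage: int) -> int:
    """Number of buffered messages at which the buffer counts as "full".

    Assumption: the percentage is taken relative to max_length.
    """
    if not 0 <= full_threshold_percentage <= 100:
        raise ValueError("full_threshold_percentage must be within 0-100")
    return max_length * full_threshold_percentage // 100


print(full_threshold_messages(1_000_000, 90))  # 900000
```

With the example values, the buffer would be reported as full at 900,000 messages while still accepting up to 1,000,000.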
Message Handler Configuration
"message_handler_init": {
  "python_root": "/opt/python",
  "module_name": "module",
  "function_name": "init",
  "args": [
    {
      "params": {
        "home_dir": "/home/user",
        "user_name": "user"
      }
    }
  ],
  "invocation_context": "AfterReceive"
}
Parameters:
python_root
: Root directory for Python modules.
module_name
: Name of the Python module to import.
function_name
: Name of the function to call for initialization.
args
: Arguments to pass to the initialization function (optional).
invocation_context
: When to invoke the handler ("AfterReceive" or "BeforeSend").
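The relationship between python_root, module_name, and function_name can be illustrated with plain Python import machinery. This is only a sketch of how such a lookup could work, not the service's actual loader: `load_init` and the throwaway module `demo_handler` are hypothetical, and passing the "params" object of the first args entry as the single argument is an assumption.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def load_init(python_root: str, module_name: str, function_name: str):
    """Import module_name from python_root and return the named function."""
    if python_root not in sys.path:
        sys.path.insert(0, python_root)
    importlib.invalidate_caches()
    module = importlib.import_module(module_name)
    return getattr(module, function_name)


# Demonstration with a throwaway module written to a temporary directory.
with tempfile.TemporaryDirectory() as root:
    Path(root, "demo_handler.py").write_text(
        "def init(params):\n"
        "    return lambda topic, message: (topic, message)\n"
    )
    init = load_init(root, "demo_handler", "init")
    # Assumption: the "params" object from the first "args" entry is passed as-is.
    handler = init({"home_dir": "/home/user", "user_name": "user"})
    print(handler("topic", b"payload"))
```

In a real deployment the module would live under the configured python_root (for example /opt/python/module.py for the values shown above).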
Telemetry Configuration
"telemetry": {
  "port": 8080,
  "stats_log_interval": {
    "secs": 60,
    "nanos": 0
  },
  "metrics_extra_labels": null
}
Parameters:
port
: Port number for the web-based telemetry interface.
stats_log_interval
: Interval for logging statistics (in seconds and nanoseconds).
metrics_extra_labels
: Additional labels to include in metrics (optional).
Idle Sleep Configuration
"idle_sleep": {
  "secs": 0,
  "nanos": 1000
}
Parameters:
secs
: Seconds component of the idle sleep duration.
nanos
: Nanoseconds component of the idle sleep duration.
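The same secs/nanos pair is used for idle_sleep, receive_timeout, send_timeout, and stats_log_interval. A small sketch of how the two components combine into a single duration follows; `duration_seconds` is a hypothetical helper, and both fields are assumed to be non-negative integers.

```python
NANOS_PER_SECOND = 1_000_000_000


def duration_seconds(duration: dict) -> float:
    """Combine the "secs" and "nanos" components into seconds."""
    return duration["secs"] + duration["nanos"] / NANOS_PER_SECOND


idle_sleep = {"secs": 0, "nanos": 1000}
print(duration_seconds(idle_sleep))  # 1e-06
```

The example idle_sleep value therefore corresponds to one microsecond.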
Environment Variable Substitution
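The configuration file supports environment variable substitution using the ${VARIABLE_NAME} syntax, or ${VARIABLE_NAME:-default_value} to supply a default. Placeholders for numeric, boolean, and null values are left unquoted, so the file is valid JSON only after substitution: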
{
  "ingress": {
    "socket": {
      "url": "${ZMQ_SRC_ENDPOINT}"
    }
  },
  "egress": {
    "socket": {
      "url": "${ZMQ_SINK_ENDPOINT}"
    }
  },
  "common": {
    "buffer": {
      "path": "${BUFFER_PATH:-/tmp/buffer}",
      "max_length": ${BUFFER_LEN:-1000000},
      "full_threshold_percentage": ${BUFFER_THRESHOLD_PERCENTAGE:-90},
      "reset_on_start": ${BUFFER_RESET_ON_RESTART:-true}
    },
    "telemetry": {
      "stats_log_interval": {
        "secs": ${STATS_LOG_INTERVAL:-60},
        "nanos": 0
      },
      "metrics_extra_labels": ${METRICS_EXTRA_LABELS:-null}
    }
  }
}
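To preview what a template like this resolves to, the substitution can be approximated in a few lines of Python. This is a sketch, not the service's implementation: `substitute` is a hypothetical helper, an empty variable is treated like an unset one (following the shell convention for `:-`), and a missing variable without a default raises an error.

```python
import json
import re

# Matches ${NAME} and ${NAME:-default}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute(text: str, env: dict) -> str:
    """Replace placeholders in text with values from env."""

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name, "")
        if value != "":
            return value
        if default is not None:
            return default
        raise KeyError(f"{name} is not set and has no default")

    return _PLACEHOLDER.sub(replace, text)


template = '{"path": "${BUFFER_PATH:-/tmp/buffer}", "max_length": ${BUFFER_LEN:-1000000}}'
resolved = substitute(template, {"BUFFER_LEN": "500"})
print(json.loads(resolved))  # {'path': '/tmp/buffer', 'max_length': 500}
```

Passing `dict(os.environ)` as `env` would resolve the template against the current process environment.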
Python Handler Development
Buffer NG supports Python handlers for custom message processing. The handler should implement the following interface:
from typing import Any, Callable, Optional, Tuple

# "Message" is the message type supplied by the service. It is quoted in the
# annotations below because its import path is not shown in this section.


def init(params: Any) -> Optional[Callable]:
    """
    Initialize the message handler.

    :param params: Configuration parameters passed from the service
    :return: Message handler function, or None to run without a Python handler
    """
    # Return None instead to ignore the Python handler.
    return MessageHandler()


class MessageHandler:
    def __call__(self, topic: str, message: "Message") -> Optional[Tuple[str, "Message"]]:
        """
        Process a message.

        :param topic: ZMQ topic of the message
        :param message: Message object to process
        :return: Tuple of (topic, message), or None to drop the message
        """
        # Custom processing logic here.
        # Return None instead to drop the message.
        return topic, message
The handler is invoked at one of two points, selected by invocation_context:
- AfterReceive: Called after receiving a message from ingress, before storing it in the buffer.
- BeforeSend: Called after retrieving a message from the buffer, before sending it to egress.
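As an illustration of the interface, the sketch below implements a handler that forwards only messages whose topic starts with a given prefix and drops the rest. The class `PrefixFilter` and the `topic_prefix` parameter are hypothetical, params is assumed to arrive as a dictionary, and the message is treated as an opaque object.

```python
from typing import Any, Callable, Optional, Tuple


class PrefixFilter:
    """Forward messages whose topic starts with a prefix; drop the others."""

    def __init__(self, prefix: str):
        self.prefix = prefix

    def __call__(self, topic: str, message: Any) -> Optional[Tuple[str, Any]]:
        if not topic.startswith(self.prefix):
            return None  # drop the message
        return topic, message


def init(params: Any) -> Optional[Callable]:
    # Assumption: params is a dict; "topic_prefix" is a hypothetical key.
    prefix = (params or {}).get("topic_prefix")
    if not prefix:
        return None  # no prefix configured: run without a Python handler
    return PrefixFilter(prefix)


handler = init({"topic_prefix": "camera/"})
print(handler("camera/1", b"frame"))  # ('camera/1', b'frame')
print(handler("audio/1", b"chunk"))   # None
```

Used with AfterReceive, a filter like this would keep unwanted messages out of the buffer; with BeforeSend it would drop them only at delivery time.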
Configuration Examples
Basic Configuration
{
  "ingress": {
    "socket": {
      "url": "router+bind:tcp://127.0.0.1:6666"
    }
  },
  "egress": {
    "socket": {
      "url": "dealer+bind:tcp://127.0.0.1:6667"
    }
  },
  "common": {
    "buffer": {
      "path": "/tmp/buffer",
      "max_length": 1000000,
      "full_threshold_percentage": 90,
      "reset_on_start": true
    },
    "telemetry": {
      "port": 8080
    }
  }
}
Full Configuration with Python Handlers
{
  "ingress": {
    "socket": {
      "url": "router+bind:tcp://127.0.0.1:6666",
      "options": {
        "receive_timeout": {
          "secs": 1,
          "nanos": 0
        },
        "receive_hwm": 1000,
        "topic_prefix_spec": "none",
        "source_cache_size": 1000,
        "fix_ipc_permissions": 511,
        "inflight_ops": 100
      }
    }
  },
  "egress": {
    "socket": {
      "url": "dealer+bind:tcp://127.0.0.1:6667",
      "options": {
        "send_timeout": {
          "secs": 1,
          "nanos": 0
        },
        "send_retries": 3,
        "receive_timeout": {
          "secs": 1,
          "nanos": 0
        },
        "receive_retries": 3,
        "send_hwm": 1000,
        "receive_hwm": 1000,
        "inflight_ops": 100
      }
    }
  },
  "common": {
    "idle_sleep": {
      "secs": 0,
      "nanos": 1000
    },
    "message_handler_init": {
      "python_root": "/opt/python",
      "module_name": "module",
      "function_name": "init",
      "args": [
        {
          "params": {
            "home_dir": "/home/user",
            "user_name": "user"
          }
        }
      ],
      "invocation_context": "AfterReceive"
    },
    "telemetry": {
      "port": 8080,
      "stats_log_interval": {
        "secs": 60,
        "nanos": 0
      },
      "metrics_extra_labels": null
    },
    "buffer": {
      "path": "/tmp/buffer",
      "max_length": 1000000,
      "full_threshold_percentage": 90,
      "reset_on_start": true
    }
  }
}