savant_rs.zmq

class savant_rs.zmq.BlockingReader(config)

Blocking Reader with GIL release on long-lasting receive operations.

Parameters:

config (ReaderConfig) – Reader configuration.

blacklist_source(source_id)

Blacklists source

Parameters:

source_id (bytes) – Source ID to blacklist.

is_blacklisted(source_id)

Returns true if the source is blacklisted.

Parameters:

source_id (bytes) – Source ID to check.

Returns:

true if the source is blacklisted.

Return type:

bool

is_started()

Returns true if the reader is started.

receive()

Receives a message. Blocks until a message is received. Releases GIL while waiting for the result.

Returns:

Raises:

RuntimeError – When the reader receives an error. Generally means that the reader is no longer usable and should be shutdown.

shutdown()

Shuts down the reader. If the reader is not started, returns an error.

start()

Starts the reader. If the reader is already started, returns an error.

class savant_rs.zmq.BlockingWriter(config)

Blocking Writer with GIL release on long-lasting send_* operations.

Parameters:

config (WriterConfig) – Writer configuration.

is_started()

Returns true if the writer is started.

send_eos(topic)

Sends EOS to the specified topic. If the writer is not started, returns an error. Releases GIL while waiting for the result.

Parameters:

topic (str) – Topic to send EOS to.

Returns:

Raises:

RuntimeError – When underlying ZeroMQ writer fails and no longer functional. Usually means that the writer must be restarted.

send_message(topic, message, extra)

Sends a message to the specified topic. If the writer is not started, returns an error. Releases GIL while waiting for the result.

Parameters:
Returns:

Raises:

RuntimeError – When underlying ZeroMQ writer fails and no longer functional. Usually means that the writer must be restarted.

shutdown()

Shuts down the writer. If the writer is not started, returns an error.

start()

Starts the writer. If the writer is already started, returns an error.

class savant_rs.zmq.NonBlockingReader(config, results_queue_size)

A non-blocking reader. Does not release GIL when uses receive convenience method, which is blocking. For non-blocking operations use try_receive.

Parameters:
  • config (ReaderConfig) – Reader configuration.

  • results_queue_size (int) – Size of the queue for storing results. If the queue is full, the reader’s internal operation will block. and depending on the socket type can cause either drop or backpressure. The user can use enqueued_results to check the number of enqueued results and read them either with receive or try_receive.

blacklist_source(source_id)

Blacklists source

Parameters:

source_id (bytes) – Source ID to blacklist.

enqueued_results()

Returns the number of enqueued results. Those results require fetching to avoid the reader blocking.

is_blacklisted(source_id)

Returns true if the source is blacklisted.

Parameters:

source_id (bytes) – Source ID to check.

Returns:

true if the source is blacklisted.

Return type:

bool

is_shutdown()

Returns true if the reader is shutdown.

is_started()

Returns true if the reader is started.

receive()

Receives a message. Blocks until a message is received. Does not release GIL. This is a convenience method which normally should not be used with such a reader. For non-blocking operations use try_receive.

Returns:

Raises:

RuntimeError – When the reader receives an error. Generally means that the reader is no longer usable and should be shutdown.

shutdown()

Shuts down the reader. If the reader is already shutdown, returns an error.

start()

Starts the reader. If the reader is already started, returns an error.

try_receive()
class savant_rs.zmq.NonBlockingWriter(config, max_infight_messages)

A non-blocking writer. Sends the message to the internal command queue with returning a Future-like object WriteOperationResult. The user can use get or try_get to get the result of the operation.

Parameters:
  • config (WriterConfig) – Writer configuration.

  • max_infight_messages (int) – Maximum number of inflight messages. If the number of inflight messages is equal to this value, the writer’s internal operation will block.

has_capacity()

Returns true if the writer has capacity to send more messages.

inflight_messages()

Returns the number of inflight messages.

is_shutdown()

Returns true if the writer is shutdown.

is_started()

Returns true if the writer is started.

send_eos(topic)

Sends EOS to the specified topic.

Parameters:

topic (str) – Topic to send EOS to.

Returns:

Write operation result - a future-like object which can be used to get the result of the operation.

Return type:

WriteOperationResult

Raises:

RuntimeError – When the writer receives an error. Generally means that the writer is no longer usable and should be shutdown.

send_message(topic, message, extra)

Sends a message to the specified topic.

Parameters:
Returns:

Write operation result - a future-like object which can be used to get the result of the operation.

Return type:

WriteOperationResult

Raises:

RuntimeError – When the writer receives an error. Generally means that the writer is no longer usable and should be shutdown.

shutdown()

Shuts down the writer. If the writer is already shutdown, returns an error.

start()

Starts the writer. If the writer is already started, returns an error.

class savant_rs.zmq.ReaderConfig

A reader configuration

bind
endpoint
fix_ipc_permissions
receive_hwm
receive_timeout
routing_cache_size
socket_type
source_blacklist_size
source_blacklist_ttl
topic_prefix_spec
class savant_rs.zmq.ReaderConfigBuilder(url)

Creates a new configuration builder based on the provided URL. The URL can have the following formats:

  • tcp://1.2.3.4:5678

  • ipc:///tmp/test

  • (sub|rep|router)+(bind|connect):(tcp|ipc)://...

Parameters:

url (str) – The URL to use

Return type:

ReaderConfigBuilder

Raises:

ValueError – If the URL is invalid

build()

Builds the configuration

Returns:

The built configuration

Return type:

ReaderConfig

Raises:

ValueError – If the configuration is invalid

with_bind(bind)

Specifies either socket binds or connects

Parameters:

bind (bool) – If True, the socket will bind, otherwise it will connect. Defaults to True

with_fix_ipc_permissions(permissions=None)

Fixes permissions to specified access mask for a domain unix socket. It may be required for sockets mapped to host volumes

Parameters:

fix_ipc_permissions (int) – The access mask to set, defaults to 0o777.

Raises:

ValueError – If the permissions are double set

with_receive_hwm(receive_hwm)

Sets the receive HWM on the ZeroMQ socket.

Parameters:

receive_hwm (int) – The receive HWM, defaults to 50.

Raises:

ValueError – If the receive HWM is double set

with_receive_timeout(receive_timeout)

Sets the receive timeout for the ZeroMQ socket.

Parameters:

receive_timeout (int) – The receive timeout in milliseconds. Defaults to 1000

Raises:

ValueError – If the receive timeout is double set

with_routing_cache_size(size)

Sets the routing cache size for the ZeroMQ socket. The cache is used to track dealer-router connections chaos when the dealers reconnect with the same topic.

Parameters:

size (int) – The routing cache size, defaults to 512.

with_socket_type(socket_type)

Sets the socket type

Parameters:

socket_type (ReaderSocketType) – The socket type to use, defaults to ReaderSocketType.Sub

Raises:

ValueError – If the socket type is double set. Defaults to ReaderSocketType.Router

with_source_blacklist_size(size)

Sets the source blacklist size for the ZeroMQ socket. The blacklist is used to block sources sending wrong data

Parameters:

size (int) – The source blacklist size, defaults to 1024.

Raises:

ValueError – If the source blacklist size is double set

with_source_blacklist_ttl(ttl)

Sets the source blacklist TTL for the ZeroMQ socket. The blacklist is used to track the sources sending wrong data

Parameters:

ttl (int) – The source blacklist TTL, defaults to 60000.

Raises:

ValueError – If the source blacklist TTL is double set

with_topic_prefix_spec(topic_prefix_spec)

Sets the topic prefix spec for the ZeroMQ socket.

Parameters:

topic_prefix_spec (TopicPrefixSpec) – The topic prefix spec, defaults to TopicPrefixSpec.none()

Raises:

ValueError – If the topic prefix spec is double set

class savant_rs.zmq.ReaderResultBlacklisted

Returned when a reader is unable to receive a message due to a timeout on ZMQ.

topic
class savant_rs.zmq.ReaderResultMessage

Returned when a reader received a message.

data(index)

Returns the data stored in the message at the given index.

Parameters:

index (int) – The index of the data to return.

Returns:

  • bytes – The data stored in the message at the given index.

  • None – If the index is out of bounds.

Raises:

MemoryError – if the data cannot be allocated in Python.

data_len()

Returns the length o the data vector stored in the message.

message

The savant_rs.utils.serialization.Message received.

routing_id

The routing id of the message. The field is only filled for Router socket.

topic

The topic of the message.

class savant_rs.zmq.ReaderResultPrefixMismatch

Returned when a reader received a message not matching the topic prefix configured.

routing_id

The routing id of the message. The field is only filled for Router socket.

topic

The topic of the message.

class savant_rs.zmq.ReaderResultTimeout

Returned when a reader is unable to receive a message due to a timeout on ZMQ.

class savant_rs.zmq.ReaderSocketType

Represents a socket type for a reader socket.

Rep = Rep
Router = Router
Sub = Sub
class savant_rs.zmq.TopicPrefixSpec

The object is used to configure the rules to pass messages from a writer to a reader based on either exact topic match or a prefix match.

static none()

Creates a match rule for no match

static prefix(prefix)

Creates a match rule for prefix match

Parameters:

prefix (str) – The prefix to match

static source_id(id)

Creates a match rule for exact topic match

Parameters:

id (str) – The topic to match

class savant_rs.zmq.WriteOperationResult
get()
try_get()
class savant_rs.zmq.WriterConfig

A writer configuration

bind
endpoint
fix_ipc_permissions
receive_hwm
receive_retries
receive_timeout
send_hwm
send_retries
send_timeout
socket_type
class savant_rs.zmq.WriterConfigBuilder(url)

Creates a new configuration builder based on the provided URL. The URL can have the following formats:

  • tcp://1.2.3.4:5678

  • ipc:///tmp/test

  • (pub|req|dealer)+(bind|connect):(tcp|ipc)://...

Parameters:

url (str) – The URL to use

Return type:

WriterConfigBuilder

Raises:

ValueError – If the URL is invalid

build()

Builds the configuration

Returns:

The built configuration

Return type:

WriterConfig

Raises:

ValueError – If the configuration is invalid

with_bind(bind)

Specifies either socket binds or connects

Parameters:

bind (bool) – If True, the socket will bind, otherwise it will connect. Defaults to True

with_fix_ipc_permissions(fix_ipc_permissions=None)

Fixes permissions to specified access mask for a domain unix socket. It may be required for sockets mapped to host volumes

Parameters:

fix_ipc_permissions (int) – The access mask to set, defaults to 0o777.

with_receive_hwm(receive_hwm)

Sets the receive high water mark value for the ZeroMQ socket.

Parameters:

receive_hwm (int) – The receive high water mark value, defaults to 50.

with_receive_retries(receive_retries)

Sets the number of receive retries for the ZeroMQ socket when the other side replies.

  • Dealer-Router pair: only on EOS

  • Req-Rep pair: on every message

  • Pub-Sub pair: never

Parameters:

receive_retries (int) – The number of receive retries, defaults to 3.

with_receive_timeout(receive_timeout)

Sets the receive timeout for the ZeroMQ socket

Parameters:

receive_timeout (int) – The receive timeout in milliseconds. Defaults to 1000

with_send_hwm(send_hwm)

Sets the send high water mark value for the ZeroMQ socket.

Parameters:

send_hwm (int) – The send high water mark value, defaults to 50.

with_send_retries(send_retries)

Sets the number of send retries for the ZeroMQ socket when the other side does not accept the message.

Parameters:

send_retries (int) – The number of send retries, defaults to 3.

with_send_timeout(send_timeout)

Sets the send timeout for the ZeroMQ socket

Parameters:

send_timeout (int) – The send timeout in milliseconds. Defaults to 5000

with_socket_type(socket_type)

Sets the socket type

Parameters:

socket_type (WriterSocketType) – The socket type to use, defaults to WriterSocketType.Dealer

class savant_rs.zmq.WriterResultAck

Returned when a writer is able to send a message and receive an ack. The result is expected for every Req/Rep message and for only for EOS when using Dealer/Router. Pub/Sub does not use acks.

receive_retries_spent
send_retries_spent
time_spent
class savant_rs.zmq.WriterResultAckTimeout

Returned when a writer is unable to receive an ack due to a timeout on ZMQ. Contains a field holding the accumulated timeout (receive_retries x receive_timeout).

timeout
class savant_rs.zmq.WriterResultSendTimeout

Returned when a writer is unable to send a message due to a timeout on ZMQ.

class savant_rs.zmq.WriterResultSuccess

Returned when a writer is able to send a message without receiving an ack. For Dealer/Router when the message is not EOS, for Pub/Sub always. Req/Rep does not use this result.

retries_spent
time_spent
class savant_rs.zmq.WriterSocketType

Represents a socket type for a writer socket.

Dealer = Dealer
Pub = Pub
Req = Req