savant_rs.zmq
- class savant_rs.zmq.BlockingReader(config)
Blocking Reader with GIL release on long-lasting receive operations.
- Parameters:
config (ReaderConfig) – Reader configuration.
- blacklist_source(source_id)
Blacklists source
- Parameters:
source_id (bytes) – Source ID to blacklist.
- is_blacklisted(source_id)
Returns true if the source is blacklisted.
- Parameters:
source_id (bytes) – Source ID to check.
- Returns:
true if the source is blacklisted.
- Return type:
bool
- is_started()
Returns true if the reader is started.
- receive()
Receives a message. Blocks until a message is received. Releases GIL while waiting for the result.
- Returns:
ReaderResultEndOfStream
- Raises:
RuntimeError – When the reader receives an error. Generally means that the reader is no longer usable and should be shut down.
- shutdown()
Shuts down the reader. If the reader is not started, returns an error.
- start()
Starts the reader. If the reader is already started, returns an error.
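A minimal sketch of a receive loop built on the methods documented above. It is written against any object exposing is_started(), start(), receive() and shutdown(), so it does not import savant_rs itself; the names read_results, handle and max_results are hypothetical and exist only for this illustration.

```python
def read_results(reader, handle, max_results=None):
    """Pull results from a BlockingReader-like object and pass them to `handle`.

    Returns the number of results handled. A RuntimeError raised by
    receive() propagates to the caller after the reader is shut down.
    """
    if not reader.is_started():
        reader.start()
    received = 0
    try:
        while max_results is None or received < max_results:
            # receive() blocks until a result arrives.
            handle(reader.receive())
            received += 1
    finally:
        # Runs on normal exit and when receive() raises: the docs say a
        # failed reader is generally no longer usable.
        if reader.is_started():
            reader.shutdown()
    return received
```

With a real reader, the call would look like read_results(BlockingReader(config), print), where config comes from ReaderConfigBuilder.build().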
- class savant_rs.zmq.BlockingWriter(config)
Blocking Writer with GIL release on long-lasting send_* operations.
- Parameters:
config (WriterConfig) – Writer configuration.
- is_started()
Returns true if the writer is started.
- send_eos(topic)
Sends EOS to the specified topic. If the writer is not started, returns an error. Releases GIL while waiting for the result.
- Parameters:
topic (str) – Topic to send EOS to.
- Returns:
- Raises:
RuntimeError – When the underlying ZeroMQ writer fails and is no longer functional. Usually means that the writer must be restarted.
- send_message(topic, message, extra)
Sends a message to the specified topic. If the writer is not started, returns an error. Releases GIL while waiting for the result.
- Parameters:
topic (str) – Topic to send the message to.
message (savant_rs.utils.serialization.Message) – Message to send.
extra (bytes) – Extra data to send with the message.
- Returns:
- Raises:
RuntimeError – When the underlying ZeroMQ writer fails and is no longer functional. Usually means that the writer must be restarted.
- shutdown()
Shuts down the writer. If the writer is not started, returns an error.
- start()
Starts the writer. If the writer is already started, returns an error.
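The following sketch shows one way the BlockingWriter methods could be combined to send a batch of messages followed by EOS. It accepts any object with the documented is_started(), start(), send_message(), send_eos() and shutdown() methods; send_stream and its default for extra are assumptions made for this example, and the shape of the returned result objects is not specified here.

```python
def send_stream(writer, topic, messages, extra=b""):
    """Send each message to `topic`, then EOS; return the collected results."""
    if not writer.is_started():
        writer.start()
    results = []
    try:
        for message in messages:
            # Each call blocks until the send operation completes.
            results.append(writer.send_message(topic, message, extra))
        results.append(writer.send_eos(topic))
    finally:
        # Also reached when a send raises RuntimeError.
        if writer.is_started():
            writer.shutdown()
    return results
```

With the real class, messages would be savant_rs.utils.serialization.Message instances and writer would be BlockingWriter(config).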
- class savant_rs.zmq.NonBlockingReader(config, results_queue_size)
A non-blocking reader. The receive convenience method is blocking and does not release the GIL. For non-blocking operations use try_receive.
- Parameters:
config (ReaderConfig) – Reader configuration.
results_queue_size (int) – Size of the queue for storing results. If the queue is full, the reader’s internal operation will block and, depending on the socket type, can cause either message drops or backpressure. The user can use enqueued_results to check the number of enqueued results and read them with either receive or try_receive.
- blacklist_source(source_id)
Blacklists source
- Parameters:
source_id (bytes) – Source ID to blacklist.
- enqueued_results()
Returns the number of enqueued results. Those results require fetching to avoid the reader blocking.
- is_blacklisted(source_id)
Returns true if the source is blacklisted.
- Parameters:
source_id (bytes) – Source ID to check.
- Returns:
true if the source is blacklisted.
- Return type:
bool
- is_shutdown()
Returns true if the reader is shut down.
- is_started()
Returns true if the reader is started.
- receive()
Receives a message. Blocks until a message is received. Does not release GIL. This is a convenience method which normally should not be used with such a reader. For non-blocking operations use try_receive.
- Returns:
ReaderResultEndOfStream
- Raises:
RuntimeError – When the reader receives an error. Generally means that the reader is no longer usable and should be shut down.
- shutdown()
Shuts down the reader. If the reader is already shut down, returns an error.
- start()
Starts the reader. If the reader is already started, returns an error.
- try_receive()
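A rough sketch of draining the results queue without blocking, using enqueued_results together with try_receive. The return value of try_receive is not described in this reference, so the code assumes it returns None when nothing is queued; drain and limit are hypothetical names.

```python
def drain(reader, limit=None):
    """Fetch the results currently queued in a NonBlockingReader-like object."""
    pending = reader.enqueued_results()
    if limit is not None:
        pending = min(pending, limit)
    results = []
    for _ in range(pending):
        result = reader.try_receive()
        if result is None:
            # Assumption: None signals that the queue is empty.
            break
        results.append(result)
    return results
```

Calling drain regularly keeps the queue below results_queue_size, which avoids the blocking described for that parameter.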
- class savant_rs.zmq.NonBlockingWriter(config, max_infight_messages)
A non-blocking writer. Sends the message to the internal command queue and returns a future-like object, WriteOperationResult. The user can use get or try_get to get the result of the operation.
- Parameters:
config (WriterConfig) – Writer configuration.
max_infight_messages (int) – Maximum number of inflight messages. If the number of inflight messages is equal to this value, the writer’s internal operation will block.
- has_capacity()
Returns true if the writer has capacity to send more messages.
- inflight_messages()
Returns the number of inflight messages.
- is_shutdown()
Returns true if the writer is shut down.
- is_started()
Returns true if the writer is started.
- send_eos(topic)
Sends EOS to the specified topic.
- Parameters:
topic (str) – Topic to send EOS to.
- Returns:
Write operation result - a future-like object which can be used to get the result of the operation.
- Return type:
- Raises:
RuntimeError – When the writer receives an error. Generally means that the writer is no longer usable and should be shut down.
- send_message(topic, message, extra)
Sends a message to the specified topic.
- Parameters:
topic (str) – Topic to send the message to.
message (savant_rs.utils.serialization.Message) – Message to send.
extra (bytes) – Extra data to send with the message.
- Returns:
Write operation result - a future-like object which can be used to get the result of the operation.
- Return type:
- Raises:
RuntimeError – When the writer receives an error. Generally means that the writer is no longer usable and should be shut down.
- shutdown()
Shuts down the writer. If the writer is already shut down, returns an error.
- start()
Starts the writer. If the writer is already started, returns an error.
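One possible way to use has_capacity to avoid blocking inside the writer is sketched below. It assumes that get() on the returned future-like object waits for and returns the outcome of the operation; send_with_backpressure and poll_interval are hypothetical names, and the helper works with any object exposing has_capacity() and send_message().

```python
import time


def send_with_backpressure(writer, topic, messages, extra=b"", poll_interval=0.001):
    """Queue messages only while the writer reports free capacity."""
    pending = []
    for message in messages:
        # Wait in Python for a free slot rather than blocking in the writer.
        while not writer.has_capacity():
            time.sleep(poll_interval)
        pending.append(writer.send_message(topic, message, extra))
    # Assumption: get() waits for each operation and returns its result.
    return [operation.get() for operation in pending]
```

The polling loop is the simplest option; an application with other work to do could check inflight_messages() and come back later.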
- class savant_rs.zmq.ReaderConfig
A reader configuration
- bind
- endpoint
- fix_ipc_permissions
- receive_hwm
- receive_timeout
- routing_cache_size
- socket_type
- source_blacklist_size
- source_blacklist_ttl
- topic_prefix_spec
- class savant_rs.zmq.ReaderConfigBuilder(url)
Creates a new configuration builder based on the provided URL. The URL can have the following formats:
tcp://1.2.3.4:5678
ipc:///tmp/test
(sub|rep|router)+(bind|connect):(tcp|ipc)://...
- Parameters:
url (str) – The URL to use
- Return type:
- Raises:
ValueError – If the URL is invalid
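To make the three URL shapes concrete, here is an illustrative pure-Python pattern that splits a reader URL into its optional socket type, optional bind/connect mode, and endpoint. It only models the formats listed above and is not the library's parser, which may accept or reject inputs differently; READER_URL and split_reader_url are hypothetical names.

```python
import re

# Optional "(sub|rep|router)+(bind|connect):" prefix, then a tcp:// or ipc:// endpoint.
READER_URL = re.compile(
    r"^(?:(?P<socket>sub|rep|router)\+(?P<mode>bind|connect):)?"
    r"(?P<endpoint>(?:tcp|ipc)://.+)$"
)


def split_reader_url(url):
    """Return (socket_type, mode, endpoint); the first two may be None."""
    match = READER_URL.match(url)
    if match is None:
        raise ValueError(f"unsupported reader URL: {url!r}")
    return match.group("socket"), match.group("mode"), match.group("endpoint")
```

For instance, split_reader_url("router+bind:ipc:///tmp/test") separates the prefix from the ipc:///tmp/test endpoint, while a plain tcp://1.2.3.4:5678 yields None for both prefix parts.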
- build()
Builds the configuration
- Returns:
The built configuration
- Return type:
- Raises:
ValueError – If the configuration is invalid
- with_bind(bind)
Specifies whether the socket binds or connects.
- Parameters:
bind (bool) – If True, the socket will bind, otherwise it will connect. Defaults to True.
- with_fix_ipc_permissions(permissions=None)
Fixes permissions to specified access mask for a domain unix socket. It may be required for sockets mapped to host volumes
- Parameters:
fix_ipc_permissions (int) – The access mask to set, defaults to 0o777.
- Raises:
ValueError – If the permissions are double set
- with_receive_hwm(receive_hwm)
Sets the receive HWM on the ZeroMQ socket.
- Parameters:
receive_hwm (int) – The receive HWM, defaults to 50.
- Raises:
ValueError – If the receive HWM is double set
- with_receive_timeout(receive_timeout)
Sets the receive timeout for the ZeroMQ socket.
- Parameters:
receive_timeout (int) – The receive timeout in milliseconds. Defaults to 1000.
- Raises:
ValueError – If the receive timeout is double set
- with_routing_cache_size(size)
Sets the routing cache size for the ZeroMQ socket. The cache is used to keep track of dealer-router connections when dealers reconnect with the same topic.
- Parameters:
size (int) – The routing cache size, defaults to 512.
- with_socket_type(socket_type)
Sets the socket type
- Parameters:
socket_type (ReaderSocketType) – The socket type to use, defaults to ReaderSocketType.Sub
- Raises:
ValueError – If the socket type is double set. Defaults to ReaderSocketType.Router
- with_source_blacklist_size(size)
Sets the source blacklist size for the ZeroMQ socket. The blacklist is used to block sources sending wrong data
- Parameters:
size (int) – The source blacklist size, defaults to 1024.
- Raises:
ValueError – If the source blacklist size is double set
- with_source_blacklist_ttl(ttl)
Sets the source blacklist TTL for the ZeroMQ socket. The blacklist is used to track the sources sending wrong data
- Parameters:
ttl (int) – The source blacklist TTL, defaults to 60000.
- Raises:
ValueError – If the source blacklist TTL is double set
- with_topic_prefix_spec(topic_prefix_spec)
Sets the topic prefix spec for the ZeroMQ socket.
- Parameters:
topic_prefix_spec (TopicPrefixSpec) – The topic prefix spec, defaults to TopicPrefixSpec.none()
- Raises:
ValueError – If the topic prefix spec is double set
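Since every option is set through a with_<name> method, a small helper can apply a set of options and build the configuration in one call. This is a sketch under the assumption that the setters are called as separate statements (it does not rely on them returning the builder); configure is a hypothetical name, and the helper works with any builder object that follows the with_<name>/build() convention shown above.

```python
def configure(builder, **options):
    """Call builder.with_<name>(value) for each option, then build()."""
    for name, value in options.items():
        setter = getattr(builder, f"with_{name}", None)
        if setter is None:
            raise ValueError(f"unknown option: {name}")
        # Each option is passed once, so the "double set" error is not triggered here.
        setter(value)
    return builder.build()


# Intended use with the real builder (requires savant_rs, not run here):
#   from savant_rs.zmq import ReaderConfigBuilder
#   config = configure(ReaderConfigBuilder("router+bind:ipc:///tmp/test"),
#                      receive_timeout=1000, receive_hwm=50)
```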
- class savant_rs.zmq.ReaderResultBlacklisted
Returned when a reader received a message from a blacklisted source.
- topic
- class savant_rs.zmq.ReaderResultMessage
Returned when a reader received a message.
- data(index)
Returns the data stored in the message at the given index.
- Parameters:
index (int) – The index of the data to return.
- Returns:
bytes – The data stored in the message at the given index.
None – If the index is out of bounds.
- Raises:
MemoryError – if the data cannot be allocated in Python.
- data_len()
Returns the length of the data vector stored in the message.
- message
The savant_rs.utils.serialization.Message received.
- routing_id
The routing id of the message. The field is only filled for Router socket.
- topic
The topic of the message.
- class savant_rs.zmq.ReaderResultPrefixMismatch
Returned when a reader received a message not matching the topic prefix configured.
- routing_id
The routing id of the message. The field is only filled for Router socket.
- topic
The topic of the message.
- class savant_rs.zmq.ReaderResultTimeout
Returned when a reader is unable to receive a message due to a timeout on ZMQ.
- class savant_rs.zmq.ReaderSocketType
Represents a socket type for a reader socket.
- Rep = Rep
- Router = Router
- Sub = Sub
- class savant_rs.zmq.TopicPrefixSpec
The object is used to configure the rules to pass messages from a writer to a reader based on either exact topic match or a prefix match.
- static none()
Creates a match rule for no match
- static prefix(prefix)
Creates a match rule for prefix match
- Parameters:
prefix (str) – The prefix to match
- static source_id(id)
Creates a match rule for exact topic match
- Parameters:
id (str) – The topic to match
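The three rules can be modelled in plain Python to show how they differ. This is an illustration rather than the library's matching code, and it assumes that the "none" rule means no filtering (every topic passes), which is a guess based on it being the documented default; topic_passes and its string rule names are hypothetical.

```python
def topic_passes(rule, value, topic):
    """Pure-Python model of the TopicPrefixSpec rules.

    rule is "none", "prefix" or "source_id"; value is the configured
    prefix or topic (ignored for "none").
    """
    if rule == "none":
        return True  # assumption: no filtering is applied
    if rule == "prefix":
        return topic.startswith(value)
    if rule == "source_id":
        return topic == value
    raise ValueError(f"unknown rule: {rule}")
```

Under this model, a prefix rule of "cam-" passes "cam-1" and "cam-2", while a source_id rule of "cam-1" passes only that exact topic.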
- class savant_rs.zmq.WriterConfig
A writer configuration
- bind
- endpoint
- fix_ipc_permissions
- receive_hwm
- receive_retries
- receive_timeout
- send_hwm
- send_retries
- send_timeout
- socket_type
- class savant_rs.zmq.WriterConfigBuilder(url)
Creates a new configuration builder based on the provided URL. The URL can have the following formats:
tcp://1.2.3.4:5678
ipc:///tmp/test
(pub|req|dealer)+(bind|connect):(tcp|ipc)://...
- Parameters:
url (str) – The URL to use
- Return type:
- Raises:
ValueError – If the URL is invalid
- build()
Builds the configuration
- Returns:
The built configuration
- Return type:
- Raises:
ValueError – If the configuration is invalid
- with_bind(bind)
Specifies whether the socket binds or connects.
- Parameters:
bind (bool) – If True, the socket will bind, otherwise it will connect. Defaults to True.
- with_fix_ipc_permissions(fix_ipc_permissions=None)
Fixes permissions to specified access mask for a domain unix socket. It may be required for sockets mapped to host volumes
- Parameters:
fix_ipc_permissions (int) – The access mask to set, defaults to 0o777.
- with_receive_hwm(receive_hwm)
Sets the receive high water mark value for the ZeroMQ socket.
- Parameters:
receive_hwm (int) – The receive high water mark value, defaults to 50.
- with_receive_retries(receive_retries)
Sets the number of receive retries for the ZeroMQ socket when the other side replies.
Dealer-Router pair: only on EOS
Req-Rep pair: on every message
Pub-Sub pair: never
- Parameters:
receive_retries (int) – The number of receive retries, defaults to 3.
- with_receive_timeout(receive_timeout)
Sets the receive timeout for the ZeroMQ socket
- Parameters:
receive_timeout (int) – The receive timeout in milliseconds. Defaults to 1000.
- with_send_hwm(send_hwm)
Sets the send high water mark value for the ZeroMQ socket.
- Parameters:
send_hwm (int) – The send high water mark value, defaults to 50.
- with_send_retries(send_retries)
Sets the number of send retries for the ZeroMQ socket when the other side does not accept the message.
- Parameters:
send_retries (int) – The number of send retries, defaults to 3.
- with_send_timeout(send_timeout)
Sets the send timeout for the ZeroMQ socket
- Parameters:
send_timeout (int) – The send timeout in milliseconds. Defaults to 5000.
- with_socket_type(socket_type)
Sets the socket type
- Parameters:
socket_type (WriterSocketType) – The socket type to use, defaults to WriterSocketType.Dealer
- class savant_rs.zmq.WriterResultAck
Returned when a writer is able to send a message and receive an ack. The result is expected for every message when using Req/Rep and only for EOS when using Dealer/Router. Pub/Sub does not use acks.
- receive_retries_spent
- send_retries_spent
- time_spent
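The ack rules described for WriterResultAck can be written down as a small lookup. This is only a restatement of the text in Python; ack_expected and the pair labels are hypothetical names chosen for the illustration.

```python
def ack_expected(socket_pair, is_eos):
    """Return True when the writer should expect an ack for this send."""
    pair = socket_pair.lower()
    if pair == "req/rep":
        return True          # every message is acknowledged
    if pair == "dealer/router":
        return is_eos        # only EOS is acknowledged
    if pair == "pub/sub":
        return False         # acks are not used
    raise ValueError(f"unknown socket pair: {socket_pair}")
```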
- class savant_rs.zmq.WriterResultAckTimeout
Returned when a writer is unable to receive an ack due to a timeout on ZMQ. Contains a field holding the accumulated timeout (receive_retries x receive_timeout).
- timeout
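As a worked example of the accumulated timeout, the product receive_retries x receive_timeout can be computed with the defaults documented for WriterConfigBuilder (3 retries, 1000 ms). The function name is hypothetical, and whether the library reports the value in exactly this unit is an assumption.

```python
def accumulated_ack_timeout_ms(receive_retries=3, receive_timeout_ms=1000):
    """Total time spent waiting for an ack before giving up, in milliseconds."""
    return receive_retries * receive_timeout_ms


# With the documented defaults: 3 retries x 1000 ms = 3000 ms.
default_timeout_ms = accumulated_ack_timeout_ms()
```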
- class savant_rs.zmq.WriterResultSendTimeout
Returned when a writer is unable to send a message due to a timeout on ZMQ.