rocksq.blocking

class rocksq.blocking.MpmcQueue(path, ttl)

A persistent queue with a TTL that supports multiple consumers, each identified by a label. This is a blocking implementation.

Parameters:
  • path (str) – The path to the queue.

  • ttl (int) – The number of seconds after which an element becomes eligible for removal from the queue. The TTL is not strict: an element remains in the queue for TTL seconds after insertion, and the queue then makes an effort to remove it, but removal is not guaranteed to happen immediately. Consumers can therefore retrieve elements that have expired but have not yet been removed.

Raises:

PyRuntimeError – If the queue could not be created.

add(items, no_gil=True)

Adds items to the queue.

GIL: the method can optionally be called without the GIL.

Parameters:
  • items (list of bytes) – The items to add to the queue.

  • no_gil (bool) – If True, the method will be called without the GIL. Default is True.

Raises:

PyRuntimeError – If the method fails.

Return type:

None
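Because add() takes a list of bytes, text has to be encoded before it is added. The helper below is a minimal sketch of that step; add_text is a hypothetical name, and queue is assumed to be an MpmcQueue or any object with a compatible add(items, no_gil=...) method.

```python
from typing import Iterable


def add_text(queue, messages: Iterable[str], no_gil: bool = True) -> int:
    """Encode strings as UTF-8 and add them to the queue in one call.

    `queue` is assumed to expose add(items, no_gil=...) as documented above.
    Returns the number of items handed to the queue.
    """
    # add() is documented to take a list of bytes, so encode first.
    items = [message.encode("utf-8") for message in messages]
    if items:
        # Skip the call entirely when there is nothing to add.
        queue.add(items, no_gil=no_gil)
    return len(items)
```

A call such as add_text(queue, ["first", "second"]) would store two byte strings; consumers get bytes back and decode them on their side.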

disk_size

Returns the disk size of the queue in bytes.

Returns:

size

Return type:

int

Raises:

PyRuntimeError – If the method fails.

is_empty

Checks if the queue is empty.

Returns:

True if the queue is empty, False otherwise.

Return type:

bool

labels

Returns the consumer labels.

Returns:

labels – The consumer labels.

Return type:

list of str

len

Returns the number of elements in the queue.

Returns:

The number of elements in the queue.

Return type:

int

next(label, start_position, max_elements=1, no_gil=True)

Retrieves items from the queue.

GIL: the method can optionally be called without the GIL.

Parameters:
  • label (str) – The consumer label that determines the position in the queue from which elements are retrieved. If the label does not exist, the start position is determined by the start_position parameter. If the label exists, retrieval continues from the element after the last one returned by the previous call of this method. If some elements expired between the previous call and this one, the next non-expired elements are retrieved.

  • start_position (StartPosition) – Determines the start position in the queue from which elements are retrieved if the consumer label does not exist.

  • max_elements (int) – The maximum number of elements to retrieve. Default is 1.

  • no_gil (bool) – If True, the method will be called without the GIL. Default is True.

Raises:

PyRuntimeError – If the method fails.

Returns:

  • items (list of bytes) – The items retrieved from the queue.

  • expired (bool) – True if some elements expired between the previous call of this method and this one.
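A consumer loop built on next() could look like the sketch below. It makes two assumptions that the reference above does not state explicitly: that next() returns the pair (items, expired) as a 2-tuple, and that it returns an empty list when no further elements are available for the label. consume_all and handle are hypothetical names; start_position is passed through unchanged, so the caller supplies a StartPosition value.

```python
from typing import Callable, Tuple


def consume_all(
    queue,
    label: str,
    start_position,
    handle: Callable[[bytes], None],
    max_elements: int = 100,
) -> Tuple[int, bool]:
    """Feed every available item for `label` to `handle`.

    Returns (processed, expired_seen), where expired_seen is True if any
    call reported that elements expired before this consumer read them.
    """
    processed = 0
    expired_seen = False
    while True:
        # Assumption: next() returns a 2-tuple (items, expired).
        items, expired = queue.next(
            label, start_position, max_elements=max_elements
        )
        expired_seen = expired_seen or bool(expired)
        if not items:
            # Assumption: an empty result means nothing is left to read.
            return processed, expired_seen
        for item in items:
            handle(item)
            processed += 1
```

Since the label stores the read position, a second consumer with a different label would see the same elements independently.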

remove_label(label, no_gil=True)

Removes the consumer label from the queue.

GIL: the method can optionally be called without the GIL.

Parameters:
  • label (str) – The consumer label to remove.

  • no_gil (bool) – If True, the method will be called without the GIL. Default is True.

Raises:

PyRuntimeError – If the method fails.

Return type:

None
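Labels that no longer correspond to a running consumer can be cleaned up by combining labels with remove_label(). The following is a sketch only: prune_labels is a hypothetical helper, and because labels is listed without a call signature it is read as a property here, with a fallback in case it turns out to be a method.

```python
from typing import Iterable, List


def prune_labels(queue, active: Iterable[str]) -> List[str]:
    """Remove every consumer label that is not in `active`.

    Returns the list of labels that were removed.
    """
    keep = set(active)
    labels = queue.labels
    if callable(labels):
        # Fallback: tolerate `labels` being a method rather than a property.
        labels = labels()
    stale = [label for label in labels if label not in keep]
    for label in stale:
        queue.remove_label(label)
    return stale
```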

class rocksq.blocking.PersistentQueueWithCapacity(path, max_elements=1000000000)

A persistent queue with a fixed capacity. This is a blocking implementation.

Parameters:
  • path (str) – The path to the queue.

  • max_elements (int) – The maximum number of elements the queue can hold. Default is 1_000_000_000.

Raises:

PyRuntimeError – If the queue could not be created.

disk_size

Returns the disk size of the queue in bytes.

Returns:

size

Return type:

int

Raises:

PyRuntimeError – If the method fails.

is_empty

Checks if the queue is empty.

Returns:

True if the queue is empty, False otherwise.

Return type:

bool

len

Returns the number of elements in the queue.

Returns:

The number of elements in the queue.

Return type:

int

payload_size

Returns the size of the queue in bytes, counting only the payload.

Returns:

size

Return type:

int
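The three size-related members (len, payload_size, disk_size) can be gathered into one snapshot for logging or monitoring. This is a sketch under the assumption that they are properties, as their signature-less listing suggests; queue_stats is a hypothetical name, and the helper also tolerates them being methods.

```python
from typing import Dict


def queue_stats(queue) -> Dict[str, int]:
    """Collect element count, payload size and disk size in one dict."""

    def value(attr):
        # Accept either a property value or a zero-argument method.
        return attr() if callable(attr) else attr

    return {
        "elements": value(queue.len),
        "payload_bytes": value(queue.payload_size),
        "disk_bytes": value(queue.disk_size),
    }
```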

pop(max_elements=1, no_gil=True)

Retrieves items from the queue.

GIL: the method can optionally be called without the GIL.

Parameters:
  • max_elements (int) – The maximum number of elements to retrieve. Default is 1.

  • no_gil (bool) – If True, the method will be called without the GIL. Default is True.

Raises:

PyRuntimeError – If the method fails.

Returns:

items – The items retrieved from the queue.

Return type:

list of bytes
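To empty the queue in batches, pop() can be combined with is_empty. The generator below is a minimal sketch; drain is a hypothetical name, is_empty is assumed to be a property (with a fallback for a method), and the loop also stops if pop() returns no items, since the behavior of pop() on an empty queue is not stated above.

```python
from typing import Iterator


def drain(queue, batch_size: int = 100) -> Iterator[bytes]:
    """Yield items from the queue until it reports being empty."""

    def empty() -> bool:
        flag = queue.is_empty
        # Accept either a property value or a zero-argument method.
        return bool(flag() if callable(flag) else flag)

    while not empty():
        items = queue.pop(max_elements=batch_size)
        if not items:
            # Defensive stop: nothing came back although is_empty was False.
            break
        yield from items
```

Iterating with for item in drain(queue, batch_size=500) keeps each call bounded while still consuming everything that is currently stored.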

push(items, no_gil=True)

Adds items to the queue.

GIL: the method can optionally be called without the GIL.

Parameters:
  • items (list of bytes) – The items to add to the queue.

  • no_gil (bool) – If True, the method will be called without the GIL. Default is True.

Raises:

PyRuntimeError – If the method fails.

Return type:

None
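Large inputs can be pushed in smaller chunks so that no single call hands the queue an arbitrarily large list. The sketch below uses a hypothetical helper, push_in_chunks, and does not handle a full queue: the reference only says that push() raises PyRuntimeError on failure, so any such error simply propagates to the caller.

```python
from typing import List


def push_in_chunks(queue, items: List[bytes], chunk_size: int = 1000) -> int:
    """Push `items` in slices of at most `chunk_size` elements.

    Returns the number of items pushed. Errors raised by push() are not
    caught here and propagate to the caller.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    pushed = 0
    for start in range(0, len(items), chunk_size):
        chunk = items[start:start + chunk_size]
        queue.push(chunk)
        pushed += len(chunk)
    return pushed
```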