Module rabibridge.mq

Classes

class RMQBase (loop: Optional[asyncio.events.AbstractEventLoop] = None, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None)

Subclasses

Methods

async def close(self) ‑> None
async def connect(self) ‑> RMQBase
class RMQClient (loop: Optional[asyncio.events.AbstractEventLoop] = None, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None)

Acting as a client-initiating requestor in a gateway service.

The following parameters are used preferentially if they are specified, if they are not specified, the configuration file is searched to use, and an error is reported if they are not in the configuration file either.

Args

loop
Event loop in this particular process. Defaults to None.
host
RabbitMQ host. Defaults to None.
port
RabbitMQ port. Defaults to None.
username
RabbitMQ username. Defaults to None.
password
RabbitMQ password. Defaults to None.

Ancestors

Instance variables

var correlation_id : str

A (periodically) self-incrementing pointer that should not normally be called directly by the user.

Methods

async def close(self)
async def connect(self) ‑> RMQClient

It should be connected using connect() before making the call and released using close() before closing the client. The client can be used as a context manager to ensure that the connection is closed properly after use.

Examples

>>> async with RMQClient(...) as client:
...     res = await client.remote_call(...)
...     print(res)
async def remote_call(self, func_name: str, args: tuple[typing.Any] = (), kwargs: dict[typing.Any] = {}, ftype: Literal['async', 'sync'] = 'async', *, timeout: Optional[float] = None) ‑> Any

Args

func_name
function name to be called.
args
arguments. Defaults to ().
kwargs
keyword arguments. Defaults to {}.
ftype
function type to be called remotely. e.g. defaults to 'async', and asynchronous call will be made on server side.
timeout
Client timeout time, independent from queue timeout hyper-parameter. Defaults to None.

Note

The timeout setting is recommended to be consistent with the timeout of the back-end service, if not, there may be a situation where the front-end has already timed out but the back-end still continues to execute the task. This is due to the loose coupling of front-end and back-end in rabbitmq, and the timeout cancellation mechanism can not be controlled by the publisher, active cancellation is not easy to realize. This function will uplift errors that may occur during execution:

This function raises errors that may occur during execution, possible errors are: call queue is full (aio_pika.Basic.Nack), no route (aiormq.exceptions.PublishError), call timeout (asyncio.TimeoutError), remote execution error (rabibridge.RemoteExecutionError).

Examples

>>> try:
...     res = await client.remote_call(...)
>>> Except Exceition as e:
...     ...

Returns

Any
(result) The result of the remote call.
async def try_remote_call(self, func_name: str, args: tuple[typing.Any] = (), kwargs: dict[typing.Any] = {}, ftype: Literal['async', 'sync'] = 'async', *, timeout: Optional[float] = None) ‑> Tuple[bool, Tuple[int, Any]]

A simplified way of writing remote_call, there is no essential difference between the two. Instead of doing a try except externally, you control the process execution through the error code.

Return

(bool, Tuple[int, Any]): (success, (err_code, result)) Where success means if the call successfully returns on client side, if it returns normally, it means that no timeout or full queue error has occurred, but this does not mean that the returned result is reliable. While err_code means if the call run smoothly on remote side, 0 for success, 1 for error.

The relationship between success and err_code: If error_code is 0, the result is reliable. If error_code is 1, it is necessary to determine whether the error occurred on the client or server side based on success.

class RMQServer (loop: Optional[asyncio.events.AbstractEventLoop] = None, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None)

Used to provide services with daemon process.

The following parameters are used preferentially if they are specified, if they are not specified, the configuration file is searched to use, and an error is reported if they are not in the configuration file either.

Args

loop
Event loop in this particular process. Defaults to None.
host
RabbitMQ host. Defaults to None.
port
RabbitMQ port. Defaults to None.
username
RabbitMQ username. Defaults to None.
password
RabbitMQ password. Defaults to None.

Ancestors

Methods

def add_service(self, func_ptr: Callable[..., Any], queue_size: Optional[int] = None, fetch_size: Optional[int] = None, timeout: Optional[int] = None, re_register: bool = False) ‑> ServiceSchema

If you do not wish to use automatic capture, you can register the service manually. Manually registered services do not need to be pre-registered using the recister_call decorator.

For technical details and meanings of the parameters, please refer to the description in register_call.

Args

func_ptr
function pointer.
queue_size
queue size.
fetch_size
fetch size.
timeout
timeout. Defaults to None.
re_register
re-register. Defaults to False.
async def close(self)
async def connect(self) ‑> RMQServer

It should be connected using connect() before making the call and released using close() before closing the client. The client can be used as a context manager to ensure that the connection is closed properly after use.

Examples

>>> s = RMQServer(...)
>>> s.load_services(globals())
>>> async with s:
...     await server.run_serve()
def load_plugins(self, plugin_dir: Union[pathlib.Path, str]) ‑> None

Load services from a specified directory, the directory should contain python files that have been registered with the register_call decorator.

Args

plugin_dir
plugin directory.
def load_services(self, symbols: dict[str, object]) ‑> None

Automatically captures all registered functions in global space, using which requires that all the specified call has already been registered.

Args

symbols
global dict.

Examples

>>> @register_call(...)
>>> def call_1():
...     ...
>>> obj.load_services(globals())
None # call_1 has been loaded
async def run_serve(self, *, reload: bool = False)

Start the service, the program will block once called.

Args

reload
whether to reload the service on file changes. Defaults to False. You need to explicitly call load_plugins() first to enable this option

Note

Repeated loading and unloading in the current version can lead to memory leaks.