Package rabibridge
Sub-modules
rabibridge.exceptions
rabibridge.models
rabibridge.mq
rabibridge.permissions
rabibridge.serialisation
rabibridge.store
rabibridge.utils
Functions
def multiprocess_spawn_helper(num_processes: Optional[int], single_process: Callable[..., Any], *, bind_core: Optional[bool] = False)
-
A simple function to help you use the multiprocessing module to expand a copy of a child process in each core. We recommend reading the source code directly if you want to understand the details.
Args
num_processes
- the number of processes to spawn. Input
None
means the number of logical cores. single_process
- the function to be executed in each process.
bind_core
- whether to bind the process to the core. If your number of deployments is equal to the number of cores and your business is under pressure, turning on this option is good for avoiding register overhead due to core switching and can slightly improve performance. Most of the time it is not recommended to turn on. Defaults to
False
.
def register_call(queue_size: Optional[int] = None, fetch_size: Optional[int] = None, timeout: Optional[int] = None, *, validate: bool = False, re_register: bool = False)
-
Args
queue_size
- what the queue length for this call should be (maximum number of waiting tasks). Default to
None
means no limit. Changing this parameter affects the persistence settings in rabbitmq, so it needs to be redeclared. fetch_size
- fetch size. You need to set a reasonable value to achieve maximum performance. Although for I/O-bound tasks, as more waiting does not open more connections, they usually don't consume too many system resources under an I/O multiplexing model. However, you generally shouldn't let your application listen to too many file descriptors at the same time. Typically, maintaining the system's listening file descriptors in the range of a few hundred to a few thousand is the key to ensuring efficiency. These file descriptors can ideally be assumed to be evenly distributed across different processes, with each process evenly distributed across different calls. From this, you can infer an appropriate value for this parameter to set, which usually shouldn't be too small or too large. Of course, if your business puts significant pressure on the backend, say a complex SQL search, limiting
fetch_size
to a very small value is an effective way to protect the backend service. The default isNone
, which means get all the messages in the current queue in the ready state. timeout
- message timeout from the queue. Defaults to
None
. Changing this parameter affects the persistence settings in rabbitmq, so it needs to be redeclared. validate
- whether to force constraints on the type legitimacy of input parameters when a remote call occurs, a wrapper for the
pydantic.validate_call
decorator. Defaults toFalse
. re_register
- whether to remove the hyperparameter in rabbitmq that the queue has been persisted and redeclare. Defaults to
False
.
Note
re_register should not be used in multiprocessing mode, where reclaim will cause other worker disconeected.
Classes
class RMQClient (loop: Optional[asyncio.events.AbstractEventLoop] = None, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None)
-
Acting as a client-initiating requestor in a gateway service.
The following parameters are used preferentially if they are specified, if they are not specified, the configuration file is searched to use, and an error is reported if they are not in the configuration file either.
Args
loop
- Event loop in this particular process. Defaults to
None
. host
- RabbitMQ host. Defaults to
None
. port
- RabbitMQ port. Defaults to
None
. username
- RabbitMQ username. Defaults to
None
. password
- RabbitMQ password. Defaults to
None
.
Ancestors
Instance variables
var correlation_id : str
-
A (periodically) self-incrementing pointer that should not normally be called directly by the user.
Methods
async def close(self)
async def connect(self) ‑> RMQClient
-
It should be connected using
connect()
before making the call and released usingclose()
before closing the client. The client can be used as a context manager to ensure that the connection is closed properly after use.Examples
>>> async with RMQClient(...) as client: ... res = await client.remote_call(...) ... print(res)
async def remote_call(self, func_name: str, args: tuple[typing.Any] = (), kwargs: dict[typing.Any] = {}, ftype: Literal['async', 'sync'] = 'async', *, timeout: Optional[float] = None) ‑> Any
-
Args
func_name
- function name to be called.
args
- arguments. Defaults to
()
. kwargs
- keyword arguments. Defaults to
{}
. ftype
- function type to be called remotely. e.g. defaults to 'async', and asynchronous call will be made on server side.
timeout
- Client timeout time, independent from queue timeout hyper-parameter. Defaults to
None
.
Note
The timeout setting is recommended to be consistent with the timeout of the back-end service, if not, there may be a situation where the front-end has already timed out but the back-end still continues to execute the task. This is due to the loose coupling of front-end and back-end in rabbitmq, and the timeout cancellation mechanism can not be controlled by the publisher, active cancellation is not easy to realize. This function will uplift errors that may occur during execution:
This function raises errors that may occur during execution, possible errors are: call queue is full (aio_pika.Basic.Nack), no route (aiormq.exceptions.PublishError), call timeout (asyncio.TimeoutError), remote execution error (rabibridge.RemoteExecutionError).
Examples
>>> try: ... res = await client.remote_call(...) >>> Except Exceition as e: ... ...
Returns
Any
- (result) The result of the remote call.
async def try_remote_call(self, func_name: str, args: tuple[typing.Any] = (), kwargs: dict[typing.Any] = {}, ftype: Literal['async', 'sync'] = 'async', *, timeout: Optional[float] = None) ‑> Tuple[bool, Tuple[int, Any]]
-
A simplified way of writing remote_call, there is no essential difference between the two. Instead of doing a try except externally, you control the process execution through the error code.
Return
(bool, Tuple[int, Any]): (success, (err_code, result)) Where success means if the call successfully returns on client side, if it returns normally, it means that no timeout or full queue error has occurred, but this does not mean that the returned result is reliable. While err_code means if the call run smoothly on remote side, 0 for success, 1 for error.
The relationship between success and err_code: If error_code is 0, the result is reliable. If error_code is 1, it is necessary to determine whether the error occurred on the client or server side based on success.
class RMQServer (loop: Optional[asyncio.events.AbstractEventLoop] = None, host: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None)
-
Used to provide services with daemon process.
The following parameters are used preferentially if they are specified, if they are not specified, the configuration file is searched to use, and an error is reported if they are not in the configuration file either.
Args
loop
- Event loop in this particular process. Defaults to
None
. host
- RabbitMQ host. Defaults to
None
. port
- RabbitMQ port. Defaults to
None
. username
- RabbitMQ username. Defaults to
None
. password
- RabbitMQ password. Defaults to
None
.
Ancestors
Methods
def add_service(self, func_ptr: Callable[..., Any], queue_size: Optional[int] = None, fetch_size: Optional[int] = None, timeout: Optional[int] = None, re_register: bool = False) ‑> ServiceSchema
-
If you do not wish to use automatic capture, you can register the service manually. Manually registered services do not need to be pre-registered using the
recister_call
decorator.For technical details and meanings of the parameters, please refer to the description in
register_call()
.Args
func_ptr
- function pointer.
queue_size
- queue size.
fetch_size
- fetch size.
timeout
- timeout. Defaults to
None
. re_register
- re-register. Defaults to
False
.
async def close(self)
async def connect(self) ‑> RMQServer
-
It should be connected using
connect()
before making the call and released usingclose()
before closing the client. The client can be used as a context manager to ensure that the connection is closed properly after use.Examples
>>> s = RMQServer(...) >>> s.load_services(globals()) >>> async with s: ... await server.run_serve()
def load_plugins(self, plugin_dir: Union[pathlib.Path, str]) ‑> None
-
Load services from a specified directory, the directory should contain python files that have been registered with the
register_call()
decorator.Args
plugin_dir
- plugin directory.
def load_services(self, symbols: dict[str, object]) ‑> None
-
Automatically captures all registered functions in global space, using which requires that all the specified call has already been registered.
Args
symbols
- global dict.
Examples
>>> @register_call(...) >>> def call_1(): ... ... >>> obj.load_services(globals()) None # call_1 has been loaded
async def run_serve(self, *, reload: bool = False)
-
Start the service, the program will block once called.
Args
reload
- whether to reload the service on file changes. Defaults to
False
. You need to explicitly callload_plugins()
first to enable this option
Note
Repeated loading and unloading in the current version can lead to memory leaks.
class RemoteExecutionError (*args, **kwargs)
-
Raised when an exception occurs during remote execution.
Ancestors
- builtins.Exception
- builtins.BaseException
class Store (**kwargs: Any)
-
Usage docs: https://docs.pydantic.dev/2.9/concepts/models/
A base class for creating Pydantic models.
Attributes
__class_vars__
- The names of the class variables defined on the model.
__private_attributes__
- Metadata about the private attributes of the model.
__signature__
- The synthesized
__init__
[Signature
][inspect.Signature] of the model. __pydantic_complete__
- Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
- The core schema of the model.
__pydantic_custom_init__
- Whether the model has a custom
__init__
function. __pydantic_decorators__
- Metadata containing the decorators defined on the model.
This replaces
Model.__validators__
andModel.__root_validators__
from Pydantic V1. __pydantic_generic_metadata__
- Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
- Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
- The name of the post-init method for the model, if defined.
__pydantic_root_model__
- Whether the model is a [
RootModel
][pydantic.root_model.RootModel]. __pydantic_serializer__
- The
pydantic-core
SchemaSerializer
used to dump instances of the model. __pydantic_validator__
- The
pydantic-core
SchemaValidator
used to validate instances of the model. __pydantic_extra__
- A dictionary containing extra values, if [
extra
][pydantic.config.ConfigDict.extra] is set to'allow'
. __pydantic_fields_set__
- The names of fields explicitly set during instantiation.
__pydantic_private__
- Values of private attributes set on the model instance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [
ValidationError
][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.self
is explicitly positional-only to allowself
as a field name.Ancestors
- pydantic.main.BaseModel
Class variables
var extra_fields : Dict[str, Any]
var model_computed_fields
var model_config
var model_fields