Module rabibridge.utils

Functions

def decode_pwd(cipher: Optional[str], secret: Optional[str], decrypt_function: Callable[[bytes, bytes], bytes]) ‑> Optional[str]
def dynamic_load_module(file_path: pathlib.Path)

Args

file_path
the file path of the module.

Note

This function dynamically loads a module and executes it. The module name is the file name without its suffix, and the module is registered in the sys.modules dictionary.

This function should NOT be called explicitly by the user; rather, the user should ensure that the plugin filename is a legal module name and does not collide with existing libraries.
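A minimal sketch of what such dynamic loading typically looks like with importlib; this is an illustration of the mechanism described above, not the library's actual implementation:

```python
import importlib.util
import pathlib
import sys


def dynamic_load_module(file_path: pathlib.Path):
    """Load a Python file as a module named after its stem and register it in sys.modules."""
    module_name = file_path.stem  # file name without the suffix
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module from {file_path}")
    module = importlib.util.module_from_spec(spec)
    # Register before execution, as the importlib docs recommend, so that
    # the module can import itself (or be imported) during exec_module.
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module
```

Note how a plugin file named the same as an installed library would silently shadow it in sys.modules, which is exactly why the filename constraint above matters.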

def encode_pwd(pwd: Optional[str], secret: Optional[str], encrypt_function: Callable[[bytes, bytes], bytes]) ‑> Optional[str]
def get_config_val(config: dict, k1: str, k2: str) ‑> Any
def list_main_functions(global_symbols: dict[str, object], banned_names: list[str] = []) ‑> Generator[Tuple[str, str, object], None, None]
def load_config() ‑> dict

Args

None

Returns

dict
the configuration dictionary.

Note

The configuration file is located in the config folder of the project root directory, named env.toml.

def multiprocess_spawn_helper(num_processes: Optional[int], single_process: Callable[..., Any], *, bind_core: Optional[bool] = False)

A simple helper around the multiprocessing module that spawns one copy of a child process per core. We recommend reading the source code directly if you want to understand the details.

Args

num_processes
the number of processes to spawn. Passing None uses the number of logical cores.
single_process
the function to be executed in each process.
bind_core
whether to pin each process to a CPU core. If the number of deployed processes equals the number of cores and your workload is under sustained pressure, enabling this option avoids the cache and register overhead of core switching and can slightly improve performance. Most of the time it is not recommended. Defaults to False.
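A self-contained sketch of the spawn-per-core pattern described above, with optional core pinning via os.sched_setaffinity (a Linux-only API); in this sketch the worker function takes no arguments, which is a simplification:

```python
import multiprocessing as mp
import os
from typing import Any, Callable, Optional


def _worker(fn: Callable[[], Any], core_id: int, bind_core: bool) -> None:
    if bind_core and hasattr(os, "sched_setaffinity"):  # Linux-only API
        # Pin this process (pid 0 = self) to a single core.
        os.sched_setaffinity(0, {core_id % (os.cpu_count() or 1)})
    fn()


def multiprocess_spawn_helper(num_processes: Optional[int],
                              single_process: Callable[[], Any],
                              *, bind_core: bool = False) -> None:
    """Spawn one copy of `single_process` per process, optionally pinning each to a core."""
    n = num_processes or os.cpu_count() or 1
    workers = [mp.Process(target=_worker, args=(single_process, i, bind_core))
               for i in range(n)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
```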
def register_call(queue_size: Optional[int] = None, fetch_size: Optional[int] = None, timeout: Optional[int] = None, *, validate: bool = False, re_register: bool = False)

Args

queue_size
the queue length for this call (maximum number of waiting tasks). Defaults to None, meaning no limit. Changing this parameter affects the queue's persisted settings in RabbitMQ, so the queue must be redeclared.
fetch_size
fetch size (the number of messages prefetched at a time). You need to set a reasonable value to achieve maximum performance. For I/O-bound tasks, more waiting messages do not open more connections, so under an I/O-multiplexing model they usually don't consume many system resources. However, you generally shouldn't let your application listen on too many file descriptors at once: keeping the system's listening file descriptors in the range of a few hundred to a few thousand is key to ensuring efficiency. These descriptors can be assumed to be evenly distributed across processes, and within each process across calls; from this you can infer an appropriate value for this parameter, which usually shouldn't be too small or too large. Of course, if your business puts significant pressure on the backend, say a complex SQL search, limiting fetch_size to a very small value is an effective way to protect the backend service. Defaults to None, which means fetch all messages currently in the ready state in the queue.
timeout
the message timeout (TTL) for the queue. Defaults to None. Changing this parameter affects the queue's persisted settings in RabbitMQ, so the queue must be redeclared.
validate
whether to enforce type validation of input parameters when a remote call occurs; a wrapper around the pydantic.validate_call decorator. Defaults to False.
re_register
whether to discard the queue's persisted settings in RabbitMQ and redeclare the queue. Defaults to False.

Note

re_register should not be used in multiprocessing mode, where redeclaring the queue will cause other workers to be disconnected.
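To illustrate how these parameters fit together, here is a hypothetical sketch of a decorator factory with this signature; it merely records the queue options on the function and optionally applies pydantic validation, whereas the real register_call presumably declares the RabbitMQ queue. Following the fetch_size sizing guidance above: roughly 2000 listening descriptors split across 8 processes and 5 registered calls would suggest fetch_size ≈ 50.

```python
from typing import Optional


def register_call(queue_size: Optional[int] = None,
                  fetch_size: Optional[int] = None,
                  timeout: Optional[int] = None,
                  *, validate: bool = False, re_register: bool = False):
    """Sketch of a decorator factory: stores queue options for later registration."""
    def decorator(fn):
        if validate:
            # Wrap with pydantic's validate_call, as the docs above describe.
            from pydantic import validate_call
            fn = validate_call(fn)
        fn._rmq_opts = {  # hypothetical attribute name, for illustration only
            "queue_size": queue_size,
            "fetch_size": fetch_size,
            "timeout": timeout,
            "re_register": re_register,
        }
        return fn
    return decorator
```

Usage would then look like `@register_call(queue_size=100, fetch_size=50)` above the function exposed for remote calls.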

def trace_exception(e: Exception) ‑> str
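A plausible implementation sketch of such a helper using the standard traceback module (the library's actual formatting may differ):

```python
import traceback


def trace_exception(e: Exception) -> str:
    """Render an exception and its traceback as a single string."""
    return "".join(traceback.format_exception(type(e), e, e.__traceback__))
```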