Module rabibridge.utils
Functions
def decode_pwd(cipher: Optional[str], secret: Optional[str], decrypt_function: Callable[[bytes, bytes], bytes]) ‑> Optional[str]
def dynamic_load_module(file_path: pathlib.Path)
-
Args
file_path
- the file path of the module.
Note
This function dynamically loads a module and executes it. The module name is the file name without its suffix, and the module is registered in the sys.modules dictionary. Users should NOT call this function directly, but they should ensure that the plugin file name is a valid module name and does not duplicate the name of an existing library.
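The loading behaviour described in the note can be sketched with the standard importlib machinery. This is a minimal illustration under stated assumptions, not the rabibridge implementation: the function name load_module_from_path is hypothetical, and the sketch simply assumes the module is registered in sys.modules under the file stem before it is executed.

```python
import importlib.util
import pathlib
import sys
from types import ModuleType


def load_module_from_path(file_path: pathlib.Path) -> ModuleType:
    """Hypothetical helper: load file_path as a module named after its stem."""
    # The module name is the file name without the suffix, e.g. "plugin.py" -> "plugin".
    module_name = file_path.stem
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {file_path}")
    module = importlib.util.module_from_spec(spec)
    # Registering under the stem replaces any entry of the same name, which is
    # why a plugin file must not share its name with an existing library.
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module
```

Because the registration overwrites whatever sys.modules already holds under that name, a plugin called json.py or asyncio.py would shadow the standard library module for any later import in the same process.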
def encode_pwd(pwd: Optional[str], secret: Optional[str], encrypt_function: Callable[[bytes, bytes], bytes]) ‑> Optional[str]
def get_config_val(config: dict, k1: str, k2: str) ‑> Any
def list_main_functions(global_symbols: dict[str, object], banned_names: list[str] = []) ‑> Generator[Tuple[str, str, object], None, None]
def load_config() ‑> dict
-
Args
None
Returns
dict
- the configuration dictionary.
Note
The configuration file is located in the config folder of the project root directory and is named env.toml.
def multiprocess_spawn_helper(num_processes: Optional[int], single_process: Callable[..., Any], *, bind_core: Optional[bool] = False)
-
A simple helper that uses the multiprocessing module to spawn one copy of a child process per core. We recommend reading the source code directly if you want to understand the details.
Args
num_processes
- the number of processes to spawn. None means the number of logical cores.
single_process
- the function to be executed in each process.
bind_core
- whether to bind each process to a core. If the number of processes you deploy equals the number of cores and your workload is heavy, enabling this option avoids the overhead caused by processes switching between cores and can slightly improve performance. In most cases enabling it is not recommended. Defaults to False.
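The arguments above can be illustrated with a small sketch built on the standard multiprocessing module. It is an assumption-laden approximation, not the library's source: the names resolve_process_count, _run_worker and spawn_workers are hypothetical, and core binding is done with os.sched_setaffinity, which exists only on Linux.

```python
import multiprocessing
import os
from typing import Any, Callable, Optional


def resolve_process_count(num_processes: Optional[int]) -> int:
    """None means one process per logical core."""
    if num_processes is None:
        return os.cpu_count() or 1
    return num_processes


def _run_worker(index: int, target: Callable[[], Any], bind_core: bool) -> None:
    # Pin this process to a single core when requested and supported.
    if bind_core and hasattr(os, "sched_setaffinity"):
        cores = sorted(os.sched_getaffinity(0))
        os.sched_setaffinity(0, {cores[index % len(cores)]})
    target()


def spawn_workers(
    num_processes: Optional[int],
    target: Callable[[], Any],
    *,
    bind_core: bool = False,
) -> None:
    """Start the requested number of child processes and wait for them all."""
    count = resolve_process_count(num_processes)
    workers = [
        multiprocessing.Process(target=_run_worker, args=(i, target, bind_core))
        for i in range(count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```

When trying this out, call spawn_workers from inside an `if __name__ == "__main__":` guard and pass a module-level function as target, so that it also works with the spawn start method.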
def register_call(queue_size: Optional[int] = None, fetch_size: Optional[int] = None, timeout: Optional[int] = None, *, validate: bool = False, re_register: bool = False)
-
Args
queue_size
- the maximum queue length for this call (the maximum number of waiting tasks). Defaults to None, which means no limit. Changing this parameter affects the persisted queue settings in RabbitMQ, so the queue needs to be redeclared.
fetch_size
- the fetch size. Set a reasonable value to achieve maximum performance. For I/O-bound tasks, additional waiting tasks do not open more connections, so under an I/O multiplexing model they usually consume few system resources. Even so, you generally shouldn't let your application listen on too many file descriptors at the same time. Typically, keeping the number of file descriptors the system listens on in the range of a few hundred to a few thousand is the key to efficiency. Ideally, these file descriptors can be assumed to be evenly distributed across processes, and within each process evenly across calls. From this you can infer an appropriate value for this parameter, which usually shouldn't be too small or too large. If your workload puts significant pressure on the backend, say a complex SQL search, limiting fetch_size to a very small value is an effective way to protect the backend service. The default is None, which means fetching all messages in the queue that are currently in the ready state.
timeout
- the timeout for messages in the queue. Defaults to None. Changing this parameter affects the persisted queue settings in RabbitMQ, so the queue needs to be redeclared.
validate
- whether to enforce type constraints on the input parameters when a remote call occurs; this is a wrapper around the pydantic.validate_call decorator. Defaults to False.
re_register
- whether to remove the queue parameters that have been persisted in RabbitMQ and redeclare the queue. Defaults to False.
Note
re_register should not be used in multiprocessing mode, where redeclaring the queue will cause the other workers to be disconnected.
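The sizing reasoning given for fetch_size can be turned into a back-of-the-envelope calculation. The sketch below is only an illustration of that arithmetic: suggest_fetch_size and its parameters are hypothetical names, not part of rabibridge, and the even split across processes and calls is the idealised assumption stated in the description.

```python
def suggest_fetch_size(fd_budget: int, num_processes: int, calls_per_process: int) -> int:
    """Split a file-descriptor budget evenly across processes, then across calls."""
    if fd_budget <= 0 or num_processes <= 0 or calls_per_process <= 0:
        raise ValueError("all arguments must be positive")
    # Each process gets an equal share of the budget, and each registered call
    # within a process gets an equal share of that; never suggest less than 1.
    return max(1, fd_budget // (num_processes * calls_per_process))


# Example: a budget of 2000 descriptors, 8 worker processes, 5 calls per process.
print(suggest_fetch_size(2000, 8, 5))  # 2000 // 40 = 50
```

A call that puts heavy load on the backend can still be given a much smaller value than this estimate, as the description of fetch_size suggests.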
def trace_exception(e: Exception) ‑> str