Actors¶
Manager¶
- class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, max_num_of_workers: int = 50, publish_logs_via_zmq: bool = False, dashboard_api: bool = True, **kwargs)¶
- broadcast_request(htype: Literal['get', 'post'], route: str, data: Any = {}, timeout: Union[int, float] = 55) bool¶
- collect(unzip: bool = True) bool¶
Collect data from the Workers
First, we wait until all the Nodes have finished saving their data. Then, the Manager requests the Nodes’ data from the Workers.
- Parameters
unzip (bool) – Should the .zip archives be extracted.
- Returns
Success in collecting data from the Workers
- Return type
bool
- commit_graph(graph: chimerapy.graph.Graph, mapping: Dict[str, List[str]], send_packages: Optional[List[Dict[str, Any]]] = None) bool¶
Committing Graph to the cluster.
Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine has two steps: peer creation and peer-to-peer connection setup.
In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.
With the successful creation of the Nodes, the Manager requests the Nodes servers’ IP addresses and port numbers to create an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.
- Parameters
graph (cp.Graph) – The graph to deploy within the cluster.
mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the workers, while each value is a list of the nodes’ names.
send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not found via PyPI or Anaconda). Provide a list of packages, with each package configured via a dictionary with the following key-value pairs: name: str and path: pathlib.Path.
- Returns
Success in cluster’s setup
- Return type
bool
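The address-table step described for commit_graph can be sketched in plain Python. This is only an illustration under stated assumptions, not ChimeraPy's implementation: the helpers build_address_table and resolve_neighbors, and the shape of the per-Worker reports, are hypothetical names and structures chosen for the example.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]  # (ip, port)


def build_address_table(
    worker_reports: Dict[str, Dict[str, Address]]
) -> Dict[str, Address]:
    """Merge per-Worker reports into one node-name -> address table."""
    table: Dict[str, Address] = {}
    for nodes in worker_reports.values():
        for node_name, address in nodes.items():
            if node_name in table:
                raise ValueError(f"Node {node_name!r} reported by more than one Worker")
            table[node_name] = address
    return table


def resolve_neighbors(
    edges: List[Tuple[str, str]], table: Dict[str, Address]
) -> Dict[str, Dict[str, Dict[str, Address]]]:
    """For each node, look up the addresses of its in-bound and out-bound nodes."""
    neighbors: Dict[str, Dict[str, Dict[str, Address]]] = {
        name: {"in_bound": {}, "out_bound": {}} for name in table
    }
    for src, dst in edges:
        neighbors[src]["out_bound"][dst] = table[dst]
        neighbors[dst]["in_bound"][src] = table[src]
    return neighbors


# Hypothetical reports: each Worker lists the addresses of the Nodes it started.
reports = {
    "local": {"camera": ("10.0.0.2", 5001)},
    "remote": {"detector": ("10.0.0.3", 5002)},
}
table = build_address_table(reports)
neighbors = resolve_neighbors([("camera", "detector")], table)
print(neighbors["detector"]["in_bound"])
```

Each entry of neighbors is what a single Node would need to know to reach the Nodes on either side of its edges.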
- create_p2p_network() bool¶
Create the P2P Nodes in the Network
This routine only creates the Nodes via the Workers; it does not establish the connections.
- Returns
Success in creating the P2P Nodes
- Return type
bool
- async deregister_worker(request: aiohttp.web_request.Request)¶
- distribute_packages(packages_meta: List[Dict[str, Any]]) bool¶
Distribute packages to Workers
- Parameters
packages_meta (List[Dict[str, Any]]) – A specification of the packages. It’s a list of packages where each element is a configuration of one package, using a dictionary with the following keys: name and path.
- Returns
Success in cluster’s workers loading the distributed package.
- Return type
bool
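A packages_meta list could be prepared and checked before calling distribute_packages, roughly as follows. The helper validate_packages_meta and the package name and path are hypothetical. Only the two keys, name and path, come from the description above.

```python
import pathlib
from typing import Any, Dict, List


def validate_packages_meta(packages_meta: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Check that every entry has a str `name` and a path-like `path`."""
    cleaned = []
    for entry in packages_meta:
        missing = {"name", "path"} - set(entry)
        if missing:
            raise KeyError(f"Package entry is missing keys: {sorted(missing)}")
        if not isinstance(entry["name"], str):
            raise TypeError("`name` must be a str")
        # Normalise the path so later code can rely on pathlib.Path.
        cleaned.append({"name": entry["name"], "path": pathlib.Path(entry["path"])})
    return cleaned


# Hypothetical local development package.
packages_meta = validate_packages_meta(
    [{"name": "my_dev_package", "path": "./my_dev_package"}]
)
print(packages_meta)
```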
- gather() Dict¶
- property host: str¶
- map_graph(worker_graph_map: Dict[str, List[str]])¶
Mapping Node from graph to Worker from cluster.
The mapping, a dictionary, informs ChimeraPy which Worker is going to execute which Nodes.
- Parameters
worker_graph_map (Dict[str, List[str]]) – The keys are the Workers’ names and each value should be a list of the names of the Nodes that will be executed within the corresponding Worker.
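A mapping of this shape can be sanity-checked before it is handed to the Manager. The sketch below is not part of ChimeraPy: check_mapping is a hypothetical helper, and the rule that every Node is assigned to exactly one Worker is an assumption made for the example.

```python
from collections import Counter
from typing import Dict, List, Set


def check_mapping(
    worker_graph_map: Dict[str, List[str]],
    worker_names: Set[str],
    node_names: Set[str],
) -> None:
    """Raise ValueError if the mapping is inconsistent with the cluster or graph."""
    unknown_workers = set(worker_graph_map) - worker_names
    if unknown_workers:
        raise ValueError(f"Unknown Workers: {sorted(unknown_workers)}")

    # Flatten the mapping and count how often each Node name appears.
    counts = Counter(n for nodes in worker_graph_map.values() for n in nodes)
    unknown_nodes = set(counts) - node_names
    if unknown_nodes:
        raise ValueError(f"Unknown Nodes: {sorted(unknown_nodes)}")

    duplicates = sorted(n for n, c in counts.items() if c > 1)
    if duplicates:
        raise ValueError(f"Nodes assigned more than once: {duplicates}")

    unassigned = node_names - set(counts)
    if unassigned:
        raise ValueError(f"Nodes without a Worker: {sorted(unassigned)}")


# Hypothetical Worker and Node names.
mapping = {"local": ["camera"], "remote": ["detector", "recorder"]}
check_mapping(mapping, {"local", "remote"}, {"camera", "detector", "recorder"})
print("mapping is consistent")
```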
- property port: int¶
- register_graph(graph: chimerapy.graph.Graph)¶
Verifying that a Graph is valid, that is, a DAG.
In ChimeraPy, cycles are not allowed, in order to avoid deadlocks. Registering the graph is the first step in setting up the data pipeline in the cluster.
- Parameters
graph (Graph) – A directed acyclic graph.
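The acyclicity requirement can be illustrated with a small, dependency-free check based on Kahn's algorithm. This is a sketch of the general technique, not the check ChimeraPy performs. The function is_dag is a hypothetical helper, and it assumes every edge endpoint is listed in nodes.

```python
from collections import deque
from typing import Dict, Hashable, Iterable, List, Tuple


def is_dag(nodes: Iterable[Hashable], edges: Iterable[Tuple[Hashable, Hashable]]) -> bool:
    """Return True if the directed graph has no cycles (Kahn's algorithm)."""
    in_degree: Dict[Hashable, int] = {n: 0 for n in nodes}
    successors: Dict[Hashable, List[Hashable]] = {n: [] for n in in_degree}
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1

    # Repeatedly remove nodes that have no remaining in-bound edges.
    ready = deque(n for n, d in in_degree.items() if d == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Nodes on a cycle never reach in-degree zero, so they are never visited.
    return visited == len(in_degree)


pipeline_ok = is_dag(["a", "b", "c"], [("a", "b"), ("b", "c")])
pipeline_cyclic = is_dag(["a", "b", "c"], [("a", "b"), ("b", "c"), ("c", "a")])
print(pipeline_ok, pipeline_cyclic)
```

For a graph already held as a networkx DiGraph, networkx.is_directed_acyclic_graph answers the same question.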
- async register_worker(request: aiohttp.web_request.Request)¶
- request_connection_creation(worker_id: str) bool¶
Request establishing the connections between Nodes
In this routine, the Manager sends the Nodes’ server data and requests that the Workers organize their own Nodes.
- Returns
Whether connection creation was successful
- Return type
bool
- request_node_creation(worker_id: str, node_id: str) bool¶
Request creating a Node from the Graph.
- Parameters
worker_id (str) – The targeted Worker
node_id (str) – The id of the node to create, which must be in the graph
- Returns
Success in creating the Node
- Return type
bool
- request_node_server_data(worker_id: str) bool¶
Request Workers to provide information about Node’s PUBs
- Returns
Success of obtaining the node server data
- Return type
bool
- save_meta()¶
- setup_p2p_connections() bool¶
Setting up the connections between p2p nodes
- Returns
Success in creating the connections
- Return type
bool
- shutdown()¶
Properly shutting down the ChimeraPy cluster.
Through this method, the Manager broadcasts a shutdown request to all Workers, which then stop their processes and threads safely.
- start()¶
Start the execution of the cluster.
Before starting, make sure that you have performed the following steps:
Create Nodes
Create DAG with Nodes and their edges
Connect Workers (must be before committing Graph)
Register, map, and commit Graph
- step() bool¶
Cluster step execution for offline operation.
The step function is for careful but slow operation of the cluster. For online execution, start and stop are the methods to be used.
- Returns
Success of step function broadcasting
- Return type
bool
- stop()¶
Stop the execution of the cluster.
Do not forget that you still need to execute shutdown to properly shut down processes, threads, and queues.
- async update_nodes_status(request: aiohttp.web_request.Request)¶
- property workers: Dict[str, chimerapy.states.WorkerState]¶
Worker¶
- class Worker(name: str, port: int = 10000, delete_temp: bool = True, id: Optional[str] = None)¶
- async async_create_node(request: Optional[aiohttp.web_request.Request] = None, node_config: Optional[Dict[str, Any]] = None)¶
- async async_shutdown(request: aiohttp.web_request.Request)¶
- async async_start_nodes(request: aiohttp.web_request.Request)¶
- async async_step(request: aiohttp.web_request.Request)¶
- async async_stop_nodes(request: aiohttp.web_request.Request)¶
- connect(host: str, port: int, timeout: Union[int, float] = 10.0) bool¶
Connect Worker to Manager.
This establishes server-client connections between Worker and Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops; otherwise, processes and threads may be left running.
- Parameters
host (str) – The Manager’s IP address.
port (int) – The Manager’s port number.
timeout (Union[int, float]) – Set timeout for the connection.
- Returns
Success in connecting to the Manager
- Return type
bool
- create_node(msg: Dict[str, Any])¶
- create_node_server_data()¶
- deregister()¶
- exec_coro(coro: Coroutine)¶
- property id: str¶
- idle()¶
- property ip: str¶
- async load_sent_packages(request: aiohttp.web_request.Request)¶
- property name: str¶
- async node_report_gather(msg: Dict, ws: aiohttp.web_ws.WebSocketResponse)¶
- async node_status_update(msg: Dict, ws: aiohttp.web_ws.WebSocketResponse)¶
- property nodes: Dict[str, chimerapy.states.NodeState]¶
- property port: int¶
- async process_node_server_data(request: aiohttp.web_request.Request)¶
- async report_node_gather(request: aiohttp.web_request.Request)¶
- async report_node_saving(request: aiohttp.web_request.Request)¶
- async report_node_server_data(request: aiohttp.web_request.Request)¶
- async send_archive(request: aiohttp.web_request.Request)¶
- shutdown(msg: Dict = {})¶
Shutdown Worker safely.
The Worker needs to shut down its server, client, and Nodes in a safe manner, such as setting flag variables and clearing out queues.
- Parameters
msg (Dict) – Leave empty; required to work when the Manager sends a shutdown message to the Worker.
- start_nodes()¶
- step()¶
- stop_nodes()¶
Node¶
- class Node(name: str, debug: Optional[Literal['step', 'stream']] = None, debug_port: Optional[int] = None)¶
- async async_forward(msg: Dict)¶
- config(host: str, port: int, logdir: pathlib.Path, in_bound: List[str], in_bound_by_name: List[str], out_bound: List[str], follow: Optional[str] = None, networking: bool = True, logging_level: int = 20, worker_logging_port: int = 5555)¶
Configuring the Node’s networking and metadata.
This function does not create the connections between the Node and the Worker, as that is done in the _prep method. It just implants the networking metadata from the Worker into the Node. This is because we have to instantiate the Server, Client, and other components of the Node inside the run method.
- Parameters
host (str) – Worker’s host
port (int) – Worker’s port
in_bound (List[str]) – List of node names that will be inputs
out_bound (List[str]) – List of node names that will be sent the output
networking (bool) – Optional deselect networking setup (used in testing)
- forward(msg: Dict)¶
- get_logger() logging.Logger¶
- property id: str¶
- main()¶
Method that the user may override.
This method can be overridden, though it is recommended to do so carefully. If overridden, the handling of inputs will have to be implemented as well.
The input data can be accessed through the self.in_bound_data and self.new_data_available attributes.
- property name: str¶
- poll_inputs()¶
- prep()¶
User-defined method for Node setup.
In this method, the setup logic of the Node is executed. This would include opening files, creating connections to sensors, and calibrating sensors.
- async process_node_server_data(msg: Dict)¶
- async provide_gather(msg: Dict)¶
- async provide_saving(msg: Dict)¶
- ready()¶
- run()¶
The actual method that is executed in the new process.
When working with multiprocessing.Process, keep in mind that creating a new process can yield unexpected behavior if done carelessly. It is recommended that one reads the multiprocessing documentation to understand the implications.
- save_audio(name: str, data: numpy.ndarray, channels: int, format: int, rate: int)¶
- save_image(name: str, data: numpy.ndarray)¶
- save_tabular(name: str, data: Union[pandas.core.frame.DataFrame, Dict[str, Any], pandas.core.series.Series])¶
- save_video(name: str, data: numpy.ndarray, fps: int)¶
- shutdown(msg: Dict = {})¶
- async start_node(msg: Dict)¶
- step(data_chunks: Dict[str, chimerapy.networking.data_chunk.DataChunk] = {}) Union[chimerapy.networking.data_chunk.DataChunk, Any]¶
User-defined method.
This method contains the logic that is executed within the Node’s while loop. For data sources (no inputs), the step method will execute as fast as possible; therefore, it is important to add time.sleep to specify the sampling rate.
For a Node that has inputs, step will be executed when new data is received.
- Parameters
data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter should not be considered (as they don’t have inputs). For step and sink nodes, data_chunks must be included to avoid an error. The variable is a dictionary, where the key is the in-bound Node’s name and the value is the output of the in-bound Node’s step function.
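The two flavours of step described above can be illustrated with stand-in classes. The sketch does not subclass the real Node. CounterSource and Doubler are hypothetical, and plain integers stand in for DataChunk objects so the example runs without ChimeraPy installed.

```python
import time
from typing import Any, Dict


class CounterSource:
    """Stand-in for a source Node: `step` takes no inputs."""

    def __init__(self, rate_hz: float) -> None:
        self.period = 1.0 / rate_hz
        self.count = 0

    def step(self) -> int:
        # Without this sleep the loop would spin as fast as possible.
        time.sleep(self.period)
        self.count += 1
        return self.count


class Doubler:
    """Stand-in for a Node with inputs: `step` receives a dict keyed by in-bound Node name."""

    def step(self, data_chunks: Dict[str, Any]) -> int:
        return 2 * data_chunks["counter"]


source, doubler = CounterSource(rate_hz=100.0), Doubler()
# Emulate three loop iterations: the source's output feeds the consumer.
outputs = [doubler.step({"counter": source.step()}) for _ in range(3)]
print(outputs)
```

The key "counter" plays the role of the in-bound Node's name; in a real pipeline it would match the name given to the upstream Node.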
- async stop_node(msg: Dict)¶
- teardown()¶
User-defined method.
This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.
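How prep, step, and teardown fit together can be sketched with a stand-in class and a tiny driver loop. The names RecordingNode and run_lifecycle are hypothetical, and the call order is inferred from the method descriptions in this section. The real run method also handles networking and process management, which are omitted here.

```python
from typing import List


class RecordingNode:
    """Stand-in showing where setup, per-iteration work, and cleanup belong."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def prep(self) -> None:
        # e.g. open files, connect to and calibrate sensors
        self.events.append("prep")

    def step(self) -> None:
        # e.g. read one sample and process it
        self.events.append("step")

    def teardown(self) -> None:
        # e.g. close files, tell sensors to stop
        self.events.append("teardown")


def run_lifecycle(node: RecordingNode, iterations: int) -> None:
    """Call prep once, step repeatedly, and teardown even if step raises."""
    node.prep()
    try:
        for _ in range(iterations):
            node.step()
    finally:
        node.teardown()


node = RecordingNode()
run_lifecycle(node, iterations=2)
print(node.events)
```

Wrapping the loop in try/finally is a choice made for this sketch so that cleanup still runs when step fails.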
- waiting()¶
Graph¶
- class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)¶
- add_edge(src: chimerapy.node.Node, dst: chimerapy.node.Node, follow: bool = False)¶
- add_edges_from(list_of_edges: Sequence[Sequence[chimerapy.node.Node]])¶
- add_node(node: chimerapy.node.Node)¶
- add_nodes_from(nodes: Sequence[chimerapy.node.Node])¶
- get_id_by_name(node_name: str)¶
- get_layers_and_pos()¶
- has_node_by_id(node_id: str)¶
- is_valid()¶
Checks if Graph is a true DAG.
- plot(font_size: int = 30, node_size: int = 5000)¶
Plotting the Graph to visualize the data pipeline.
This visualization tool uses matplotlib and networkx to show the Nodes and their edges.
- Parameters
font_size (int) – Font size
node_size (int) – Node size