Actors

Manager

class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, max_num_of_workers: int = 50, publish_logs_via_zmq: bool = False, dashboard_api: bool = True, **kwargs)
broadcast_request(htype: Literal['get', 'post'], route: str, data: Any = {}, timeout: Union[int, float] = 55) bool
collect(unzip: bool = True) bool

Collect data from the Workers

First, the Manager waits until all the Nodes have finished saving their data. Then, the Manager requests the Nodes’ data from the Workers.

Parameters

unzip (bool) – Whether the .zip archives should be extracted.

Returns

Success in collecting data from the Workers

Return type

bool
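
The effect of unzip can be pictured with a small standard-library sketch. It assumes, hypothetically, that each Worker's data arrives as one .zip file directly under the Manager's logdir; the helper name extract_archives and that layout are illustrative and are not taken from ChimeraPy.

```python
import pathlib
import zipfile
from typing import List


def extract_archives(logdir: pathlib.Path) -> List[pathlib.Path]:
    """Extract every .zip directly under logdir into a folder named after it.

    Hypothetical helper: the archive layout is an assumption, not ChimeraPy's.
    """
    extracted = []
    for archive in sorted(logdir.glob("*.zip")):
        target = logdir / archive.stem  # worker_a.zip -> worker_a/
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(target)
        extracted.append(target)
    return extracted
```

With unzip=False the equivalent of this step would simply be skipped, leaving the archives as they were received.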

commit_graph(graph: chimerapy.graph.Graph, mapping: Dict[str, List[str]], send_packages: Optional[List[Dict[str, Any]]] = None) bool

Committing Graph to the cluster.

Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine consists of two steps: peer creation and peer-to-peer connection setup.

In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.

With the successful creation of the Nodes, the Manager requests the Node servers’ IP addresses and port numbers to create an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.

Parameters
  • graph (cp.Graph) – The graph to deploy within the cluster.

  • mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the Workers, while each value is a list of Node names.

  • send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not found via PyPI or Anaconda). Provide a list of packages, with each package configured via a dictionary with the following key-value pairs: name:str and path:pathlib.Path.

Returns

Success in cluster’s setup

Return type

bool
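
The second step described above, turning the Nodes' reported addresses into per-Node connection information, can be sketched with plain dictionaries. This is an illustration only: build_connection_table, the (ip, port) tuples, and the edge list are hypothetical stand-ins, not ChimeraPy's internal data structures.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]  # (ip, port); an assumed representation


def build_connection_table(
    addresses: Dict[str, Address],
    edges: List[Tuple[str, str]],
) -> Dict[str, Dict[str, List[Address]]]:
    """For each node, list the addresses of its in-bound and out-bound peers."""
    table = {name: {"in_bound": [], "out_bound": []} for name in addresses}
    for src, dst in edges:
        # An edge src -> dst means dst receives from src and src sends to dst.
        table[dst]["in_bound"].append(addresses[src])
        table[src]["out_bound"].append(addresses[dst])
    return table


addresses = {"camera": ("10.0.0.2", 5001), "detector": ("10.0.0.3", 5002)}
table = build_connection_table(addresses, [("camera", "detector")])
print(table["detector"]["in_bound"])  # [('10.0.0.2', 5001)]
```

The node names and addresses are placeholders. The point is only that the address table cannot be built until every Node has been created and has reported its server data, which is why peer creation comes first.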

create_p2p_network() bool

Create the P2P Nodes in the Network

This routine only creates the Nodes via the Workers; it does not establish the connections.

Returns

Success in creating the P2P Nodes

Return type

bool

async deregister_worker(request: aiohttp.web_request.Request)
distribute_packages(packages_meta: List[Dict[str, Any]]) bool

Distribute packages to Workers

Parameters

packages_meta (List[Dict[str, Any]]) – A specification of the packages. It is a list where each element configures one package, using a dictionary with the following keys: name and path.

Returns

Success in the cluster’s Workers loading the distributed packages.

Return type

bool
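
As a minimal sketch of the list-of-dictionaries layout described above, the helper below builds such a list from a plain name-to-directory mapping. The helper name make_packages_meta and the package name my_nodes are hypothetical; only the name and path keys come from this reference.

```python
import pathlib
from typing import Any, Dict, List


def make_packages_meta(packages: Dict[str, str]) -> List[Dict[str, Any]]:
    """Turn {package name: directory} into a list of {"name", "path"} dicts."""
    meta = []
    for name, path in packages.items():
        # Each package is described by its name and a pathlib.Path to its source.
        meta.append({"name": name, "path": pathlib.Path(path)})
    return meta


packages_meta = make_packages_meta({"my_nodes": "./my_nodes"})
```

A list of this shape is what packages_meta expects here, and it matches the description of send_packages in commit_graph.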

gather() Dict
property host: str
map_graph(worker_graph_map: Dict[str, List[str]])

Mapping Nodes from the graph to Workers from the cluster.

The mapping, a dictionary, informs ChimeraPy which Worker is going to execute which Nodes.

Parameters

worker_graph_map (Dict[str, List[str]]) – The keys are the Workers’ names and each value should be a list of the names of the Nodes that will be executed within the corresponding Worker.
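
Before handing a mapping to the Manager, it can be useful to sanity-check it. The sketch below is not part of ChimeraPy; check_mapping is a hypothetical helper, and it assumes that every Node should be assigned to exactly one Worker.

```python
from typing import Dict, List, Set


def check_mapping(node_names: Set[str], mapping: Dict[str, List[str]]) -> List[str]:
    """Return a list of problems; an empty list means the mapping looks consistent."""
    problems = []
    seen: Dict[str, str] = {}  # node name -> worker it was first assigned to
    for worker, nodes in mapping.items():
        for node in nodes:
            if node not in node_names:
                problems.append(f"{worker}: unknown node {node!r}")
            elif node in seen:
                problems.append(f"{node!r} assigned to both {seen[node]} and {worker}")
            else:
                seen[node] = worker
    # Any node never seen in the mapping would not be executed anywhere.
    for node in sorted(node_names - set(seen)):
        problems.append(f"{node!r} is not assigned to any worker")
    return problems
```

For example, check_mapping({"camera", "detector"}, {"local": ["camera", "detector"]}) returns an empty list.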

property port: int
register_graph(graph: chimerapy.graph.Graph)

Verifying that a Graph is valid, that is, a DAG.

In ChimeraPy, cycles are not allowed in order to avoid deadlocks. Registering the graph is the first step in setting up the data pipeline in the cluster.

Parameters

graph (Graph) – A directed acyclic graph.
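
The acyclicity requirement can be illustrated without ChimeraPy. The sketch below uses the standard-library graphlib module (Python 3.9+) as a stand-in for whatever check the library performs internally; is_dag and the node names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, Iterable


def is_dag(predecessors: Dict[str, Iterable[str]]) -> bool:
    """Return True if the graph, given as {node: nodes it depends on}, has no cycle."""
    try:
        # Consuming static_order() raises CycleError when a cycle exists.
        list(TopologicalSorter(predecessors).static_order())
    except CycleError:
        return False
    return True


print(is_dag({"detector": ["camera"], "camera": []}))  # True
print(is_dag({"a": ["b"], "b": ["a"]}))  # False
```

In the second graph each node waits on the other, which is the kind of deadlock the DAG requirement rules out.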

async register_worker(request: aiohttp.web_request.Request)
request_connection_creation(worker_id: str) bool

Request establishing the connections between Nodes

In this routine, the Manager sends the Nodes’ server data and requests that the Workers organize their own Nodes.

Returns

Whether connection creation was successful

Return type

bool

request_node_creation(worker_id: str, node_id: str) bool

Request creating a Node from the Graph.

Parameters
  • worker_id (str) – The targeted Worker

  • node_id (str) – The id of the Node to create, which must be in the graph

Returns

Success in creating the Node

Return type

bool

request_node_server_data(worker_id: str) bool

Request Workers to provide information about their Nodes’ PUBs

Returns

Success of obtaining the node server data

Return type

bool

save_meta()
setup_p2p_connections() bool

Setting up the connections between p2p nodes

Returns

Success in creating the connections

Return type

bool

shutdown()

Properly shutting down the ChimeraPy cluster.

Through this method, the Manager broadcasts a shutdown request to all Workers, which then stop their processes and threads safely.

start()

Start the execution of the cluster.

Before starting, make sure that you have performed the following steps:

  • Create Nodes

  • Create DAG with Nodes and their edges

  • Connect Workers (must be before committing Graph)

  • Register, map, and commit Graph

step() bool

Cluster step execution for offline operation.

The step function is for careful but slow operation of the cluster. For online execution, start and stop are the methods to be used.

Returns

Success of step function broadcasting

Return type

bool

stop()

Stop the execution of the cluster.

Do not forget that you still need to execute shutdown to properly shut down processes, threads, and queues.
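
The ordering of the calls documented above can be sketched as one function. It is duck-typed, so it uses only method names that appear in this reference, but several points are assumptions: run_session is a hypothetical helper, commit_graph is assumed to register and map the graph it is given (as its signature suggests), and collect is assumed to be called after stop.

```python
import time


def run_session(manager, workers, graph, mapping, run_seconds: float = 5.0) -> bool:
    """Drive one session: connect, commit, start, stop, collect, shutdown.

    `manager`, `workers`, `graph`, and `mapping` are created by the caller.
    """
    try:
        for worker in workers:
            # Workers must be connected before the graph is committed.
            if not worker.connect(host=manager.host, port=manager.port):
                return False
        if not manager.commit_graph(graph, mapping):
            return False
        manager.start()
        time.sleep(run_seconds)  # let the pipeline run for a while
        manager.stop()
        return manager.collect()
    finally:
        # Always shut down so processes, threads, and queues are released.
        manager.shutdown()
```

Placing shutdown in a finally clause is a design choice of this sketch: it keeps a failed connect or commit from leaving the cluster running.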

async update_nodes_status(request: aiohttp.web_request.Request)
property workers: Dict[str, chimerapy.states.WorkerState]

Worker

class Worker(name: str, port: int = 10000, delete_temp: bool = True, id: Optional[str] = None)
async async_create_node(request: Optional[aiohttp.web_request.Request] = None, node_config: Optional[Dict[str, Any]] = None)
async async_shutdown(request: aiohttp.web_request.Request)
async async_start_nodes(request: aiohttp.web_request.Request)
async async_step(request: aiohttp.web_request.Request)
async async_stop_nodes(request: aiohttp.web_request.Request)
connect(host: str, port: int, timeout: Union[int, float] = 10.0) bool

Connect Worker to Manager.

This establishes server-client connections between the Worker and the Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops, to avoid leaving processes and threads that never shut down.

Parameters
  • host (str) – The Manager’s IP address.

  • port (int) – The Manager’s port number

  • timeout (Union[int, float]) – Set timeout for the connection.

Returns

Success in connecting to the Manager

Return type

bool

create_node(msg: Dict[str, Any])
create_node_server_data()
deregister()
exec_coro(coro: Coroutine)
property id: str
idle()
property ip: str
async load_sent_packages(request: aiohttp.web_request.Request)
property name: str
async node_report_gather(msg: Dict, ws: aiohttp.web_ws.WebSocketResponse)
async node_status_update(msg: Dict, ws: aiohttp.web_ws.WebSocketResponse)
property nodes: Dict[str, chimerapy.states.NodeState]
property port: int
async process_node_server_data(request: aiohttp.web_request.Request)
async report_node_gather(request: aiohttp.web_request.Request)
async report_node_saving(request: aiohttp.web_request.Request)
async report_node_server_data(request: aiohttp.web_request.Request)
async send_archive(request: aiohttp.web_request.Request)
shutdown(msg: Dict = {})

Shutdown Worker safely.

The Worker needs to shut down its server, client, and Nodes in a safe manner, such as by setting flag variables and clearing out queues.

Parameters

msg (Dict) – Leave empty; the parameter is required so that the method works when the Manager sends a shutdown message to the Worker.

start_nodes()
step()
stop_nodes()

Node

class Node(name: str, debug: Optional[Literal['step', 'stream']] = None, debug_port: Optional[int] = None)
async async_forward(msg: Dict)
config(host: str, port: int, logdir: pathlib.Path, in_bound: List[str], in_bound_by_name: List[str], out_bound: List[str], follow: Optional[str] = None, networking: bool = True, logging_level: int = 20, worker_logging_port: int = 5555)

Configuring the Node’s networking and metadata.

This function does not create the connections between the Node and the Worker, as that is done in the _prep method. It only implants the networking metadata from the Worker into the Node. This is because the Server, Client, and other components of the Node have to be instantiated inside the run method.

Parameters
  • host (str) – Worker’s host

  • port (int) – Worker’s port

  • in_bound (List[str]) – List of node names that will be inputs

  • out_bound (List[str]) – List of node names that will be sent the output

  • networking (bool) – Optional deselect networking setup (used in testing)

forward(msg: Dict)
get_logger() logging.Logger
property id: str
main()

Method that the user may optionally overwrite.

This method can also be overwritten, though it is recommended to do so carefully. If overwritten, the handling of inputs will have to be implemented as well.

The inputs can be accessed through the self.in_bound_data and self.new_data_available attributes.

property name: str
poll_inputs()
prep()

User-defined method for Node setup.

In this method, the setup logic of the Node is executed. This would include opening files, creating connections to sensors, and calibrating sensors.

async process_node_server_data(msg: Dict)
async provide_gather(msg: Dict)
async provide_saving(msg: Dict)
ready()
run()

The actual method that is executed in the new process.

When working with multiprocessing.Process, keep in mind that creating a new process can lead to unexpected behavior if done carelessly. It is recommended to read the multiprocessing documentation to understand the implications.

save_audio(name: str, data: numpy.ndarray, channels: int, format: int, rate: int)
save_image(name: str, data: numpy.ndarray)
save_tabular(name: str, data: Union[pandas.core.frame.DataFrame, Dict[str, Any], pandas.core.series.Series])
save_video(name: str, data: numpy.ndarray, fps: int)
shutdown(msg: Dict = {})
async start_node(msg: Dict)
step(data_chunks: Dict[str, chimerapy.networking.data_chunk.DataChunk] = {}) Union[chimerapy.networking.data_chunk.DataChunk, Any]

User-defined method.

This method contains the logic that is executed within the Node’s while loop. For data sources (no inputs), the step method will execute as fast as possible; therefore, it is important to add time.sleep to specify the sampling rate.

For a Node that has inputs, step is executed when new data is received.

Parameters

data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter should not be considered (as they don’t have inputs). For step and sink nodes, data_chunks must be included to avoid an error. The variable is a dictionary, where the key is the in-bound Node’s name and the value is the output of the in-bound Node’s step function.
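
The two flavors of step can be sketched with plain classes. These are stand-ins rather than real ChimeraPy Nodes: CounterSource and Doubler are hypothetical, they do not subclass Node, and plain integers take the place of DataChunk objects, whose interface is not described in this section.

```python
import time
from typing import Any, Dict


class CounterSource:
    """Source-style step: no inputs, so it sleeps to set its own sampling rate."""

    def __init__(self, rate_hz: float = 10.0):
        self.period = 1.0 / rate_hz
        self.count = 0

    def step(self) -> int:
        time.sleep(self.period)  # without this, step would run as fast as possible
        self.count += 1
        return self.count


class Doubler:
    """Step-style node: receives a dict keyed by the in-bound node's name."""

    def step(self, data_chunks: Dict[str, Any]) -> int:
        return 2 * data_chunks["counter"]


source, doubler = CounterSource(rate_hz=100.0), Doubler()
value = source.step()
print(doubler.step({"counter": value}))  # 2
```

Here "counter" plays the role of the in-bound Node's name, which is the key under which its output arrives.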

async stop_node(msg: Dict)
teardown()

User-defined method.

This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.
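
How prep, step, and teardown relate to each other can be sketched as a simple loop. This is an assumption-laden illustration, not ChimeraPy's run method: FileLoggerSketch and run_lifecycle are hypothetical, and the try/finally that guarantees teardown is a choice of this sketch rather than documented library behavior.

```python
from typing import List


class FileLoggerSketch:
    """Stand-in showing where prep, step, and teardown logic would live."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def prep(self) -> None:
        self.events.append("prep")  # e.g. open files, connect to sensors

    def step(self) -> None:
        self.events.append("step")  # e.g. read and store one sample

    def teardown(self) -> None:
        self.events.append("teardown")  # e.g. close files, stop sensors


def run_lifecycle(node, n_steps: int) -> None:
    """Call prep once, step n_steps times, and teardown even if a step fails."""
    node.prep()
    try:
        for _ in range(n_steps):
            node.step()
    finally:
        node.teardown()


node = FileLoggerSketch()
run_lifecycle(node, n_steps=2)
print(node.events)  # ['prep', 'step', 'step', 'teardown']
```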

waiting()

Graph

class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)
add_edge(src: chimerapy.node.Node, dst: chimerapy.node.Node, follow: bool = False)
add_edges_from(list_of_edges: Sequence[Sequence[chimerapy.node.Node]])
add_node(node: chimerapy.node.Node)
add_nodes_from(nodes: Sequence[chimerapy.node.Node])
get_id_by_name(node_name: str)
get_layers_and_pos()
has_node_by_id(node_id: str)
is_valid()

Checks if Graph is a true DAG.

plot(font_size: int = 30, node_size: int = 5000)

Plotting the Graph to visualize data pipeline.

This visualization tool uses matplotlib and networkx to show the Nodes and their edges.

Parameters
  • font_size (int) – Font size

  • node_size (int) – Node size