API Reference

DataChunk

class DataChunk
__init__()
add(name: str, value: Any, content_type: Literal['image', 'other'] = 'other')

Add a new record to the DataChunk instance.

The key parameter is content_type, which selects the serialization method and therefore affects the execution speed and real-time performance of a pipeline. Two options are currently available: image and other.

When sending an image (a numpy array), use the image option. For anything else, use the other option until further notice.

Parameters
  • name (str) – The name of the record.

  • value (Any) – The contents to be stored with the name key.

  • content_type (Literal["image", "other"]) – Specifying the content type to help serialization and compression efficiency.

get(name: str) → Dict[str, Any]

Extract the record given a name.

Parameters

name (str) – The requested key name.

Returns

A record, stored as a dictionary with the following keys: value, content-type and ownership. In most cases you will only need value.

Return type

Dict[str, Any]

update(name: str, record: Dict[str, Any])

Overwrite a record with a new one. The previous metadata is deleted.

Parameters
  • name (str) – The name of the record

  • record (Dict[str, Any]) – The new record to overwrite the pre-existing one.
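
The record layout described above can be illustrated with a small stand-in. This is a minimal sketch and not ChimeraPy's implementation: the class name SketchChunk is hypothetical, and the contents of the ownership entry are an assumption, since this reference does not specify them.

```python
from typing import Any, Dict, Literal

ContentType = Literal["image", "other"]


class SketchChunk:
    """Hypothetical stand-in for DataChunk, for illustration only."""

    def __init__(self) -> None:
        self._records: Dict[str, Dict[str, Any]] = {}

    def add(self, name: str, value: Any, content_type: ContentType = "other") -> None:
        # Each record keeps the value together with its metadata.
        # The empty list under "ownership" is an assumed placeholder.
        self._records[name] = {
            "value": value,
            "content-type": content_type,
            "ownership": [],
        }

    def get(self, name: str) -> Dict[str, Any]:
        return self._records[name]

    def update(self, name: str, record: Dict[str, Any]) -> None:
        # The whole record is replaced, so earlier metadata does not survive.
        self._records[name] = record


chunk = SketchChunk()
chunk.add("reading", 21.5)
# A nested list stands in for the numpy array an image would normally be.
chunk.add("frame", [[0, 0], [0, 0]], content_type="image")

# Most callers only need the "value" entry of a record.
temperature = chunk.get("reading")["value"]

chunk.update("reading", {"value": 22.0, "content-type": "other", "ownership": []})
```

Because update replaces the whole dictionary, a caller that wants to keep the old metadata would need to copy it into the new record first.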

Node

class Node(name: str, debug: Optional[Literal['step', 'stream']] = None, debug_port: Optional[int] = None)
__init__(name: str, debug: Optional[Literal['step', 'stream']] = None, debug_port: Optional[int] = None)

Create a basic unit of computation in ChimeraPy.

A node has three main methods that can be overridden to add the desired behavior: prep, step, and teardown. You only need to implement the ones you use. The step method is executed within a while loop, whenever new inputs are available (if inputs are specified in the graph).

If the step method is too restrictive, the main method (which contains the while loop) can be overridden instead.

Parameters

name (str) – The name that will later be used to refer to the Node.
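
The order in which the three hooks run can be sketched with a stand-in class. This is an illustration under stated assumptions, not the library's code: SketchNode, its run method, and the max_steps stop condition are all hypothetical, and the real Node runs step inside a while loop.

```python
from typing import List


class SketchNode:
    """Hypothetical stand-in showing the order of prep, step, and teardown."""

    def __init__(self, name: str, max_steps: int = 3) -> None:
        self.name = name
        self.max_steps = max_steps  # assumed stop condition, for this sketch only
        self.calls: List[str] = []

    def prep(self) -> None:
        self.calls.append("prep")  # e.g. open files, connect to sensors

    def step(self) -> None:
        self.calls.append("step")  # one unit of work per loop iteration

    def teardown(self) -> None:
        self.calls.append("teardown")  # e.g. close files, stop sensors

    def main(self) -> None:
        # Stands in for the loop that repeatedly calls step.
        for _ in range(self.max_steps):
            self.step()

    def run(self) -> None:
        self.prep()
        try:
            self.main()
        finally:
            self.teardown()  # cleanup runs even if step raises


node = SketchNode("counter")
node.run()
```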

prep()

User-defined method for Node setup.

In this method, the setup logic of the Node is executed. This would include opening files, creating connections to sensors, and calibrating sensors.

step(data_chunks: Dict[str, chimerapy.networking.data_chunk.DataChunk] = {}) → Union[chimerapy.networking.data_chunk.DataChunk, Any]

User-defined method.

This method contains the logic that is executed within the Node’s while loop. For data sources (no inputs), the step method will execute as fast as possible; therefore, it is important to add time.sleep to set the sampling rate.

For a Node that has inputs, step is executed whenever new data is received.

Parameters

data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter can be omitted (as they don’t have inputs). For step and sink nodes, data_chunks must be included in the signature to avoid an error. It is a dictionary in which each key is the name of an in-bound Node and each value is the output of that Node’s step function.
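
The two styles of step can be sketched as plain functions. This is a rough illustration, not ChimeraPy code: the function names, the sample period, and the upstream Node names are hypothetical, and plain dictionaries stand in for DataChunk objects.

```python
import time
from typing import Any, Dict


def source_step(sample_period: float = 0.01) -> float:
    """Source-style step: no inputs, so it paces itself with time.sleep."""
    time.sleep(sample_period)  # sampling rate is roughly 1 / sample_period
    return 1.0


def consumer_step(data_chunks: Dict[str, Dict[str, Any]]) -> float:
    """Step with inputs: each key is the name of an in-bound Node."""
    # Plain dicts with a "value" entry stand in for DataChunk records.
    return sum(chunk["value"] for chunk in data_chunks.values())


start = time.monotonic()
samples = [source_step() for _ in range(3)]
elapsed = time.monotonic() - start

total = consumer_step({"sensor_a": {"value": 2.0}, "sensor_b": {"value": 3.0}})
```

Without the sleep, the source loop would spin as fast as the CPU allows, which is why the sampling rate has to be set explicitly.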

main()

Optional user-overridable method.

This method can also be overridden, though it is recommended to do so carefully. If it is overridden, the handling of inputs has to be implemented as well.

The input information is available through the self.in_bound_data and self.new_data_available attributes.

teardown()

User-defined method.

This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.

Graph

class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)
is_valid()

Checks if Graph is a true DAG.
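
What "a true DAG" means can be shown with a standard-library sketch based on Kahn's algorithm. This is not how is_valid is implemented (that is not documented here, and networkx offers is_directed_acyclic_graph for the same purpose); the function name is_dag and the node names are hypothetical.

```python
from collections import deque
from typing import Dict, Hashable, Iterable, List, Tuple


def is_dag(nodes: Iterable[Hashable], edges: Iterable[Tuple[Hashable, Hashable]]) -> bool:
    """Return True if the directed graph has no cycles (Kahn's algorithm)."""
    successors: Dict[Hashable, List[Hashable]] = {n: [] for n in nodes}
    in_degree: Dict[Hashable, int] = {n: 0 for n in successors}
    for src, dst in edges:  # every edge endpoint must be listed in nodes
        successors[src].append(dst)
        in_degree[dst] += 1

    # Start from nodes without incoming edges and peel the graph layer by layer.
    ready = deque(n for n, degree in in_degree.items() if degree == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for nxt in successors[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Nodes on a cycle never reach in-degree zero, so they are never visited.
    return visited == len(successors)


pipeline_ok = is_dag(["camera", "detector", "recorder"],
                     [("camera", "detector"), ("detector", "recorder")])
pipeline_cyclic = is_dag(["a", "b"], [("a", "b"), ("b", "a")])
```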

plot(font_size: int = 30, node_size: int = 5000)

Plot the Graph to visualize the data pipeline.

This visualization tool uses matplotlib and networkx to show the Nodes and their edges.

Parameters
  • font_size (int) – Font size

  • node_size (int) – Node size

Worker

class Worker(name: str, port: int = 10000, delete_temp: bool = True, id: Optional[str] = None)
__init__(name: str, port: int = 10000, delete_temp: bool = True, id: Optional[str] = None)

Create a local Worker.

Executing Nodes on the same computer that hosts the Manager also requires a Worker. Therefore, it is common to create a Worker and a Manager on the same computer.

To create a Worker on another machine, use the following command (in the other machine’s terminal):

>>> cp-worker --ip <manager's IP> --port <manager's port> --name <name>
Parameters
  • name (str) – The name for the Worker that will be used as reference.

  • port (int) – The port of the Worker’s HTTP server. Use 0 for a random port, which is useful when running multiple Worker instances on the same computer.

  • delete_temp (bool) – Whether the Worker should delete its temporary files after the session is over.
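
Why port 0 helps when several Workers share a machine can be seen with plain sockets. This sketch only demonstrates the operating-system behavior that port 0 relies on; it does not use or reproduce ChimeraPy's server code, and the helper name bind_random_port is hypothetical.

```python
import socket


def bind_random_port(host: str = "127.0.0.1") -> socket.socket:
    """Bind to port 0 so the operating system picks a free port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))
    return sock


# Two sockets held open at the same time, like two Workers on one computer.
first = bind_random_port()
second = bind_random_port()
port_first = first.getsockname()[1]
port_second = second.getsockname()[1]

first.close()
second.close()
```

With a fixed port such as the default 10000, the second bind on the same machine would fail while the first is still open.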

connect(host: str, port: int, timeout: Union[int, float] = 10.0) → bool

Connect Worker to Manager.

This establishes server-client connections between the Worker and the Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should be shut down before your program stops; otherwise, processes and threads may be left running.

Parameters
  • host (str) – The Manager’s IP address.

  • port (int) – The Manager’s port number

  • timeout (Union[int, float]) – Set timeout for the connection.

Returns

Success in connecting to the Manager

Return type

bool

shutdown(msg: Dict = {})

Shutdown Worker safely.

The Worker needs to shut down its server, client, and Nodes in a safe manner, for example by setting flag variables and clearing out queues.

Parameters

msg (Dict) – Leave empty. The parameter exists so that the method also works when the Manager sends a shutdown message to the Worker.

Manager

class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, max_num_of_workers: int = 50, publish_logs_via_zmq: bool = False, dashboard_api: bool = True, **kwargs)
__init__(logdir: Union[pathlib.Path, str], port: int = 9000, max_num_of_workers: int = 50, publish_logs_via_zmq: bool = False, dashboard_api: bool = True, **kwargs)

Create Manager, the controller of the cluster.

The Manager directs the cluster: it adds new computers, assigns roles, starts and stops data collection, and shuts down the system.

Parameters
  • port (int) – Requested port; a different one may be used depending on availability.

  • max_num_of_workers (int) – The maximum number of Workers.

  • publish_logs_via_zmq (bool, optional) – Whether to publish logs via ZMQ. Defaults to False.

  • dashboard_api (bool) – Enable front-end API entrypoints to control the cluster. Defaults to True.

  • **kwargs – Additional keyword arguments. Currently, this is used to configure the ZMQ log handler.

commit_graph(graph: chimerapy.graph.Graph, mapping: Dict[str, List[str]], send_packages: Optional[List[Dict[str, Any]]] = None) → bool

Commit the Graph to the cluster.

Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine has two steps: peer creation and peer-to-peer connection setup.

In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.

Once the Nodes have been created successfully, the Manager requests the IP addresses and port numbers of the Nodes’ servers to build an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.

Parameters
  • graph (cp.Graph) – The graph to deploy within the cluster.

  • mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the Workers, and each value is a list of the Nodes’ names.

  • send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not available via PyPI or Anaconda). Provide a list of packages, each configured via a dictionary with the following key-value pairs: name:str and path:pathlib.Path.

Returns

Success in cluster’s setup

Return type

bool
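
The data structures involved in committing can be sketched with plain dictionaries. This is an illustration under stated assumptions, not the Manager's internal code: the Node and Worker names, the package name and path, the addresses (taken from the documentation-only 192.0.2.0/24 range), and the helper neighbors_for are all hypothetical.

```python
import pathlib
from typing import Dict, List, Tuple

Address = Tuple[str, int]

# Hypothetical pipeline: camera -> detector -> recorder.
edges: List[Tuple[str, str]] = [("camera", "detector"), ("detector", "recorder")]

# Worker name -> names of the Nodes it runs (the shape of the mapping argument).
mapping: Dict[str, List[str]] = {
    "local": ["camera", "detector"],
    "remote": ["recorder"],
}

# Shape of one send_packages entry; name and path are placeholders.
send_packages = [{"name": "my_package", "path": pathlib.Path("my_package")}]

# Inverting the mapping tells which Worker hosts each Node.
node_to_worker = {node: worker for worker, nodes in mapping.items() for node in nodes}

# An address table as it might look once the Nodes have reported back.
address_table: Dict[str, Address] = {
    "camera": ("192.0.2.10", 5001),
    "detector": ("192.0.2.10", 5002),
    "recorder": ("192.0.2.20", 5001),
}


def neighbors_for(node: str) -> Dict[str, Dict[str, Address]]:
    """Look up where a Node's in-bound and out-bound neighbors are located."""
    return {
        "in_bound": {src: address_table[src] for src, dst in edges if dst == node},
        "out_bound": {dst: address_table[dst] for src, dst in edges if src == node},
    }


detector_links = neighbors_for("detector")
```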

step() → bool

Cluster step execution for offline operation.

The step function is for careful but slow operation of the cluster. For online execution, start and stop are the methods to be used.

Returns

Success of step function broadcasting

Return type

bool

start()

Start the execution of the cluster.

Before starting, make sure that you have performed the following steps:

  • Create Nodes

  • Create DAG with Nodes and their edges

  • Connect Workers (must be before committing Graph)

  • Register, map, and commit Graph

stop()

Stop the execution of the cluster.

Do not forget that you still need to call shutdown to properly shut down processes, threads, and queues.

shutdown()

Properly shut down the ChimeraPy cluster.

Through this method, the Manager broadcasts a shutdown message to all Workers, which then stop their processes and threads safely.
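
One way to make sure shutdown always runs after start and stop is a try/finally block. The sketch below uses a stand-in that only records calls; SketchManager, run_session, and failing_recording are hypothetical names, and only the method names start, stop, and shutdown come from this reference.

```python
from typing import Callable, List


class SketchManager:
    """Hypothetical stand-in that records which methods were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")

    def shutdown(self) -> None:
        self.calls.append("shutdown")


def run_session(manager: SketchManager, record: Callable[[], None]) -> None:
    """Run one recording session and always shut the cluster down afterwards."""
    try:
        manager.start()
        record()  # whatever happens while the cluster is running
        manager.stop()
    finally:
        manager.shutdown()  # runs even if record() raises


def failing_recording() -> None:
    raise RuntimeError("simulated failure during recording")


ok_manager = SketchManager()
run_session(ok_manager, lambda: None)

failed_manager = SketchManager()
try:
    run_session(failed_manager, failing_recording)
except RuntimeError:
    pass  # the error still propagates, but shutdown has already run
```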