Utilities

async async_waiting_for(condition: Callable[[], bool], check_period: Union[int, float] = 0.1, timeout: Optional[Union[int, float]] = None, timeout_raise: Optional[bool] = False) → bool
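
The signature suggests a coroutine that polls condition every check_period seconds until it returns True or timeout elapses. The following is a minimal sketch of that pattern, not the library's implementation: the name async_poll_until, the use of TimeoutError, and returning False on a non-raising timeout are all assumptions.

```python
import asyncio
import time
from typing import Callable, Optional, Union


async def async_poll_until(
    condition: Callable[[], bool],
    check_period: Union[int, float] = 0.1,
    timeout: Optional[Union[int, float]] = None,
    timeout_raise: bool = False,
) -> bool:
    """Hypothetical helper: poll `condition` without blocking the event loop."""
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            if timeout_raise:
                # Assumed behaviour: raise instead of returning False.
                raise TimeoutError("condition not met within timeout")
            return False
        # Yield to other tasks between checks.
        await asyncio.sleep(check_period)
    return True


async def _demo() -> bool:
    state = {"ready": False}

    async def set_ready() -> None:
        await asyncio.sleep(0.05)
        state["ready"] = True

    setter = asyncio.create_task(set_ready())
    result = await async_poll_until(
        lambda: state["ready"], check_period=0.01, timeout=1.0
    )
    await setter
    return result


demo_result = asyncio.run(_demo())
timed_out_result = asyncio.run(
    async_poll_until(lambda: False, check_period=0.01, timeout=0.05)
)
```

Because the wait uses asyncio.sleep, other tasks on the same event loop keep running while the condition is being polled.
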
clear_queue(input_queue: queue.Queue)

Clear a queue.

Parameters
  • input_queue (queue.Queue) – Queue to be cleared.
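
One common way to empty a queue.Queue is to call get_nowait() until it raises queue.Empty. The sketch below illustrates that approach under the assumption that no other thread is adding items at the same time. The name drain_queue and the returned count are hypothetical and not part of the documented function.

```python
import queue


def drain_queue(input_queue: queue.Queue) -> int:
    """Hypothetical helper: discard all queued items and return how many were removed."""
    removed = 0
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking.
            input_queue.get_nowait()
        except queue.Empty:
            return removed
        removed += 1


q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)

removed = drain_queue(q)
```

Catching queue.Empty rather than checking empty() first avoids a race between the check and the get when several threads share the queue.
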

create_payload(signal: enum.Enum, data: Any, msg_uuid: str = '273884db-0861-4e8e-b4bb-c4984838b31f', timestamp: datetime.timedelta = datetime.timedelta(0), ok: bool = False) → Dict[str, Any]
decode_payload(data: str) → Dict[str, Any]
get_ip_address() → str
get_open_port(start_port: int) → socket.socket
class logging_tqdm(*_, **__)
display(msg=None, pos=None)

Use self.sp to display msg in the specified pos.

Consider overloading this function when inheriting to use e.g.: self.some_frontend(**self.format_dict) instead of self.sp.

Parameters
  • msg (str, optional) – What to display (default: repr(self)).

  • pos (int, optional) – Position to moveto (default: abs(self.pos)).

property logger
waiting_for(condition: Callable[[], bool], check_period: Union[int, float] = 0.1, timeout: Optional[Union[int, float]] = None, timeout_raise: Optional[bool] = False) → bool
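
The parameters point to a blocking poll loop: call condition every check_period seconds and give up once timeout elapses. The following minimal sketch shows one way such a loop can be written. It is not the library's code: the name poll_until, raising TimeoutError when timeout_raise is set, and returning False otherwise are assumptions.

```python
import time
from typing import Callable, Optional, Union


def poll_until(
    condition: Callable[[], bool],
    check_period: Union[int, float] = 0.1,
    timeout: Optional[Union[int, float]] = None,
    timeout_raise: bool = False,
) -> bool:
    """Hypothetical helper: block until `condition()` is true or the timeout expires."""
    # timeout=None means wait indefinitely.
    deadline = None if timeout is None else time.monotonic() + timeout
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            if timeout_raise:
                # Assumed behaviour: signal the timeout with an exception.
                raise TimeoutError("condition not met within timeout")
            return False
        time.sleep(check_period)
    return True


start = time.monotonic()
became_true = poll_until(
    lambda: time.monotonic() - start > 0.05, check_period=0.01, timeout=1.0
)
timed_out = poll_until(lambda: False, check_period=0.01, timeout=0.05)
```

In this sketch the condition is evaluated before the first sleep, so a condition that is already true returns immediately.
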