Utilities
- async async_waiting_for(condition: Callable[[], bool], check_period: Union[int, float] = 0.1, timeout: Optional[Union[int, float]] = None, timeout_raise: Optional[bool] = False) → bool
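The entry above carries no description, so the following is only a minimal sketch of what a coroutine with this signature plausibly does, assuming it polls `condition` every `check_period` seconds, returns `True` once the condition holds, and on timeout either returns `False` or raises. The name `async_poll_until` and the choice of `TimeoutError` are hypothetical and are not taken from the library.

```python
import asyncio
import time
from typing import Callable, Optional, Tuple, Union


async def async_poll_until(
    condition: Callable[[], bool],
    check_period: Union[int, float] = 0.1,
    timeout: Optional[Union[int, float]] = None,
    timeout_raise: bool = False,
) -> bool:
    """Hypothetical stand-in: poll `condition` without blocking the event loop."""
    # No deadline means the loop waits indefinitely (assumed meaning of timeout=None).
    deadline = None if timeout is None else time.monotonic() + timeout
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            if timeout_raise:
                # Assumption: the real function may raise a different exception type.
                raise TimeoutError(f"condition not met within {timeout} s")
            return False
        # Yield control so other tasks can run between checks.
        await asyncio.sleep(check_period)
    return True


async def demo() -> Tuple[bool, bool]:
    flag = {"ready": False}

    async def set_flag_later() -> None:
        await asyncio.sleep(0.05)
        flag["ready"] = True

    setter = asyncio.create_task(set_flag_later())
    # The condition becomes true after about 0.05 s, well inside the timeout.
    met = await async_poll_until(lambda: flag["ready"], check_period=0.01, timeout=1.0)
    await setter
    # This condition never becomes true, so the call gives up after the timeout.
    missed = await async_poll_until(lambda: False, check_period=0.01, timeout=0.05)
    return met, missed


met, missed = asyncio.run(demo())
```

Using `asyncio.sleep` rather than `time.sleep` is what lets other tasks (here, `set_flag_later`) make progress while the poll is waiting.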
- clear_queue(input_queue: queue.Queue)
Clear a queue.
- Parameters
input_queue (queue.Queue) – Queue to be cleared.
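As an illustration of what clearing a `queue.Queue` can look like, here is a minimal sketch that drains items with `get_nowait()` until `queue.Empty` is raised. The helper name `drain_queue` and its return value are hypothetical; the library's own `clear_queue` may be implemented differently.

```python
import queue


def drain_queue(input_queue: queue.Queue) -> int:
    """Hypothetical stand-in: remove every queued item and return the count."""
    removed = 0
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking on an empty queue.
            input_queue.get_nowait()
        except queue.Empty:
            return removed
        removed += 1


q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)

removed = drain_queue(q)
```

Catching `queue.Empty` avoids relying on `Queue.empty()`, whose result can be stale when other threads are producing or consuming at the same time.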
- create_payload(signal: enum.Enum, data: Any, msg_uuid: str = '273884db-0861-4e8e-b4bb-c4984838b31f', timestamp: datetime.timedelta = datetime.timedelta(0), ok: bool = False) → Dict[str, Any]
- decode_payload(data: str) → Dict[str, Any]
- get_ip_address() → str
- get_open_port(start_port: int) → socket.socket
- class logging_tqdm(*_, **__)
- display(msg=None, pos=None)
Use self.sp to display msg in the specified pos.
Consider overloading this function when inheriting to use e.g.: self.some_frontend(**self.format_dict) instead of self.sp.
- Parameters
msg (str, optional) – What to display (default: repr(self)).
pos (int, optional) – Position to moveto (default: abs(self.pos)).
- property logger
- waiting_for(condition: Callable[[], bool], check_period: Union[int, float] = 0.1, timeout: Optional[Union[int, float]] = None, timeout_raise: Optional[bool] = False) → bool
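Since `waiting_for` is listed without a description, the sketch below shows one plausible reading of its parameters: block the calling thread, re-check `condition` every `check_period` seconds, and on timeout return `False` or raise when `timeout_raise` is set. The name `poll_until` and the use of `TimeoutError` are hypothetical, not confirmed by the library.

```python
import threading
import time
from typing import Callable, Optional, Union


def poll_until(
    condition: Callable[[], bool],
    check_period: Union[int, float] = 0.1,
    timeout: Optional[Union[int, float]] = None,
    timeout_raise: bool = False,
) -> bool:
    """Hypothetical stand-in: block until `condition()` is true or time runs out."""
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            if timeout_raise:
                # Assumption: the real function may raise a different exception type.
                raise TimeoutError(f"condition not met within {timeout} s")
            return False
        time.sleep(check_period)
    return True


# A background timer sets the event after about 0.05 s.
flag = threading.Event()
timer = threading.Timer(0.05, flag.set)
timer.start()
met = poll_until(flag.is_set, check_period=0.01, timeout=1.0)
timer.join()

# A condition that never holds: first with a False return, then with an exception.
missed = poll_until(lambda: False, check_period=0.01, timeout=0.05)
try:
    poll_until(lambda: False, check_period=0.01, timeout=0.05, timeout_raise=True)
    raised = False
except TimeoutError:
    raised = True
```

Under this reading, `timeout_raise` lets a caller choose between checking the boolean result and handling an exception.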