Skip to content

tutk_ioctl_mux.py

TutkIOCtrlMux

An "IO Ctrl" interface for sending and receiving data over a control channel built into an IOTC session with a particular device.

Use this to send and receive configuration data from the camera. There are many, many commands supported by the wyze camera over this interface, though just a fraction of them have been reverse engineered at this point. See TutkWyzeProtocolMessage and its subclasses for the supported commands.

This channel is used to authenticate the client with the camera prior to streaming audio or video data.

See: wyzecam.iotc.WyzeIOTCSession.iotctrl_mux

Methods

__init__(self, tutk_platform_lib, av_chan_id) special

Initialize the mux channel.

Parameters:

Name Type Description Default
tutk_platform_lib CDLL

the underlying c library used to communicate with the wyze device; see tutk.load_library.

required
av_chan_id c_int

the channel id of the session this mux is created on.

required
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def __init__(self, tutk_platform_lib: CDLL, av_chan_id: c_int) -> None:
    """Initialize the mux channel.

    :param tutk_platform_lib: the underlying c library used to communicate with the wyze
                            device; see [tutk.load_library][wyzecam.tutk.tutk.load_library].
    :param av_chan_id: the channel id of the session this mux is created on.
    """
    self.tutk_platform_lib = tutk_platform_lib
    self.av_chan_id = av_chan_id
    self.queues: DefaultDict[
        Union[str, int], "Queue[Union[object, Tuple[int, int, int, bytes]]]"
    ] = defaultdict(Queue)
    self.listener = TutkIOCtrlMuxListener(
        tutk_platform_lib, av_chan_id, self.queues
    )

send_ioctl(self, msg, ctrl_type=256)

Send a TutkWyzeProtocolMessage to the camera.

This should be called after the listener has been started, by using the mux as a context manager:

with session.ioctrl_mux() as mux:
    result = mux.send_ioctl(msg)

Parameters:

Name Type Description Default
msg TutkWyzeProtocolMessage

The message to send to the client. See tutk_protocol.py Commands

required
ctrl_type int

used internally by the iotc library, should always be tutk.IOTYPE_USER_DEFINED_START.

256

Returns:

Type Description
TutkIOCtrlFuture

a future promise of a response from the camera. See wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlFuture

Source code in wyzecam/tutk/tutk_ioctl_mux.py
def send_ioctl(
    self,
    msg: TutkWyzeProtocolMessage,
    ctrl_type: int = tutk.IOTYPE_USER_DEFINED_START,
) -> TutkIOCtrlFuture:
    """
    Send a [TutkWyzeProtocolMessage][wyzecam.tutk.tutk_protocol.TutkWyzeProtocolMessage]
    to the camera.

    This should be called after the listener has been started, by using the mux as a context manager:

    ```python
    with session.ioctrl_mux() as mux:
        result = mux.send_ioctl(msg)
    ```

    :param msg: The message to send to the client. See
                [tutk_protocol.py Commands](../tutk_protocol_commands/)
    :param ctrl_type: used internally by the iotc library, should always be
                      `tutk.IOTYPE_USER_DEFINED_START`.

    :returns: a future promise of a response from the camera.  See [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlFuture][]
    """
    encoded_msg = msg.encode()
    encoded_msg_header = (
        tutk_protocol.TutkWyzeProtocolHeader.from_buffer_copy(
            encoded_msg[0:16]
        )
    )
    logger.debug("SEND %s %s %s", msg, encoded_msg_header, encoded_msg[16:])
    errcode = tutk.av_send_io_ctrl(
        self.tutk_platform_lib, self.av_chan_id, ctrl_type, encoded_msg
    )
    if errcode:
        return TutkIOCtrlFuture(msg, errcode=errcode)
    if not msg.expected_response_code:
        logger.warning("no expected response code found")
        return TutkIOCtrlFuture(msg)

    return TutkIOCtrlFuture(msg, self.queues[msg.expected_response_code])

start_listening(self)

Start a separate thread listening for responses from the camera.

This is generally called by using the TutkIOCtrlMux as a context manager:

with session.ioctrl_mux() as mux:
    ...

If this method is called explicitly, remember to call stop_listening when finished.

See: wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.stop_listening

Source code in wyzecam/tutk/tutk_ioctl_mux.py
def start_listening(self) -> None:
    """Start a separate thread listening for responses from the camera.

    This is generally called by using the TutkIOCtrlMux as a context manager:

    ```python
    with session.ioctrl_mux() as mux:
        ...
    ```

    If this method is called explicitly, remember to call `stop_listening` when
    finished.

    See: [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.stop_listening][]
    """
    self.listener.start()

stop_listening(self)

Shuts down the separate thread used for listening for responses to the camera

See: wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.start_listening

Source code in wyzecam/tutk/tutk_ioctl_mux.py
def stop_listening(self) -> None:
    """
    Shuts down the separate thread used for listening for responses to the camera

    See: [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.start_listening][]
    """
    self.queues[CONTROL_CHANNEL].put(STOP_SENTINEL)
    self.listener.join()

waitfor(self, futures, timeout=None)

Wait for the responses of one or more TutkIOCtrlFutures.

with session.ioctrl_mux() as mux:
    f1 = mux.send_ioctl(msg)
    f2 = mux.send_ioctl(msg2)

    resp1, resp2 = mux.waitfor([f1, f2])

This allows you to wait for a set of TutkIOCtrlFutures to respond in any order, and allows you to send multiple commands to the camera without waiting for each one to return before sending another.

If you are sending one command at a time, consider using TutkIOCtrlFuture.result() directly:

with session.ioctrl_mux() as mux:
    f1 = mux.send_ioctl(msg)
    resp1 = f1.result()
    f2 = mux.send_ioctl(msg2)
    resp2 = f2.result()
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def waitfor(
    self,
    futures: Union[TutkIOCtrlFuture, List[TutkIOCtrlFuture]],
    timeout: Optional[int] = None,
) -> Union[Any, List[Any]]:
    """Wait for the responses of one or more `TutkIOCtrlFuture`s.

    ```python
    with session.ioctrl_mux() as mux:
        f1 = mux.send_ioctl(msg)
        f2 = mux.send_ioctl(msg2)

        resp1, resp2 = mux.waitfor([f1, f2])
    ```

    This allows you to wait for a set of `TutkIOCtrlFuture`s to respond in
    any order, and allows you to send multiple commands to the camera without
    waiting for each one to return before sending another.

    If you are sending one command at a time, consider using
    `TutkIOCtrlFuture.result()` directly:

    ```python
    with session.ioctrl_mux() as mux:
        f1 = mux.send_ioctl(msg)
        resp1 = f1.result()
        f2 = mux.send_ioctl(msg2)
        resp2 = f2.result()
    ```
    """
    unwrap_single_item = False
    if isinstance(futures, TutkIOCtrlFuture):
        futures = [futures]
        unwrap_single_item = True
    results = [None] * len(futures)
    start = time.time()
    while (timeout is None or time.time() - start <= timeout) and any(
        result is None for result in results
    ):
        all_success = True
        for i, future in enumerate(futures):
            if results[i] is not None:
                continue

            try:
                result = future.result(block=False)
                results[i] = result
            except Empty:
                all_success = False
        # if we don't get all of them this pass, wait a short period before checking again
        if not all_success:
            time.sleep(0.1)

    if unwrap_single_item:
        return results[0]
    else:
        return results

TutkIOCtrlMuxListener

Methods

run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Source code in wyzecam/tutk/tutk_ioctl_mux.py
def run(self) -> None:
    timeout_ms = 1000
    logger.info(f"Now listening on channel id {self.av_chan_id}")

    while True:
        try:
            control_channel_command = self.queues[
                CONTROL_CHANNEL
            ].get_nowait()
            if control_channel_command == STOP_SENTINEL:
                logger.info(
                    f"No longer listening on channel id {self.av_chan_id}"
                )
                return
        except Empty:
            pass

        actual_len, io_ctl_type, data = tutk.av_recv_io_ctrl(
            self.tutk_platform_lib, self.av_chan_id, timeout_ms
        )
        if actual_len == tutk.AV_ER_TIMEOUT:
            continue
        elif actual_len == tutk.AV_ER_SESSION_CLOSE_BY_REMOTE:
            logger.warning(
                "Connection closed by remote. Closing connection."
            )
            break
        elif actual_len < 0:
            raise tutk.TutkError(actual_len)

        header, payload = tutk_protocol.decode(data)
        logger.debug(f"RECV {header}: {repr(payload)}")

        self.queues[header.code].put(
            (actual_len, io_ctl_type, header.protocol, payload)
        )

TutkIOCtrlFuture

Holds the result of a message sent over a TutkIOCtrlMux; a TutkIOCtrlFuture is returned by [TutkIOCtrlMux.send_ioctl][wyzecam.tutk.tutk_ioctrl_mox.TutkIOCtrlMux.send_ioctl], and represents the value of a future response from the camera. The actual contents of this response should be retrieved by calling result(), below.

Attributes:

Name Type Description
req TutkWyzeProtocolMessage

The message sent to the camera that we are waiting for a response from

errcode Optional[c_int]

The resultant error code associated with this response

resp_protocol Optional[int]

The 2-byte protocol version of the header of the response

resp_data Optional[bytes]

The raw message sent from the camera to the client

Methods

result(self, block=True, timeout=10000)

Wait until the camera has responded to our message, and return the result.

Parameters:

Name Type Description Default
block bool

wait until the camera has responded, or the timeout has been reached. if False, returns immediately if we have already recieved a response, otherwise raise queue.Empty.

True
timeout int

the maximum number of milliseconds to wait for the response from the camera, after which queue.Empty will be raised.

10000

Returns:

Type Description
Optional[Any]

the result of TutkWyzeProtocolMessage.parse_response for the appropriate message.

Source code in wyzecam/tutk/tutk_ioctl_mux.py
def result(self, block: bool = True, timeout: int = 10000) -> Optional[Any]:
    """
    Wait until the camera has responded to our message, and return the result.

    :param block: wait until the camera has responded, or the timeout has been reached.
                  if False, returns immediately if we have already recieved a response,
                  otherwise raise queue.Empty.
    :param timeout: the maximum number of milliseconds to wait for the response
                    from the camera, after which queue.Empty will be raised.
    :returns: the result of [`TutkWyzeProtocolMessage.parse_response`][wyzecam.tutk.tutk_protocol.TutkWyzeProtocolMessage.parse_response]
              for the appropriate message.
    """
    if self.resp_data is not None:
        return self.req.parse_response(self.resp_data)
    if self.errcode:
        raise tutk.TutkError(self.errcode)
    if self.expected_response_code is None:
        logger.warning("no response code!")
        return None
    assert self.queue is not None, "Future created without error nor queue!"

    msg = self.queue.get(block=block, timeout=timeout)
    assert isinstance(
        msg, tuple
    ), "Expected a iotc result, instead got sentinel!"
    actual_len, io_ctl_type, resp_protocol, data = msg

    if actual_len < 0:
        raise tutk.TutkError(self.errcode)

    self.io_ctl_type = io_ctl_type
    self.resp_protocol = resp_protocol
    self.resp_data = data

    return self.req.parse_response(data)