tutk_ioctl_mux.py
TutkIOCtrlMux
An "IO Ctrl" interface for sending and receiving data over a control channel built into an IOTC session with a particular device.
Use this to send and receive configuration data from the camera. There are many, many commands supported by the wyze camera over this interface, though just a fraction of them have been reverse engineered at this point. See TutkWyzeProtocolMessage and its subclasses for the supported commands.
This channel is used to authenticate the client with the camera prior to streaming audio or video data.
See: wyzecam.iotc.WyzeIOTCSession.iotctrl_mux
Methods
__init__(self, tutk_platform_lib, av_chan_id)
special
Initialize the mux channel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tutk_platform_lib |
CDLL |
the underlying c library used to communicate with the wyze device; see tutk.load_library. |
required |
av_chan_id |
c_int |
the channel id of the session this mux is created on. |
required |
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def __init__(self, tutk_platform_lib: CDLL, av_chan_id: c_int) -> None:
    """Set up the mux and the listener that will feed its response queues.

    :param tutk_platform_lib: the underlying c library used to communicate with the wyze
                              device; see [tutk.load_library][wyzecam.tutk.tutk.load_library].
    :param av_chan_id: the channel id of the session this mux is created on.
    """
    # One queue per key, created lazily on first access. The same mapping is
    # handed to the listener so both sides see the same queues.
    response_queues: DefaultDict[
        Union[str, int], "Queue[Union[object, Tuple[int, int, int, bytes]]]"
    ] = defaultdict(Queue)

    self.tutk_platform_lib = tutk_platform_lib
    self.av_chan_id = av_chan_id
    self.queues = response_queues
    self.listener = TutkIOCtrlMuxListener(
        tutk_platform_lib, av_chan_id, response_queues
    )
send_ioctl(self, msg, ctrl_type=256)
Send a TutkWyzeProtocolMessage to the camera.
This should be called after the listener has been started, by using the mux as a context manager:
with session.ioctrl_mux() as mux:
result = mux.send_ioctl(msg)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
TutkWyzeProtocolMessage |
The message to send to the camera. See tutk_protocol.py Commands |
required |
ctrl_type |
int |
used internally by the iotc library, should always be tutk.IOTYPE_USER_DEFINED_START. |
256 |
Returns:
Type | Description |
---|---|
TutkIOCtrlFuture |
a future promise of a response from the camera. See wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlFuture |
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def send_ioctl(
    self,
    msg: TutkWyzeProtocolMessage,
    ctrl_type: int = tutk.IOTYPE_USER_DEFINED_START,
) -> TutkIOCtrlFuture:
    """
    Encode a [TutkWyzeProtocolMessage][wyzecam.tutk.tutk_protocol.TutkWyzeProtocolMessage]
    and send it to the camera over the io control channel.

    Call this once the listener is running, which is normally arranged by using
    the mux as a context manager:

    ```python
    with session.ioctrl_mux() as mux:
        result = mux.send_ioctl(msg)
    ```

    :param msg: The message to send to the camera. See
                [tutk_protocol.py Commands](../tutk_protocol_commands/)
    :param ctrl_type: used internally by the iotc library, should always be
                      `tutk.IOTYPE_USER_DEFINED_START`.
    :returns: a future promise of a response from the camera. See [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlFuture][]
    """
    wire_bytes = msg.encode()
    # The first 16 bytes are decoded as the protocol header purely for logging.
    header_view = tutk_protocol.TutkWyzeProtocolHeader.from_buffer_copy(
        wire_bytes[0:16]
    )
    logger.debug("SEND %s %s %s", msg, header_view, wire_bytes[16:])

    send_status = tutk.av_send_io_ctrl(
        self.tutk_platform_lib, self.av_chan_id, ctrl_type, wire_bytes
    )

    # A non-zero status means the send failed; the future carries the error.
    if send_status:
        return TutkIOCtrlFuture(msg, errcode=send_status)

    # With a response code, the future waits on that code's queue.
    if msg.expected_response_code:
        return TutkIOCtrlFuture(msg, self.queues[msg.expected_response_code])

    logger.warning("no expected response code found")
    return TutkIOCtrlFuture(msg)
start_listening(self)
Start a separate thread listening for responses from the camera.
This is generally called by using the TutkIOCtrlMux as a context manager:
with session.ioctrl_mux() as mux:
...
If this method is called explicitly, remember to call stop_listening when finished.
See: wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.stop_listening
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def start_listening(self) -> None:
    """Start the listener that receives responses from the camera.

    Usually you do not call this yourself; using the TutkIOCtrlMux as a
    context manager does it for you:

    ```python
    with session.ioctrl_mux() as mux:
        ...
    ```

    When calling it explicitly, pair it with a call to `stop_listening`
    once you are done.

    See: [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.stop_listening][]
    """
    listener_thread = self.listener
    listener_thread.start()
stop_listening(self)
Shuts down the separate thread used for listening for responses from the camera.
See: wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.start_listening
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def stop_listening(self) -> None:
    """
    Ask the listener to stop, then wait for it to finish.

    See: [wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.start_listening][]
    """
    # The listener polls the control queue and exits on the stop sentinel.
    control_queue = self.queues[CONTROL_CHANNEL]
    control_queue.put(STOP_SENTINEL)
    self.listener.join()
waitfor(self, futures, timeout=None)
Wait for the responses of one or more TutkIOCtrlFutures.
with session.ioctrl_mux() as mux:
f1 = mux.send_ioctl(msg)
f2 = mux.send_ioctl(msg2)
resp1, resp2 = mux.waitfor([f1, f2])
This allows you to wait for a set of TutkIOCtrlFutures to respond in
any order, and allows you to send multiple commands to the camera without
waiting for each one to return before sending another.
If you are sending one command at a time, consider using
TutkIOCtrlFuture.result()
directly:
with session.ioctrl_mux() as mux:
f1 = mux.send_ioctl(msg)
resp1 = f1.result()
f2 = mux.send_ioctl(msg2)
resp2 = f2.result()
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def waitfor(
    self,
    futures: Union[TutkIOCtrlFuture, List[TutkIOCtrlFuture]],
    timeout: Optional[int] = None,
) -> Union[Any, List[Any]]:
    """Wait for the responses of one or more `TutkIOCtrlFuture`s.

    ```python
    with session.ioctrl_mux() as mux:
        f1 = mux.send_ioctl(msg)
        f2 = mux.send_ioctl(msg2)
        resp1, resp2 = mux.waitfor([f1, f2])
    ```

    This allows you to wait for a set of `TutkIOCtrlFuture`s to respond in
    any order, and allows you to send multiple commands to the camera without
    waiting for each one to return before sending another.

    If you are sending one command at a time, consider using
    `TutkIOCtrlFuture.result()` directly:

    ```python
    with session.ioctrl_mux() as mux:
        f1 = mux.send_ioctl(msg)
        resp1 = f1.result()
        f2 = mux.send_ioctl(msg2)
        resp2 = f2.result()
    ```

    :param futures: a single future, or a list of futures, to wait on.
    :param timeout: the maximum number of seconds to keep polling, or `None`
                    to wait until every future has produced a result.
    :returns: the result of the future if a single future was passed, otherwise
              a list of results in the same order as `futures`. A future that
              did not respond before the timeout yields `None`.

    Any exception other than `queue.Empty` raised by a future's `result()`
    propagates to the caller.
    """
    unwrap_single_item = isinstance(futures, TutkIOCtrlFuture)
    if unwrap_single_item:
        futures = [futures]

    # A private sentinel marks "not answered yet". `None` cannot be used for
    # this because `result()` may legitimately return `None`; treating that
    # as pending would re-poll the future forever.
    pending = object()
    results: List[Any] = [pending] * len(futures)

    # Monotonic clock so wall-clock adjustments cannot stretch or cut the wait.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        still_waiting = False
        for i, future in enumerate(futures):
            if results[i] is not pending:
                continue
            try:
                results[i] = future.result(block=False)
            except Empty:
                still_waiting = True

        if not still_waiting:
            break

        # Not everything arrived this pass: wait briefly before checking
        # again, but never past the deadline.
        if deadline is None:
            time.sleep(0.1)
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(0.1, remaining))

    # Futures that never answered are reported as None, as before.
    resolved = [None if item is pending else item for item in results]
    if unwrap_single_item:
        return resolved[0]
    return resolved
TutkIOCtrlMuxListener
Methods
run(self)
Method representing the thread's activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def run(self) -> None:
    """Receive io control messages and route each one to its response queue.

    Each iteration first checks the control queue for the stop sentinel,
    then waits up to one second for a message from the camera. A decoded
    message is put on the queue keyed by its header code. The loop ends on
    the stop sentinel or when the remote side closes the session; any other
    negative receive status raises `tutk.TutkError`.
    """
    recv_timeout_ms = 1000
    logger.info(f"Now listening on channel id {self.av_chan_id}")

    while True:
        # Non-blocking check for a shutdown request.
        try:
            command = self.queues[CONTROL_CHANNEL].get_nowait()
        except Empty:
            pass
        else:
            if command == STOP_SENTINEL:
                logger.info(
                    f"No longer listening on channel id {self.av_chan_id}"
                )
                return

        status, io_ctl_type, raw = tutk.av_recv_io_ctrl(
            self.tutk_platform_lib, self.av_chan_id, recv_timeout_ms
        )

        # Nothing arrived within the timeout; go back and re-check for a stop.
        if status == tutk.AV_ER_TIMEOUT:
            continue

        if status == tutk.AV_ER_SESSION_CLOSE_BY_REMOTE:
            logger.warning("Connection closed by remote. Closing connection.")
            return

        if status < 0:
            raise tutk.TutkError(status)

        header, payload = tutk_protocol.decode(raw)
        logger.debug(f"RECV {header}: {repr(payload)}")

        destination = self.queues[header.code]
        destination.put((status, io_ctl_type, header.protocol, payload))
TutkIOCtrlFuture
Holds the result of a message sent over a TutkIOCtrlMux; a TutkIOCtrlFuture
is returned by [TutkIOCtrlMux.send_ioctl][wyzecam.tutk.tutk_ioctl_mux.TutkIOCtrlMux.send_ioctl],
and represents the value of a future response from the camera. The actual contents
of this response should be retrieved by calling result(), below.
Attributes:
Name | Type | Description |
---|---|---|
req |
TutkWyzeProtocolMessage |
The message sent to the camera that we are waiting for a response from |
errcode |
Optional[c_int] |
The resultant error code associated with this response |
resp_protocol |
Optional[int] |
The 2-byte protocol version of the header of the response |
resp_data |
Optional[bytes] |
The raw message sent from the camera to the client |
Methods
result(self, block=True, timeout=10000)
Wait until the camera has responded to our message, and return the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
block |
bool |
wait until the camera has responded, or the timeout has been reached. If False, returns immediately if we have already received a response, otherwise raises queue.Empty. |
True |
timeout |
int |
the maximum number of milliseconds to wait for the response from the camera, after which queue.Empty will be raised. |
10000 |
Returns:
Type | Description |
---|---|
Optional[Any] |
the result of TutkWyzeProtocolMessage.parse_response for the appropriate message. |
Source code in wyzecam/tutk/tutk_ioctl_mux.py
def result(self, block: bool = True, timeout: int = 10000) -> Optional[Any]:
    """
    Wait until the camera has responded to our message, and return the result.

    :param block: wait until the camera has responded, or the timeout has been reached.
                  if False, returns immediately if we have already received a response,
                  otherwise raise queue.Empty.
    :param timeout: the maximum number of milliseconds to wait for the response
                    from the camera, after which queue.Empty will be raised.
                    `None` waits indefinitely.
    :returns: the result of [`TutkWyzeProtocolMessage.parse_response`][wyzecam.tutk.tutk_protocol.TutkWyzeProtocolMessage.parse_response]
              for the appropriate message, or `None` if the request has no
              expected response code.
    :raises queue.Empty: if no response is available (non-blocking) or none
                         arrived before the timeout (blocking).
    :raises tutk.TutkError: if sending the request failed, or the received
                            message carries a negative length/status.
    """
    # A response was already received on an earlier call; parse it again
    # rather than touching the queue.
    if self.resp_data is not None:
        return self.req.parse_response(self.resp_data)
    if self.errcode:
        raise tutk.TutkError(self.errcode)
    if self.expected_response_code is None:
        logger.warning("no response code!")
        return None
    assert self.queue is not None, "Future created without error nor queue!"

    # Queue.get() takes its timeout in seconds, while this method's contract
    # is milliseconds; convert so the 10000 default means ten seconds.
    timeout_seconds = None if timeout is None else timeout / 1000
    msg = self.queue.get(block=block, timeout=timeout_seconds)
    assert isinstance(
        msg, tuple
    ), "Expected a iotc result, instead got sentinel!"

    actual_len, io_ctl_type, resp_protocol, data = msg
    if actual_len < 0:
        # Report the status that actually came with the message; self.errcode
        # is known to be falsy here because of the check above.
        raise tutk.TutkError(actual_len)

    self.io_ctl_type = io_ctl_type
    self.resp_protocol = resp_protocol
    self.resp_data = data
    return self.req.parse_response(data)