WyzeIOTCSession
An IOTC session object, used for communicating with Wyze cameras
This is constructed from a WyzeIOTC object:
with WyzeIOTC() as wyze:
with wyze.connect_and_auth(account, camera) as session:
... # send configuration commands, or stream video
However, you can construct it manually, which can be helpful if you intend to set a different frame size or bitrate than the defaults:
with WyzeIOTCSession(lib, account, camera, bitrate=tutk.BITRATE_SD)
...
Note: WyzeIOTCSession is intended to be used as a context manager. Otherwise, you will need to manually tell the session to connect and authenticate, by calling session._connect() followed by session._auth(), and session._disconnect() when you're ready to disconnect the session.
Attributes:
Name | Type | Description |
---|---|---|
tutk_platform_lib |
CDLL |
The underlying c library (from tutk.load_library) |
account |
WyzeAccount |
A WyzeAccount instance, see api.get_user_info |
camera |
WyzeCamera |
A WyzeCamera instance, see api.get_camera_list |
preferred_frame_size |
int |
The preferred size of the video stream returned by the camera. See wyzecam.tutk.tutk.FRAME_SIZE_1080P. |
preferred_bitrate |
int |
The preferred bitrate of the video stream returned by the camera. See wyzecam.tutk.tutk.BITRATE_HD. |
session_id |
Optional[c_int] |
The id of this session, once connected. |
av_chan_id |
Optional[c_int] |
The AV channel of this session, once connected. |
state |
WyzeIOTCSessionState |
The current connection state of this session. See WyzeIOTCSessionState. |
__init__(self, tutk_platform_lib, account, camera, frame_size=0, bitrate=120)
special
Construct a wyze iotc session
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tutk_platform_lib |
CDLL |
The underlying c library (from tutk.load_library) |
required |
account |
WyzeAccount |
A WyzeAccount instance, see api.get_user_info |
required |
camera |
WyzeCamera |
A WyzeCamera instance, see api.get_camera_list |
required |
frame_size |
int |
Configures the size of the video stream returned by the camera. See wyzecam.tutk.tutk.FRAME_SIZE_1080P. |
0 |
bitrate |
int |
Configures the bitrate of the video stream returned by the camera. See wyzecam.tutk.tutk.BITRATE_HD. |
120 |
Source code in wyzecam/iotc.py
def __init__(
self,
tutk_platform_lib: CDLL,
account: WyzeAccount,
camera: WyzeCamera,
frame_size: int = tutk.FRAME_SIZE_1080P,
bitrate: int = tutk.BITRATE_HD,
) -> None:
"""Construct a wyze iotc session
:param tutk_platform_lib: The underlying c library (from
[tutk.load_library][wyzecam.tutk.tutk.load_library])
:param account: A [WyzeAccount][wyzecam.api_models.WyzeAccount] instance, see
[api.get_user_info][wyzecam.api.get_user_info]
:param camera: A [WyzeCamera][wyzecam.api_models.WyzeCamera] instance, see
[api.get_camera_list][wyzecam.api.get_camera_list]
:param frame_size: Configures the size of the video stream returned by the camera.
See [wyzecam.tutk.tutk.FRAME_SIZE_1080P][].
:param bitrate: Configures the bitrate of the video stream returned by the camera.
See [wyzecam.tutk.tutk.BITRATE_HD][].
"""
self.tutk_platform_lib: CDLL = tutk_platform_lib
self.account: WyzeAccount = account
self.camera: WyzeCamera = camera
self.session_id: Optional[c_int] = None
self.av_chan_id: Optional[c_int] = None
self.state: WyzeIOTCSessionState = WyzeIOTCSessionState.DISCONNECTED
self.preferred_frame_size: int = frame_size
self.preferred_bitrate: int = bitrate
iotctrl_mux(self)
Constructs a new TutkIOCtrlMux for this session
Use this to send configuration messages, such as change the cameras resolution.
Note that you either should treat the result of this as a context manager (using with), or call start_listening() explicitly on the result. This starts a separate thread listening for the responses from the camera.
with session.ioctrl_mux() as mux:
msg = tutk_protocol.K10056SetResolvingBit(
tutk.FRAME_SIZE_1080P, tutk.BITRATE_SD)
future = mux.send_ioctl(msg)
assert future.result() == True, "Change bitrate failed!"
Source code in wyzecam/iotc.py
def iotctrl_mux(self) -> TutkIOCtrlMux:
"""Constructs a new TutkIOCtrlMux for this session
Use this to send configuration messages, such as change the cameras resolution.
Note that you either should treat the result of this as a context manager (using
with), or call start_listening() explicitly on the result. This starts a separate
thread listening for the responses from the camera.
```python
with session.ioctrl_mux() as mux:
msg = tutk_protocol.K10056SetResolvingBit(
tutk.FRAME_SIZE_1080P, tutk.BITRATE_SD)
future = mux.send_ioctl(msg)
assert future.result() == True, "Change bitrate failed!"
```
"""
assert self.av_chan_id is not None, "Please call _connect() first!"
return TutkIOCtrlMux(self.tutk_platform_lib, self.av_chan_id)
recv_video_data(self)
A generator for returning raw video frames!
By iterating over the return value of this function, you will
get raw video frame data in the form of a bytes object. This
is convenient for accessing the raw video data without doing
the work of decoding or transcoding the actual video feed. If
you want to save the video to disk, display it, or otherwise process
the video, I highly recommend using recv_video_frame
or
recv_video_frame_nparray
instead of this function.
The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.
Note that the format of this data is either raw h264 or HVEC H265 video. You will have to introspect the frame_info object to determine the format!
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_data():
# do something with the video data! :)
In order to use this, you will need to install PyAV.
Returns:
Type | Description |
---|---|
Iterator[Tuple[Optional[bytes], Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]] |
A generator, which when iterated over, yields a tuple containing the decoded image (as a PyAV VideoFrame), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct). |
Source code in wyzecam/iotc.py
def recv_video_data(
self,
) -> Iterator[
Tuple[
Optional[bytes], Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]
]
]:
"""A generator for returning raw video frames!
By iterating over the return value of this function, you will
get raw video frame data in the form of a bytes object. This
is convenient for accessing the raw video data without doing
the work of decoding or transcoding the actual video feed. If
you want to save the video to disk, display it, or otherwise process
the video, I highly recommend using `recv_video_frame` or
`recv_video_frame_nparray` instead of this function.
The second item in the tuple returned by this function, 'frame_info', is a useful
set of metadata about the frame as returned by the camera. See
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
the contents of this object.
Note that the format of this data is either raw h264 or HVEC H265 video. You will
have to introspect the frame_info object to determine the format!
```python
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_data():
# do something with the video data! :)
```
In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/).
:returns: A generator, which when iterated over, yields a tuple containing the decoded image
(as a [PyAV VideoFrame](https://pyav.org/docs/stable/api/video.html#av.video.frame.VideoFrame)),
as well as metadata about the frame (in the form of a
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).
"""
assert self.av_chan_id is not None, "Please call _connect() first!"
while True:
errno, frame_data, frame_info, frame_idx = tutk.av_recv_frame_data(
self.tutk_platform_lib, self.av_chan_id
)
if errno < 0:
if errno == tutk.AV_ER_DATA_NOREADY:
time.sleep(1.0 / 40)
continue
elif errno == tutk.AV_ER_INCOMPLETE_FRAME:
warnings.warn("Received incomplete frame")
continue
elif errno == tutk.AV_ER_LOSED_THIS_FRAME:
warnings.warn("Lost frame")
continue
else:
raise tutk.TutkError(errno)
assert frame_info is not None, "Got no frame info without an error!"
if frame_info.frame_size != self.preferred_frame_size:
if frame_info.frame_size < 2:
logger.debug(
f"skipping smaller frame at start of stream (frame_size={frame_info.frame_size})"
)
continue
else:
# wyze doorbell has weird rotated image sizes.
if frame_info.frame_size - 3 != self.preferred_frame_size:
continue
yield frame_data, frame_info
recv_video_frame(self)
A generator for returning decoded video frames!
By iterating over the return value of this function, you will conveniently get nicely decoded frames in the form of a PyAV VideoFrame object. This is convenient for recording the video to disk.
The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_frame():
# do something with the video data! :)
In order to use this, you will need to install PyAV.
Returns:
Type | Description |
---|---|
Iterator[Tuple[av.VideoFrame, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]] |
A generator, which when iterated over, yields a tuple containing the decoded image (as a PyAV VideoFrame), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct). |
Source code in wyzecam/iotc.py
def recv_video_frame(
self,
) -> Iterator[
Tuple[
"av.VideoFrame", Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]
]
]:
"""A generator for returning decoded video frames!
By iterating over the return value of this function, you will conveniently
get nicely decoded frames in the form of a PyAV VideoFrame object. This is
convenient for recording the video to disk.
The second item in the tuple returned by this function, 'frame_info', is a useful
set of metadata about the frame as returned by the camera. See
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
the contents of this object.
```python
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_frame():
# do something with the video data! :)
```
In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/).
:returns: A generator, which when iterated over, yields a tuple containing the decoded image
(as a [PyAV VideoFrame](https://pyav.org/docs/stable/api/video.html#av.video.frame.VideoFrame)),
as well as metadata about the frame (in the form of a
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).
"""
if av is None:
raise RuntimeError(
"recv_video_frame requires PyAv to parse video frames. "
"Install with `pip install av` and try again."
)
codec = None
for frame_data, frame_info in self.recv_video_data():
if codec is None:
codec = self._av_codec_from_frameinfo(frame_info)
packets = codec.parse(frame_data)
for packet in packets:
frames = codec.decode(packet)
for frame in frames:
yield frame, frame_info
recv_video_frame_ndarray(self)
A generator for returning decoded video frames!
By iterating over the return value of this function, you will conveniently get nicely decoded frames in the form of a numpy array (suitable for matplotlib.imshow or cv2.imshow.
The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_frame_ndarray():
# do something with the video data! :)
In order to use this, you will need to install PyAV and numpy.
Returns:
Type | Description |
---|---|
Iterator[Tuple[np.ndarray, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]] |
A generator, which when iterated over, yields a tuple containing the decoded image (as a numpy array), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct). |
Source code in wyzecam/iotc.py
def recv_video_frame_ndarray(
self,
) -> Iterator[
Tuple["np.ndarray", Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]]
]:
"""A generator for returning decoded video frames!
By iterating over the return value of this function, you will conveniently
get nicely decoded frames in the form of a numpy array (suitable for
[matplotlib.imshow](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.imshow.html)
or [cv2.imshow](https://docs.opencv.org/master/dd/d43/tutorial_py_video_display.html).
The second item in the tuple returned by this function, 'frame_info', is a useful
set of metadata about the frame as returned by the camera. See
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
the contents of this object.
```python
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info) in sess.recv_video_frame_ndarray():
# do something with the video data! :)
```
In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/)
and [numpy](https://numpy.org/).
:returns: A generator, which when iterated over, yields a tuple containing the decoded image
(as a numpy array), as well as metadata about the frame (in the form of a
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).
"""
if np is None:
raise RuntimeError(
"recv_video_frame_ndarray requires numpy to convert to a numpy array. "
"Install with `pip install numpy` and try again."
)
for frame, frame_info in self.recv_video_frame():
img = frame.to_ndarray(format="bgr24")
if frame_info.frame_size in (3, 4):
img = np.rot90(img, 3)
img = np.ascontiguousarray(img, dtype=np.uint8)
yield img, frame_info
recv_video_frame_ndarray_with_stats(self, stat_window_size=210, draw_stats='{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS')
Does everything recv_video_frame_ndarray does, but also computes a number
of useful / interesting debug metrics including effective framerate, bitrate,
and frame size information. Optionally, if you specify a format string to the
draw_stats
function, this information will be used to draw a line of text
onto the image in the top-right corner with this debug information.
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info, frame_stats) in sess.recv_video_frame_ndarray_with_stats():
# do something with the video data! :)
This method gives you an additional 'frame_stats' value every frame, which is a dict with the following keys:
- "bytes_per_second"
- "kilobytes_per_second"
- "window_duration"
- "frames_per_second"
- "width"
- "height"
This dictionary is available in the draw_stats string as arguments to a python str.format() call, allowing you to quickly change the debug string in the top corner of the video.
In order to use this, you will need to install PyAV, numpy, and PyOpenCV.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stat_window_size |
int |
the number of consecutive frames to use as the window function for computing the above metrics. The larger the window size, the longer period over which the metrics are averaged. Note that this method is not performant for very large window sizes. |
210 |
draw_stats |
Optional[str] |
if specified, this python format() string is used to draw some debug text in the upper right hand corner. |
'{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS' |
Returns:
Type | Description |
---|---|
Iterator[Tuple[np.ndarray, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct], Dict[str, int]]] |
A generator, which when iterated over, yields a 3-tuple containing the decoded image (as a numpy array), metadata about the frame (in the form of a tutk.FrameInfoStruct), and some performance statistics (in the form of a dict). |
Source code in wyzecam/iotc.py
def recv_video_frame_ndarray_with_stats(
self,
stat_window_size: int = 210,
draw_stats: Optional[
str
] = "{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS",
) -> Iterator[
Tuple[
"np.ndarray",
Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct],
Dict[str, int],
]
]:
"""
Does everything recv_video_frame_ndarray does, but also computes a number
of useful / interesting debug metrics including effective framerate, bitrate,
and frame size information. Optionally, if you specify a format string to the
`draw_stats` function, this information will be used to draw a line of text
onto the image in the top-right corner with this debug information.
```python
with wyzecam.WyzeIOTC() as wyze_iotc:
with wyze_iotc.connect_and_auth(account, camera) as sess:
for (frame, frame_info, frame_stats) in sess.recv_video_frame_ndarray_with_stats():
# do something with the video data! :)
```
This method gives you an additional 'frame_stats' value every frame, which is a
dict with the following keys:
- "bytes_per_second"
- "kilobytes_per_second"
- "window_duration"
- "frames_per_second"
- "width"
- "height"
This dictionary is available in the draw_stats string as arguments to a python
str.format() call, allowing you to quickly change the debug string in the top corner
of the video.
In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/),
[numpy](https://numpy.org/), and [PyOpenCV](https://pypi.org/project/opencv-python/).
:param stat_window_size: the number of consecutive frames to use as the window function
for computing the above metrics. The larger the window size,
the longer period over which the metrics are averaged. Note that
this method is not performant for very large window sizes.
:param draw_stats: if specified, this python format() string is used to draw some debug text
in the upper right hand corner.
:returns: A generator, which when iterated over, yields a 3-tuple containing the decoded image
(as a numpy array), metadata about the frame (in the form of a
[tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]), and some performance
statistics (in the form of a dict).
"""
stat_window = []
for frame_ndarray, frame_info in self.recv_video_frame_ndarray():
stat_window.append(frame_info)
if len(stat_window) > stat_window_size:
stat_window = stat_window[len(stat_window) - stat_window_size :]
if len(stat_window) > 1:
stat_window_start = (
stat_window[0].timestamp
+ stat_window[0].timestamp_ms / 1_000_000
)
stat_window_end = (
stat_window[-1].timestamp
+ stat_window[-1].timestamp_ms / 1_000_000
)
stat_window_duration = stat_window_end - stat_window_start
if stat_window_duration <= 0:
# wyze doorbell doesn't support timestamp_ms; workaround:
stat_window_duration = (
len(stat_window) / stat_window[-1].framerate
)
stat_window_total_size = sum(
b.frame_len for b in stat_window[:-1]
) # skip the last reading
bytes_per_second = int(
stat_window_total_size / stat_window_duration
)
frames_per_second = int(len(stat_window) / stat_window_duration)
else:
bytes_per_second = 0
stat_window_duration = 0
frames_per_second = 0
stats = {
"bytes_per_second": bytes_per_second,
"kilobytes_per_second": int(bytes_per_second / 1000),
"window_duration": stat_window_duration,
"frames_per_second": frames_per_second,
"width": frame_ndarray.shape[1],
"height": frame_ndarray.shape[0],
}
if draw_stats:
text = draw_stats.format(**stats)
cv2.putText(
frame_ndarray,
text,
(50, 50),
cv2.FONT_HERSHEY_DUPLEX,
1,
(0, 0, 0),
2,
cv2.LINE_AA,
)
cv2.putText(
frame_ndarray,
text,
(50, 50),
cv2.FONT_HERSHEY_DUPLEX,
1,
(255, 255, 255),
1,
cv2.LINE_AA,
)
yield frame_ndarray, frame_info, stats
session_check(self)
Used by a device or a client to check the IOTC session info.
A device or a client may use this function to check if the IOTC session is still alive as well as getting the IOTC session info.
Returns:
Type | Description |
---|---|
SInfoStruct |
Source code in wyzecam/iotc.py
def session_check(self) -> tutk.SInfoStruct:
"""Used by a device or a client to check the IOTC session info.
A device or a client may use this function to check if the IOTC session is
still alive as well as getting the IOTC session info.
:returns: A [`tutk.SInfoStruct`][wyzecam.tutk.tutk.SInfoStruct]
"""
assert (
self.session_id is not None
), "Please call _connect() before session_check()"
errcode, sess_info = tutk.iotc_session_check(
self.tutk_platform_lib, self.session_id
)
if errcode < 0:
raise tutk.TutkError(errcode)
return sess_info