WyzeIOTCSession

An IOTC session object, used for communicating with Wyze cameras

This is constructed from a WyzeIOTC object:

with WyzeIOTC() as wyze:
    with wyze.connect_and_auth(account, camera) as session:
        ...  # send configuration commands, or stream video

However, you can construct it manually, which can be helpful if you intend to set a different frame size or bitrate than the defaults:

with WyzeIOTCSession(lib, account, camera, bitrate=tutk.BITRATE_SD) as session:
    ...

Note: WyzeIOTCSession is intended to be used as a context manager. Otherwise, you will need to manually tell the session to connect and authenticate, by calling session._connect() followed by session._auth(), and session._disconnect() when you're ready to disconnect the session.
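If a with block does not fit your program's structure, the manual lifecycle described in the note can be wrapped in try/finally so the session is always torn down. The following is a minimal sketch: run_with_session and its work callback are hypothetical names introduced here, and only _connect(), _auth(), and _disconnect() come from the note above.

```python
from typing import Any, Callable


def run_with_session(session: Any, work: Callable[[Any], Any]) -> Any:
    """Connect, authenticate, run work(session), and always disconnect.

    `session` is expected to be a WyzeIOTCSession, or anything exposing the
    same _connect, _auth and _disconnect methods (hypothetical helper).
    """
    session._connect()
    try:
        session._auth()
        return work(session)
    finally:
        # Runs even if _auth() or work() raises, so the session is not leaked.
        session._disconnect()
```

Something like run_with_session(session, lambda s: s.session_check()) would then mirror what the context manager does for you.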

Attributes:

  • tutk_platform_lib (CDLL): The underlying C library (from tutk.load_library).
  • account (WyzeAccount): A WyzeAccount instance, see api.get_user_info.
  • camera (WyzeCamera): A WyzeCamera instance, see api.get_camera_list.
  • preferred_frame_size (int): The preferred size of the video stream returned by the camera. See wyzecam.tutk.tutk.FRAME_SIZE_1080P.
  • preferred_bitrate (int): The preferred bitrate of the video stream returned by the camera. See wyzecam.tutk.tutk.BITRATE_HD.
  • session_id (Optional[c_int]): The id of this session, once connected.
  • av_chan_id (Optional[c_int]): The AV channel of this session, once connected.
  • state (WyzeIOTCSessionState): The current connection state of this session. See WyzeIOTCSessionState.

__init__(self, tutk_platform_lib, account, camera, frame_size=0, bitrate=120) special

Construct a wyze iotc session

Parameters:

  • tutk_platform_lib (CDLL, required): The underlying C library (from tutk.load_library).
  • account (WyzeAccount, required): A WyzeAccount instance, see api.get_user_info.
  • camera (WyzeCamera, required): A WyzeCamera instance, see api.get_camera_list.
  • frame_size (int, default 0): Configures the size of the video stream returned by the camera. See wyzecam.tutk.tutk.FRAME_SIZE_1080P.
  • bitrate (int, default 120): Configures the bitrate of the video stream returned by the camera. See wyzecam.tutk.tutk.BITRATE_HD.

Source code in wyzecam/iotc.py
def __init__(
    self,
    tutk_platform_lib: CDLL,
    account: WyzeAccount,
    camera: WyzeCamera,
    frame_size: int = tutk.FRAME_SIZE_1080P,
    bitrate: int = tutk.BITRATE_HD,
) -> None:
    """Construct a wyze iotc session

    :param tutk_platform_lib: The underlying c library (from
                    [tutk.load_library][wyzecam.tutk.tutk.load_library])
    :param account: A [WyzeAccount][wyzecam.api_models.WyzeAccount] instance, see
                    [api.get_user_info][wyzecam.api.get_user_info]
    :param camera: A [WyzeCamera][wyzecam.api_models.WyzeCamera] instance, see
                   [api.get_camera_list][wyzecam.api.get_camera_list]
    :param frame_size: Configures the size of the video stream returned by the camera.
                       See [wyzecam.tutk.tutk.FRAME_SIZE_1080P][].
    :param bitrate: Configures the bitrate of the video stream returned by the camera.
                    See [wyzecam.tutk.tutk.BITRATE_HD][].
    """
    self.tutk_platform_lib: CDLL = tutk_platform_lib
    self.account: WyzeAccount = account
    self.camera: WyzeCamera = camera
    self.session_id: Optional[c_int] = None
    self.av_chan_id: Optional[c_int] = None
    self.state: WyzeIOTCSessionState = WyzeIOTCSessionState.DISCONNECTED

    self.preferred_frame_size: int = frame_size
    self.preferred_bitrate: int = bitrate

iotctrl_mux(self)

Constructs a new TutkIOCtrlMux for this session

Use this to send configuration messages, such as changing the camera's resolution.

Note that you should either use the result as a context manager (using with) or call start_listening() explicitly on it. Either way, this starts a separate thread that listens for responses from the camera.

with session.iotctrl_mux() as mux:
    msg = tutk_protocol.K10056SetResolvingBit(
        tutk.FRAME_SIZE_1080P, tutk.BITRATE_SD)
    future = mux.send_ioctl(msg)
    assert future.result() == True, "Change bitrate failed!"
Source code in wyzecam/iotc.py
def iotctrl_mux(self) -> TutkIOCtrlMux:
    """Constructs a new TutkIOCtrlMux for this session

    Use this to send configuration messages, such as changing the camera's resolution.

    Note that you either should treat the result of this as a context manager (using
    with), or call start_listening() explicitly on the result.  This starts a separate
    thread listening for the responses from the camera.

    ```python
    with session.iotctrl_mux() as mux:
        msg = tutk_protocol.K10056SetResolvingBit(
            tutk.FRAME_SIZE_1080P, tutk.BITRATE_SD)
        future = mux.send_ioctl(msg)
        assert future.result() == True, "Change bitrate failed!"
    ```

    """
    assert self.av_chan_id is not None, "Please call _connect() first!"
    return TutkIOCtrlMux(self.tutk_platform_lib, self.av_chan_id)

recv_video_data(self)

A generator for returning raw video frames!

By iterating over the return value of this function, you will get raw video frame data in the form of a bytes object. This is convenient for accessing the raw video data without doing the work of decoding or transcoding the actual video feed. If you want to save the video to disk, display it, or otherwise process the video, I highly recommend using recv_video_frame or recv_video_frame_ndarray instead of this function.

The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.

Note that the format of this data is either raw H.264 or HEVC (H.265) video. You will have to introspect the frame_info object to determine the format!
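As a rough illustration of that introspection, the sketch below maps a frame's codec id to a codec name. It rests on two assumptions that are not stated in this page and should be checked against tutk.FrameInfoStruct: that frame_info exposes a codec_id field, and that the ids follow TUTK's media codec numbering (78 for H.264, 80 for HEVC). The helper name codec_name_for is hypothetical.

```python
import warnings

# ASSUMPTION: these ids follow TUTK's media codec numbering. Verify them
# against tutk.FrameInfoStruct and your camera firmware before relying on them.
ASSUMED_CODEC_NAMES = {78: "h264", 80: "hevc"}


def codec_name_for(frame_info, default="h264"):
    """Return a codec name for a frame, falling back to `default`."""
    # ASSUMPTION: frame_info carries a `codec_id` attribute.
    codec_id = getattr(frame_info, "codec_id", None)
    name = ASSUMED_CODEC_NAMES.get(codec_id)
    if name is None:
        warnings.warn(f"Unexpected codec id {codec_id!r}; assuming {default}")
        return default
    return name
```

The resulting name is the kind of string a decoder such as PyAV's av.CodecContext.create(name, "r") accepts, which is useful if you decode the raw frames yourself.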

with wyzecam.WyzeIOTC() as wyze_iotc:
    with wyze_iotc.connect_and_auth(account, camera) as sess:
        for (frame, frame_info) in sess.recv_video_data():
            ...  # do something with the video data! :)

This function yields undecoded bytes, so PyAV is only needed if you go on to decode them yourself (as recv_video_frame does).
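One use for the raw bytes is dumping a short capture to a file without decoding anything. The sketch below is an illustration under stated assumptions: dump_raw_stream is a hypothetical helper, and it accepts any iterable of (frame_data, frame_info) tuples, such as the generator returned by recv_video_data().

```python
from typing import BinaryIO, Iterable, Optional, Tuple


def dump_raw_stream(
    frames: Iterable[Tuple[Optional[bytes], object]],
    out: BinaryIO,
    max_frames: int = 300,
) -> int:
    """Append up to `max_frames` raw frames to `out`; return how many were written."""
    written = 0
    for frame_data, _frame_info in frames:
        if frame_data is None:
            # The return type is Optional[bytes], so skip empty reads.
            continue
        out.write(frame_data)
        written += 1
        if written >= max_frames:
            break
    return written
```

With a live session this might be called as dump_raw_stream(sess.recv_video_data(), open("capture.bin", "wb")). The output is a bare elementary stream with no container, so whether a given player opens it depends on the codec and the player.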

Returns:

Type Description
Iterator[Tuple[Optional[bytes], Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]]

A generator, which when iterated over, yields a tuple containing the raw frame data (as a bytes object), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct).

Source code in wyzecam/iotc.py
def recv_video_data(
    self,
) -> Iterator[
    Tuple[
        Optional[bytes], Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]
    ]
]:
    """A generator for returning raw video frames!

    By iterating over the return value of this function, you will
    get raw video frame data in the form of a bytes object.  This
    is convenient for accessing the raw video data without doing
    the work of decoding or transcoding the actual video feed.  If
    you want to save the video to disk, display it, or otherwise process
    the video, I highly recommend using `recv_video_frame` or
    `recv_video_frame_ndarray` instead of this function.

    The second item in the tuple returned by this function, 'frame_info', is a useful
    set of metadata about the frame as returned by the camera.  See
    [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
    the contents of this object.

    Note that the format of this data is either raw H.264 or HEVC (H.265) video. You will
    have to introspect the frame_info object to determine the format!


    ```python
    with wyzecam.WyzeIOTC() as wyze_iotc:
        with wyze_iotc.connect_and_auth(account, camera) as sess:
            for (frame, frame_info) in sess.recv_video_data():
                ...  # do something with the video data! :)
    ```

    This function yields undecoded bytes, so [PyAV](https://pyav.org/docs/stable/) is only needed if you decode them yourself.

    :returns: A generator, which when iterated over, yields a tuple containing the raw frame
             data (as a bytes object), as well as metadata about the frame (in the form of a
             [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).


    """
    assert self.av_chan_id is not None, "Please call _connect() first!"

    while True:
        errno, frame_data, frame_info, frame_idx = tutk.av_recv_frame_data(
            self.tutk_platform_lib, self.av_chan_id
        )
        if errno < 0:
            if errno == tutk.AV_ER_DATA_NOREADY:
                time.sleep(1.0 / 40)
                continue
            elif errno == tutk.AV_ER_INCOMPLETE_FRAME:
                warnings.warn("Received incomplete frame")
                continue
            elif errno == tutk.AV_ER_LOSED_THIS_FRAME:
                warnings.warn("Lost frame")
                continue
            else:
                raise tutk.TutkError(errno)
        assert frame_info is not None, "Got no frame info without an error!"
        if frame_info.frame_size != self.preferred_frame_size:
            if frame_info.frame_size < 2:
                logger.debug(
                    f"skipping smaller frame at start of stream (frame_size={frame_info.frame_size})"
                )
                continue
            else:
                # wyze doorbell has weird rotated image sizes.
                if frame_info.frame_size - 3 != self.preferred_frame_size:
                    continue

        yield frame_data, frame_info

recv_video_frame(self)

A generator for returning decoded video frames!

By iterating over the return value of this function, you will conveniently get nicely decoded frames in the form of a PyAV VideoFrame object. This is convenient for recording the video to disk.

The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.

with wyzecam.WyzeIOTC() as wyze_iotc:
    with wyze_iotc.connect_and_auth(account, camera) as sess:
        for (frame, frame_info) in sess.recv_video_frame():
            ...  # do something with the video data! :)

In order to use this, you will need to install PyAV.
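Since decoded frames are meant for saving or further processing, here is a small sketch that writes every Nth frame to disk as a JPEG. The helper save_snapshots and its parameters are hypothetical. It relies on PyAV's VideoFrame.to_image(), which returns a PIL image and therefore also needs Pillow installed.

```python
import os
from typing import Any, Iterable, List, Tuple


def save_snapshots(
    frames: Iterable[Tuple[Any, Any]],
    directory: str,
    every_n: int = 30,
    limit: int = 10,
) -> List[str]:
    """Save every `every_n`-th frame as a JPEG, stopping after `limit` files."""
    os.makedirs(directory, exist_ok=True)
    paths: List[str] = []
    for index, (frame, _frame_info) in enumerate(frames):
        if index % every_n != 0:
            continue
        path = os.path.join(directory, f"frame_{index:06d}.jpg")
        # av.VideoFrame.to_image() converts the frame to a PIL image.
        frame.to_image().save(path)
        paths.append(path)
        if len(paths) >= limit:
            break
    return paths
```

Inside a session, save_snapshots(sess.recv_video_frame(), "snapshots") would keep roughly one frame per 30 received and return once ten files exist.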

Returns:

Type Description
Iterator[Tuple[av.VideoFrame, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]]

A generator, which when iterated over, yields a tuple containing the decoded image (as a PyAV VideoFrame), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct).

Source code in wyzecam/iotc.py
def recv_video_frame(
    self,
) -> Iterator[
    Tuple[
        "av.VideoFrame", Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]
    ]
]:
    """A generator for returning decoded video frames!

    By iterating over the return value of this function, you will conveniently
    get nicely decoded frames in the form of a PyAV VideoFrame object.  This is
    convenient for recording the video to disk.

    The second item in the tuple returned by this function, 'frame_info', is a useful
    set of metadata about the frame as returned by the camera.  See
    [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
    the contents of this object.

    ```python
    with wyzecam.WyzeIOTC() as wyze_iotc:
        with wyze_iotc.connect_and_auth(account, camera) as sess:
            for (frame, frame_info) in sess.recv_video_frame():
                ...  # do something with the video data! :)
    ```

    In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/).

    :returns: A generator, which when iterated over, yields a tuple containing the decoded image
             (as a [PyAV VideoFrame](https://pyav.org/docs/stable/api/video.html#av.video.frame.VideoFrame)),
             as well as metadata about the frame (in the form of a
             [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).
    """
    if av is None:
        raise RuntimeError(
            "recv_video_frame requires PyAv to parse video frames. "
            "Install with `pip install av` and try again."
        )

    codec = None
    for frame_data, frame_info in self.recv_video_data():
        if codec is None:
            codec = self._av_codec_from_frameinfo(frame_info)
        packets = codec.parse(frame_data)
        for packet in packets:
            frames = codec.decode(packet)
            for frame in frames:
                yield frame, frame_info

recv_video_frame_ndarray(self)

A generator for returning decoded video frames!

By iterating over the return value of this function, you will conveniently get nicely decoded frames in the form of a numpy array (suitable for matplotlib.imshow or cv2.imshow).

The second item in the tuple returned by this function, 'frame_info', is a useful set of metadata about the frame as returned by the camera. See tutk.FrameInfoStruct for more details about the contents of this object.

with wyzecam.WyzeIOTC() as wyze_iotc:
    with wyze_iotc.connect_and_auth(account, camera) as sess:
        for (frame, frame_info) in sess.recv_video_frame_ndarray():
            ...  # do something with the video data! :)

In order to use this, you will need to install PyAV and numpy.
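A live preview loop is a typical consumer of these arrays. The sketch below takes the display functions as arguments so it stays independent of OpenCV itself; preview and its parameter names are hypothetical, and in practice you would pass cv2.imshow and cv2.waitKey.

```python
from typing import Any, Callable, Iterable, Tuple


def preview(
    frames: Iterable[Tuple[Any, Any]],
    imshow: Callable[[str, Any], Any],
    wait_key: Callable[[int], int],
    window: str = "wyzecam",
    quit_key: str = "q",
) -> int:
    """Show frames until the stream ends or `quit_key` is pressed; return the count shown."""
    shown = 0
    for image, _frame_info in frames:
        imshow(window, image)
        shown += 1
        # cv2.waitKey returns -1 when no key was pressed; mask to the low
        # byte before comparing against the key's character code.
        if wait_key(1) & 0xFF == ord(quit_key):
            break
    return shown
```

For example: preview(sess.recv_video_frame_ndarray(), cv2.imshow, cv2.waitKey), followed by cv2.destroyAllWindows() once the loop exits.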

Returns:

Type Description
Iterator[Tuple[np.ndarray, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct]]]

A generator, which when iterated over, yields a tuple containing the decoded image (as a numpy array), as well as metadata about the frame (in the form of a tutk.FrameInfoStruct).

Source code in wyzecam/iotc.py
def recv_video_frame_ndarray(
    self,
) -> Iterator[
    Tuple["np.ndarray", Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct]]
]:
    """A generator for returning decoded video frames!

    By iterating over the return value of this function, you will conveniently
    get nicely decoded frames in the form of a numpy array (suitable for
    [matplotlib.imshow](https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.imshow.html)
    or [cv2.imshow](https://docs.opencv.org/master/dd/d43/tutorial_py_video_display.html)).

    The second item in the tuple returned by this function, 'frame_info', is a useful
    set of metadata about the frame as returned by the camera.  See
    [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct] for more details about
    the contents of this object.

    ```python
    with wyzecam.WyzeIOTC() as wyze_iotc:
        with wyze_iotc.connect_and_auth(account, camera) as sess:
            for (frame, frame_info) in sess.recv_video_frame_ndarray():
                ...  # do something with the video data! :)
    ```

    In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/)
    and [numpy](https://numpy.org/).

    :returns: A generator, which when iterated over, yields a tuple containing the decoded image
             (as a numpy array), as well as metadata about the frame (in the form of a
             [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]).
    """
    if np is None:
        raise RuntimeError(
            "recv_video_frame_ndarray requires numpy to convert to a numpy array. "
            "Install with `pip install numpy` and try again."
        )

    for frame, frame_info in self.recv_video_frame():
        img = frame.to_ndarray(format="bgr24")
        if frame_info.frame_size in (3, 4):
            img = np.rot90(img, 3)
            img = np.ascontiguousarray(img, dtype=np.uint8)
        yield img, frame_info

recv_video_frame_ndarray_with_stats(self, stat_window_size=210, draw_stats='{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS')

Does everything recv_video_frame_ndarray does, but also computes a number of useful debug metrics, including effective framerate, bitrate, and frame size. Optionally, if you pass a format string as the draw_stats parameter, these metrics are drawn as a line of text near the top-left corner of the image.

with wyzecam.WyzeIOTC() as wyze_iotc:
    with wyze_iotc.connect_and_auth(account, camera) as sess:
        for (frame, frame_info, frame_stats) in sess.recv_video_frame_ndarray_with_stats():
            ...  # do something with the video data! :)

This method gives you an additional 'frame_stats' value every frame, which is a dict with the following keys:

  • "bytes_per_second"
  • "kilobytes_per_second"
  • "window_duration"
  • "frames_per_second"
  • "width"
  • "height"

This dictionary is available in the draw_stats string as arguments to a python str.format() call, allowing you to quickly change the debug string in the top corner of the video.
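To make the str.format() mechanism concrete, here is a small standalone example. The stats values are made up for illustration; only the key names come from the list above.

```python
# Illustrative values only; the keys match the frame_stats dict described above.
stats = {
    "bytes_per_second": 152_000,
    "kilobytes_per_second": 152,
    "window_duration": 14.0,
    "frames_per_second": 15,
    "width": 1920,
    "height": 1080,
}

# A custom draw_stats string may use any subset of the keys.
draw_stats = "{frames_per_second} FPS | {kilobytes_per_second} kB/s | {width}x{height}"
overlay_text = draw_stats.format(**stats)
print(overlay_text)  # 15 FPS | 152 kB/s | 1920x1080
```

Passing the same string as draw_stats=... changes the overlay text, and passing draw_stats=None skips drawing altogether.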

In order to use this, you will need to install PyAV, numpy, and OpenCV (the opencv-python package).

Parameters:

  • stat_window_size (int, default 210): The number of consecutive frames used as the window for computing the above metrics. The larger the window, the longer the period over which the metrics are averaged. Note that this method is not performant for very large window sizes.
  • draw_stats (Optional[str], default '{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS'): If specified, this Python format() string is used to draw debug text near the upper left-hand corner of the image.

Returns:

Type Description
Iterator[Tuple[np.ndarray, Union[wyzecam.tutk.tutk.FrameInfoStruct, wyzecam.tutk.tutk.FrameInfo3Struct], Dict[str, int]]]

A generator, which when iterated over, yields a 3-tuple containing the decoded image (as a numpy array), metadata about the frame (in the form of a tutk.FrameInfoStruct), and some performance statistics (in the form of a dict).
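The windowed metrics can be pictured with a simplified, standalone sketch. It is not the library's implementation: windowed_stats is a hypothetical function that takes plain (timestamp_seconds, frame_len_bytes) pairs instead of frame_info objects, and it reports zeros when the window spans no time rather than falling back to the camera's reported framerate.

```python
from collections import deque
from typing import Dict, Iterable, Iterator, Tuple


def windowed_stats(
    samples: Iterable[Tuple[float, int]], window_size: int = 210
) -> Iterator[Dict[str, float]]:
    """Yield throughput stats over a sliding window of the most recent samples."""
    # A bounded deque drops the oldest sample automatically once it is full.
    window = deque(maxlen=window_size)
    for timestamp, frame_len in samples:
        window.append((timestamp, frame_len))
        duration = window[-1][0] - window[0][0]
        if len(window) > 1 and duration > 0:
            # As in the source listing, leave the newest frame out of the byte total.
            total_bytes = sum(size for _, size in window) - window[-1][1]
            yield {
                "window_duration": duration,
                "bytes_per_second": int(total_bytes / duration),
                "frames_per_second": int(len(window) / duration),
            }
        else:
            yield {"window_duration": 0, "bytes_per_second": 0, "frames_per_second": 0}
```

A larger window_size smooths the numbers over a longer period, at the cost of reacting more slowly to changes in the stream.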

Source code in wyzecam/iotc.py
def recv_video_frame_ndarray_with_stats(
    self,
    stat_window_size: int = 210,
    draw_stats: Optional[
        str
    ] = "{width}x{height} {kilobytes_per_second} kB/s {frames_per_second} FPS",
) -> Iterator[
    Tuple[
        "np.ndarray",
        Union[tutk.FrameInfoStruct, tutk.FrameInfo3Struct],
        Dict[str, int],
    ]
]:
    """
    Does everything recv_video_frame_ndarray does, but also computes a number
    of useful / interesting debug metrics including effective framerate, bitrate,
    and frame size information.  Optionally, if you pass a format string as the
    `draw_stats` parameter, this information will be used to draw a line of text
    onto the image near the top-left corner with this debug information.

    ```python
    with wyzecam.WyzeIOTC() as wyze_iotc:
        with wyze_iotc.connect_and_auth(account, camera) as sess:
            for (frame, frame_info, frame_stats) in sess.recv_video_frame_ndarray_with_stats():
                ...  # do something with the video data! :)
    ```


    This method gives you an additional 'frame_stats' value every frame, which is a
    dict with the following keys:

     - "bytes_per_second"
     - "kilobytes_per_second"
     - "window_duration"
     - "frames_per_second"
     - "width"
     - "height"

    This dictionary is available in the draw_stats string as arguments to a python
    str.format() call, allowing you to quickly change the debug string in the top corner
    of the video.

    In order to use this, you will need to install [PyAV](https://pyav.org/docs/stable/),
    [numpy](https://numpy.org/), and [PyOpenCV](https://pypi.org/project/opencv-python/).

    :param stat_window_size: the number of consecutive frames to use as the window function
                             for computing the above metrics.  The larger the window size,
                             the longer period over which the metrics are averaged.  Note that
                             this method is not performant for very large window sizes.
    :param draw_stats: if specified, this python format() string is used to draw some debug text
                       in the upper left hand corner.

    :returns: A generator, which when iterated over, yields a 3-tuple containing the decoded image
             (as a numpy array), metadata about the frame (in the form of a
             [tutk.FrameInfoStruct][wyzecam.tutk.tutk.FrameInfoStruct]), and some performance
             statistics (in the form of a dict).

    """
    stat_window = []
    for frame_ndarray, frame_info in self.recv_video_frame_ndarray():
        stat_window.append(frame_info)
        if len(stat_window) > stat_window_size:
            stat_window = stat_window[len(stat_window) - stat_window_size :]

        if len(stat_window) > 1:
            stat_window_start = (
                stat_window[0].timestamp
                + stat_window[0].timestamp_ms / 1_000_000
            )
            stat_window_end = (
                stat_window[-1].timestamp
                + stat_window[-1].timestamp_ms / 1_000_000
            )
            stat_window_duration = stat_window_end - stat_window_start
            if stat_window_duration <= 0:
                # wyze doorbell doesn't support timestamp_ms; workaround:
                stat_window_duration = (
                    len(stat_window) / stat_window[-1].framerate
                )
            stat_window_total_size = sum(
                b.frame_len for b in stat_window[:-1]
            )  # skip the last reading
            bytes_per_second = int(
                stat_window_total_size / stat_window_duration
            )
            frames_per_second = int(len(stat_window) / stat_window_duration)
        else:
            bytes_per_second = 0
            stat_window_duration = 0
            frames_per_second = 0

        stats = {
            "bytes_per_second": bytes_per_second,
            "kilobytes_per_second": int(bytes_per_second / 1000),
            "window_duration": stat_window_duration,
            "frames_per_second": frames_per_second,
            "width": frame_ndarray.shape[1],
            "height": frame_ndarray.shape[0],
        }

        if draw_stats:
            text = draw_stats.format(**stats)
            cv2.putText(
                frame_ndarray,
                text,
                (50, 50),
                cv2.FONT_HERSHEY_DUPLEX,
                1,
                (0, 0, 0),
                2,
                cv2.LINE_AA,
            )
            cv2.putText(
                frame_ndarray,
                text,
                (50, 50),
                cv2.FONT_HERSHEY_DUPLEX,
                1,
                (255, 255, 255),
                1,
                cv2.LINE_AA,
            )

        yield frame_ndarray, frame_info, stats

session_check(self)

Used by a device or a client to check the IOTC session info.

A device or a client may use this function to check if the IOTC session is still alive as well as getting the IOTC session info.

Returns:

Type Description
SInfoStruct

A tutk.SInfoStruct
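Because session_check raises on failure, a boolean liveness probe is a thin wrapper around it. The sketch below is one way to write it; session_is_alive and its error_types parameter are hypothetical names, and the broad default is only there to keep the example free of library imports.

```python
from typing import Any, Tuple, Type


def session_is_alive(
    session: Any, error_types: Tuple[Type[BaseException], ...] = (Exception,)
) -> bool:
    """Return True if session.session_check() succeeds, False if it raises."""
    try:
        session.session_check()
    except error_types:
        # With the default, this also covers the AssertionError raised
        # when the session was never connected.
        return False
    return True
```

In real code you would likely narrow the check, for example session_is_alive(sess, error_types=(tutk.TutkError,)), so that unrelated bugs are not silently reported as a dead session.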

Source code in wyzecam/iotc.py
def session_check(self) -> tutk.SInfoStruct:
    """Used by a device or a client to check the IOTC session info.

    A device or a client may use this function to check if the IOTC session is
    still alive as well as getting the IOTC session info.

    :returns: A [`tutk.SInfoStruct`][wyzecam.tutk.tutk.SInfoStruct]
    """
    assert (
        self.session_id is not None
    ), "Please call _connect() before session_check()"

    errcode, sess_info = tutk.iotc_session_check(
        self.tutk_platform_lib, self.session_id
    )
    if errcode < 0:
        raise tutk.TutkError(errcode)

    return sess_info