# Source code for ska_mid_cbf_mcs.testing.assertions

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from typing import Any

from assertpy import assert_that
from ska_tango_testing.integration import TangoEventTracer
from tango import DeviceProxy


def assert_events(
    expected_events: tuple[tuple[Any, ...], ...],
    proxies: tuple[DeviceProxy, ...],
    event_tracer: TangoEventTracer,
    event_timeout: int,
    read_on_failure: bool = False,
) -> None:
    """
    Helper for calling assertpy.assert_that for a list of expected change
    events from a list of device proxies.

    Every expected event is asserted against every proxy, in order; the first
    failure stops the run.

    :param expected_events: list of tuples detailing expected events; each
        tuple should be formatted as such:

        - 4 items: (attribute_name, expected_value, previous_value,
          number_of_events)
        - 3 items using custom matcher: (attribute_name, custom_matcher,
          number_of_events)

        number_of_events is passed to the tracer as the minimum number of
        matching events.
    :param proxies: device proxies to check for expected events
    :param event_tracer: a TangoEventTracer tracking all the expected events
    :param event_timeout: timeout for event assertion
    :param read_on_failure: defaults to False; if set to True and upon
        AssertionError try once more to validate device attribute by reading
    :raises AssertionError: if any expected event was not received (and, when
        read_on_failure is set, the attribute read did not match either)
    :raises ValueError: if expected_events formatted incorrectly
    """
    for proxy in proxies:
        for event in expected_events:
            # Work out which matcher keywords apply to this tuple shape; the
            # event assertion and the fallback are then shared by both shapes.
            uses_custom_matcher = len(event) == 3
            if uses_custom_matcher:
                name, custom, num_occurrence = event
                matcher_kwargs = {"custom_matcher": custom}
            elif len(event) == 4:
                name, value, previous, num_occurrence = event
                matcher_kwargs = {
                    "attribute_value": value,
                    "previous_value": previous,
                }
            else:
                raise ValueError(f"Incorrect expected event value {event}")

            try:
                assert_that(event_tracer).within_timeout(
                    event_timeout
                ).has_change_event_occurred(
                    device_name=proxy,
                    attribute_name=name,
                    min_n_events=num_occurrence,
                    **matcher_kwargs,
                )
            except AssertionError as assert_err:
                if not read_on_failure:
                    raise

                # Fallback: the event was not seen, so read the attribute once.
                # Explicit raises are used instead of `assert` statements so
                # the check survives `python -O` and reports what was read.
                current = getattr(proxy, name)
                if uses_custom_matcher:
                    # NOTE(review): the matcher is applied to the raw attribute
                    # value here, whereas the tracer may hand it an event
                    # object - confirm callers' matchers accept both.
                    if not custom(current):
                        raise AssertionError(
                            f"Attribute {name!r} read from {proxy} is "
                            f"{current!r}, which the custom matcher rejected "
                            f"(no matching change event within timeout "
                            f"{event_timeout})"
                        ) from assert_err
                elif not (current == value):
                    raise AssertionError(
                        f"Attribute {name!r} read from {proxy} is "
                        f"{current!r}; expected {value!r} (no matching change "
                        f"event within timeout {event_timeout})"
                    ) from assert_err