Source code for ska_mid_cbf_mcs.base.component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from collections.abc import Iterable
from inspect import currentframe
from queue import Empty, Full, Queue
from sys import exc_info
from threading import Event, Lock
from time import sleep
from typing import Any, Callable, Dict, Optional, cast

import backoff
import tango
from ska_control_model import (
    AdminMode,
    CommunicationStatus,
    HealthState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_tango_base import SKABaseDevice
from ska_tango_base.executor.executor_component_manager import (
    TaskExecutorComponentManager,
)

from ska_mid_cbf_mcs.base.base_device import (
    DEFAULT_TIMEOUT_LONG,
    DEFAULT_TIMEOUT_SHORT,
)

# Public API of this module: a wildcard import exports only the component
# manager base class.
__all__ = ["CbfComponentManager"]


class CbfComponentManager(TaskExecutorComponentManager):
    """
    A base component manager for SKA Mid.CBF MCS.

    This class exists to modify the interface of the
    :py:class:`ska_tango_base.executor.executor_component_manager.TaskExecutorComponentManager`.
    The ``TaskExecutorComponentManager`` accepts a ``max_queue_size`` keyword
    argument to determine limits on worker queue length, for the management
    of LRC threads.

    Additionally, this provides optional arguments for attribute change event
    and HealthState updates, for a device to pass in its callbacks for push
    change events.

    Finally, the ``TaskExecutorComponentManager`` inherits from
    BaseComponentManager, which accepts the keyword arguments
    communication_state_callback and component_state_callback, each with an
    analogous callback method in the SKABaseDevice (namely
    _communication_state_changed and _component_state_changed) used to drive
    the operational state (opState) model from the component manager.
    """

    def __init__(
        self: CbfComponentManager,
        *args: Any,
        device: SKABaseDevice,
        proxy_timeout: int = DEFAULT_TIMEOUT_SHORT,
        state_change_timeout: int = DEFAULT_TIMEOUT_SHORT,
        health_state_callback: Callable[[HealthState], None] | None = None,
        admin_mode_callback: Callable[[str], None] | None = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new CbfComponentManager instance.

        Any ``max_queue_size`` supplied by the caller in ``kwargs`` is passed
        straight through to the parent ``TaskExecutorComponentManager``. It is
        intended to match the MAX_QUEUED_COMMANDS of the base device class,
        as that constant also limits the dimensions of the
        longRunningCommandsInQueue, longRunningCommandIDsInQueue,
        longRunningCommandStatus and longRunningCommandProgress attributes
        used to track LRCs, a current limitation of the SKABaseDevice class.

        :param device: device object; used to trigger attribute signals
        :param proxy_timeout: timeout (in seconds) for DeviceProxy control
        :param state_change_timeout: timeout (in seconds) when waiting for
            device state attribute change
        :param health_state_callback: callback to be called when the
            HealthState of the component changes
        :param admin_mode_callback: callback to be called with an admin mode
            transition trigger name (e.g. "to_online", "to_offline") when
            communication is started or stopped
        """
        # Initialize operating state machine trigger keywords "fault" and "power"
        super().__init__(
            *args,
            fault=None,
            power=None,
            **kwargs,
        )

        # Keep a reference to the device object for triggering attribute signals
        self._device = device

        # Device state change callbacks
        self.device_admin_mode_callback = admin_mode_callback
        self._device_health_state_callback = health_state_callback

        # HealthState management
        self._health_state_lock = Lock()
        self._health_state = HealthState.UNKNOWN

        # State management: latest op state per subordinate device name
        self._op_states: dict[str, tango.DevState] = {}
        # Instantiated in child classes
        self._op_states_queue: Queue | None = None
        self._state_change_timeout = state_change_timeout

        # DeviceProxy and change event management
        self.proxy_dict = dict()
        self._event_handler = EventHandler(self.logger)

        # Long-running command (LRC) management
        self._proxy_timeout = proxy_timeout
        self._results_queue = Queue(maxsize=EventHandler.MAX_QUEUE_SIZE)

    # -------------
    # Communication
    # -------------

    def _push_admin_mode_update(
        self: CbfComponentManager, transition: str
    ) -> None:
        """
        Forward an admin mode transition trigger to the device, if a callback
        was provided at construction.

        :param transition: admin mode transition trigger name, e.g. "to_online"
        """
        if self.device_admin_mode_callback is not None:
            self.device_admin_mode_callback(transition)

    def start_communicating(
        self: CbfComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Establish communication with the component.

        Sets the communication state to ESTABLISHED, then notifies the device
        of the admin mode transition matching ``admin_mode`` (ENGINEERING or
        ONLINE; any other value triggers no transition).

        :param admin_mode: admin mode the device is transitioning to
        """
        self._update_communication_state(
            communication_state=CommunicationStatus.ESTABLISHED
        )
        if admin_mode == AdminMode.ENGINEERING:
            self._push_admin_mode_update("to_engineering")
        elif admin_mode == AdminMode.ONLINE:
            self._push_admin_mode_update("to_online")
        self.logger.info(
            f"CbfComponentManager.start_communicating({admin_mode.name}) completed OK"
        )

    def stop_communicating(
        self: CbfComponentManager,
        *args: Any,
        admin_mode: AdminMode,
        **kwargs: Any,
    ) -> None:
        """
        Stop communication with the component.

        For AdminMode.OFFLINE:
        - sets the power state to UNKNOWN;
        - sets the communication state to DISABLED;
        - notifies the device of the "to_offline" transition.

        For AdminMode.NOT_FITTED, only the "to_notfitted" transition is
        notified. Any other value does nothing beyond logging.

        :param admin_mode: admin mode the device is transitioning to
        """
        if admin_mode == AdminMode.OFFLINE:
            self._update_component_state(power=PowerState.UNKNOWN)
            self._update_communication_state(
                communication_state=CommunicationStatus.DISABLED
            )
            self._push_admin_mode_update("to_offline")
        elif admin_mode == AdminMode.NOT_FITTED:
            self._push_admin_mode_update("to_notfitted")
        self.logger.info(
            f"CbfComponentManager.stop_communicating({admin_mode.name}) completed OK"
        )

    def task_abort_event_is_set(
        self: CbfComponentManager,
        command_name: str,
        task_callback: Callable,
        task_abort_event: Event,
    ) -> bool:
        """
        Helper method for checking task abort event during command thread.

        If the event is set, ``task_callback`` is invoked with
        TaskStatus.ABORTED and a ResultCode.ABORTED result.

        :param command_name: name of command for result message
        :param task_callback: command tracker update_command_info callback
        :param task_abort_event: task executor abort event
        :return: True if abort event is set, otherwise False
        """
        if task_abort_event.is_set():
            task_callback(
                status=TaskStatus.ABORTED,
                result=(
                    ResultCode.ABORTED,
                    f"{command_name} command aborted by task executor abort event.",
                ),
            )
            return True
        return False

    # ---------------
    # Command Methods
    # ---------------

    @staticmethod
    def _put_dropping_oldest(target_queue: Queue, item: Any) -> None:
        """
        Put an item on a bounded queue without blocking.

        If the queue is full, the oldest queued item(s) are discarded to make
        room. Event callbacks must return quickly, so a blocking ``put`` is
        avoided. The loop also tolerates a consumer emptying the queue between
        the ``Full`` error and the eviction, in which case ``get_nowait``
        raises ``Empty`` and the put is simply retried.

        :param target_queue: bounded queue to add to
        :param item: item to enqueue
        """
        while True:
            try:
                target_queue.put_nowait(item)
                return
            except Full:
                try:
                    target_queue.get_nowait()
                    target_queue.task_done()
                except Empty:
                    # Queue was drained concurrently; retry the put
                    pass

    def results_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for LRC command result events.

        All subdevices that may block our LRC thread with their own LRC
        execution have their `longRunningCommandResult` attribute subscribed to
        with this method as the change event callback.

        :param event_data: Tango attribute change event data
        """
        # NOTE: event callbacks must be as short as possible
        # Here we simply queue event data, dropping the oldest if full
        self._put_dropping_oldest(self._results_queue, event_data)

    def _clear_queue(
        self: CbfComponentManager, queue_to_clear: Queue, queue_name: str
    ) -> None:
        """
        Clear a queue; use 1ms sleep to alleviate CPU usage.

        :param queue_to_clear: queue object to empty
        :param queue_name: name of queue for logging purposes
        """
        try:
            while True:
                queue_to_clear.get_nowait()
                queue_to_clear.task_done()
                sleep(0.001)
        except Empty:
            self.logger.info(f"Cleared {queue_name} queue.")

    def _wait_for_blocking_results(
        self: CbfComponentManager,
        blocking_command_ids: Dict[str, str],
    ) -> Dict[str, ResultCode]:
        """
        Wait for the number of anticipated results to be pushed by subordinate devices.

        When issuing an LRC (or multiple) on subordinate devices from an LRC
        thread, command result events will be stored in self._results_queue.
        Use this method to wait for all blocking command ID
        `longRunningCommandResult` events.

        All subdevices that may block our LRC thread with their own LRC
        execution have the `results_callback` method above provided as the
        change event callback for their `longRunningCommandResult` attribute
        subscription, which will store command IDs and results as change events
        are received.

        Once waiting finishes, any events still queued are discarded.

        :param blocking_command_ids: dict of device names keyed by blocking
            command ID; entries are removed from this dict as their results
            are received
        :return: dict of result codes keyed by device name; devices whose
            results were not received before timeout map to ResultCode.UNKNOWN
        """
        self.logger.info(
            f"wait_for_blocking_results invoked; blocking command IDs: {blocking_command_ids}"
        )

        # Loop exits when no blocking command IDs remain, or upon timeout when
        # trying to get new results from the queue
        command_results = {}
        while len(blocking_command_ids) != 0:
            try:
                event_data = self._results_queue.get(
                    block=True, timeout=self._proxy_timeout
                )
                self._results_queue.task_done()
            except Empty:
                self.logger.error(
                    f"Event data queue is still empty after {self._proxy_timeout} seconds, blocking command IDs remaining: {blocking_command_ids}"
                )
                for dev_name in blocking_command_ids.values():
                    command_results[dev_name] = ResultCode.UNKNOWN
                break

            # EventHandler will validate event data, so we can immediately parse it here.
            value = tuple(event_data.attr_value.value)
            if "" in value:
                self.logger.debug(
                    f"Skipping empty event; event data: {event_data}"
                )
                continue

            # longRunningCommandResult attribute format: ("command_id", "[result_code, message]")
            # A malformed result string may fail either the indexing or the
            # integer conversion; skip such events rather than abort the wait.
            try:
                command_id = value[0]
                result = value[1]
                result_code = int(result.split(",")[0].split("[")[1])
            except (IndexError, ValueError) as parse_error:
                self.logger.error(
                    f"{type(parse_error).__name__} in parsing EventData; {parse_error}"
                )
                continue

            # Remove any successfully parsed results from received events
            if command_id in blocking_command_ids:
                dev_name = blocking_command_ids.pop(command_id)
                if result_code != ResultCode.OK:
                    self.logger.error(
                        f"{dev_name} blocking command failure; {command_id}: {result}"
                    )
                else:
                    self.logger.debug(
                        f"{dev_name} blocking command completed OK: {command_id}, {result}"
                    )
                command_results[dev_name] = result_code

        self._clear_queue(self._results_queue, "LRC results")

        return command_results

    @backoff.on_exception(
        wait_gen=backoff.expo,
        exception=tango.DevFailed,
        max_time=DEFAULT_TIMEOUT_LONG,
    )
    def _invoke_command_backoff(
        self: CbfComponentManager,
        proxy: Any,
        command_arg: Any,
        command_name: str,
    ) -> tuple[str, ResultCode | int, str]:
        """
        Helper function to issue a command to a DeviceProxy with backoff and retry.

        tango.DevFailed is retried with exponential backoff for up to
        DEFAULT_TIMEOUT_LONG seconds, after which it propagates.

        :param proxy: proxy target for command
        :param command_arg: optional command argument (can be None for no argument)
        :param command_name: command to be issued
        :return: (device name, result code, command ID or message)
        """
        [[result_code], [cmd_id_or_msg]] = (
            proxy.command_inout(command_name, command_arg)
            if command_arg is not None
            else proxy.command_inout(command_name)
        )
        return (proxy.dev_name(), result_code, cmd_id_or_msg)

    def _invoke_command(
        self: CbfComponentManager,
        proxy: Any,
        command_arg: Any,
        command_name: str,
    ) -> tuple[str, ResultCode | int, str]:
        """
        Helper function to issue a command to a DeviceProxy.

        If the command still fails after the backoff retries in
        _invoke_command_backoff, the failure is reported as
        ResultCode.UNKNOWN with an error message rather than raised.

        :param proxy: proxy target for command
        :param command_arg: optional command argument (can be None for no argument)
        :param command_name: command to be issued
        :return: (device name, result code, command ID or message)
        """
        # NOTE: logging here was removed for performance reasons
        try:
            return self._invoke_command_backoff(
                proxy=proxy,
                command_arg=command_arg,
                command_name=command_name,
            )
        except tango.DevFailed as dev_failed:
            dev_name = proxy.dev_name()
            return (
                dev_name,
                ResultCode.UNKNOWN,
                f"Error issuing {dev_name}/{command_name}; {dev_failed}",
            )

    def issue_group_command(
        self: CbfComponentManager,
        command_name: str,
        proxies: Iterable[tango.DeviceProxy],
        command_args: Any | Iterable[Any] | None = None,
        is_lrc: bool | None = True,
        partial_success: bool | None = False,
        check_on_fail: dict[str, Callable[[Any], bool]] | None = None,
    ) -> dict[str, ResultCode]:
        """
        Helper function to issue commands to groups of devices.

        Important: all proxies provided must be able to execute the requested
        command.

        :param command_name: name of command to be issued
        :param proxies: iterable of device proxies
        :param command_args: command argument to be supplied to all commands,
            or an iterable of separate command arguments for each command
            (defaults to None for commands with no arguments). Separate
            arguments must be provided for commands that take array-like
            arguments; otherwise a single iterable argument is mapped across
            multiple commands.
        :param is_lrc: True if command is long-running, otherwise False
        :param partial_success: defaults to False, in which case an error will
            be logged if any command fails; set to True to only log a warning
            if any command fails but at least 1 succeeds
        :param check_on_fail: dict containing attribute name(s) and evaluating
            function(s) to validate command success via attribute read should
            the command result fail initial validation
        :return: dict of result codes keyed by device name (empty if no
            commands were issued)
        """
        # NOTE: take care to not log proxies or command args as they may be
        # single-use iterators.
        self.logger.info(
            f"Issuing {command_name} command to sub-device group."
        )

        # Store a dict of proxies for reuse if additional validation will be
        # required should the initial command validation fail.
        proxy_dict = None
        if check_on_fail is not None:
            proxy_dict = {proxy.dev_name(): proxy for proxy in proxies}
            # Store proxies again in case it was provided via generator
            proxies = proxy_dict.values()

        # Pair each proxy with its command argument. Separate arguments are
        # used if command_args is an iterable that is not a string; otherwise
        # the same argument is used for every command. Pairing is lazy, so
        # commands are issued one at a time as the loop below advances.
        if isinstance(command_args, Iterable) and not isinstance(
            command_args, str
        ):
            arg_pairs = zip(proxies, command_args)
        else:
            arg_pairs = ((proxy, command_args) for proxy in proxies)

        # Issue commands in a loop.
        # If LRC, get blocking command IDs and wait for command results.
        # If fast command, populate command results immediately.
        blocking_command_ids = {}
        command_results = {}
        for proxy, command_arg in arg_pairs:
            dev_name, result_code, cmd_id_or_msg = self._invoke_command(
                proxy=proxy,
                command_arg=command_arg,
                command_name=command_name,
            )
            command_results[dev_name] = result_code
            if is_lrc:
                if result_code != ResultCode.QUEUED:
                    self.logger.warning(cmd_id_or_msg)
                else:
                    blocking_command_ids[cmd_id_or_msg] = dev_name
            elif result_code != ResultCode.OK:
                self.logger.warning(cmd_id_or_msg)

        # If LRC, update results after waiting for change events.
        if is_lrc:
            command_results |= self._wait_for_blocking_results(
                blocking_command_ids
            )

        # Nothing to validate or report; avoid logging a vacuous "success".
        if len(command_results) == 0:
            self.logger.warning("No commands were issued.")
            return command_results

        # If any command returned a failure code but all of the additional attribute
        # validation checks succeed, set that command's result to ResultCode.OK.
        if check_on_fail is not None:
            for dev_name, result_code in command_results.items():
                try:
                    if result_code != ResultCode.OK and all(
                        evaluator(getattr(proxy_dict[dev_name], attr_name))
                        for attr_name, evaluator in check_on_fail.items()
                    ):
                        command_results[dev_name] = ResultCode.OK
                except (tango.DevFailed, AttributeError) as check_error:
                    self.logger.error(
                        f"Failed to perform additional validation checks on {dev_name}; {check_error}"
                    )

        # Check command return values and log for various success cases.
        if all(
            result_code == ResultCode.OK
            for result_code in command_results.values()
        ):
            self.logger.info(f"{command_name} command success.")
        elif partial_success and any(
            result_code == ResultCode.OK
            for result_code in command_results.values()
        ):
            self.logger.warning(f"Partial {command_name} command success.")
        else:
            self.logger.error(
                f"{command_name} command failure; command results:{command_results}"
            )

        return command_results

    # ----------------
    # Callback Methods
    # ----------------

    def _push_health_state_update(
        self: CbfComponentManager, health_state: HealthState
    ) -> None:
        """
        Push a health state update to the device, if a callback was provided.

        :param health_state: the new health state of the component manager.
        """
        if self._device_health_state_callback is not None:
            self._device_health_state_callback(health_state)

    def update_device_health_state(
        self: CbfComponentManager,
        health_state: HealthState,
    ) -> None:
        """
        Handle a health state change.

        This is a helper method for use by subclasses. The device is only
        notified when the health state actually changes; the check-and-update
        is done under the health state lock.

        :param health_state: the new health state of the component manager.
        """
        with self._health_state_lock:
            if self._health_state != health_state:
                self.logger.info(f"Updating healthState to {health_state}")
                self._health_state = health_state
                self._push_health_state_update(self._health_state)

    def op_state_callback(
        self: CbfComponentManager, event_data: Optional[tango.EventData]
    ) -> None:
        """
        Callback for state attribute events.

        :param event_data: Tango attribute change event data
        """
        # NOTE (CIP-3559): event callbacks must be as short as possible
        # Here we simply queue event data, dropping the oldest if full
        self._put_dropping_oldest(self._op_states_queue, event_data)

    # ----------------
    # Op State Changes
    # ----------------

    def wait_for_blocking_states(
        self: CbfComponentManager,
        target_state: tango.DevState,
        task_abort_event: Optional[Event] = None,
    ) -> bool:
        """
        Wait for the number of anticipated state changes to be pushed by subordinate devices.

        State events are consumed until none arrives for
        ``self._state_change_timeout`` seconds. Then every device already
        present in ``self._op_states`` when the wait began is checked against
        ``target_state``.

        :param target_state: target state to wait for in all monitored op states
        :param task_abort_event: Check for abort, defaults to None
        :return: True if all state changes were successfully recorded, otherwise False
        """
        blocking_devices = set(self._op_states.keys())
        self.logger.debug(
            f"wait_for_blocking_states invoked; devices monitored: {blocking_devices}; target state: {target_state}"
        )

        # Loop exits upon timeout when no further new states available in queue
        while True:
            if task_abort_event is not None and task_abort_event.is_set():
                self.logger.warning(
                    "wait_for_blocking_states aborted while waiting for blocking state changes."
                )
                return False
            try:
                event_data = self._op_states_queue.get(
                    block=True, timeout=self._state_change_timeout
                )
                self._op_states_queue.task_done()
            except Empty:
                break

            # Check event error and quality factor first
            if event_data.err:
                self.logger.error(f"Event error: {event_data.errors}")
                continue
            # Enum values must be compared by equality, not identity
            if event_data.attr_value.quality == tango.AttrQuality.ATTR_INVALID:
                self.logger.error(
                    f"Event quality factor invalid: {event_data}"
                )
                continue

            value = event_data.attr_value.value
            dev_name = event_data.device.dev_name()
            self.logger.info(
                f"Received new value for {dev_name}/state: {value}"
            )
            self._op_states[dev_name] = value

        for dev_name, state in self._op_states.items():
            if state == target_state and dev_name in blocking_devices:
                blocking_devices.remove(dev_name)

        if len(blocking_devices) > 0:
            self.logger.error(
                f"The following devices failed to transition to {target_state}: {blocking_devices}"
            )
            return False

        self.logger.info(
            f"All monitored devices successfully transitioned to {target_state}."
        )
        return True

    @property
    def is_communicating(self: CbfComponentManager) -> bool:
        """
        Return whether communication with the component is established.

        SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
        for this, but this is still needed as a base classes hook. When not
        established, a warning naming the calling function is logged.

        :return: True if communication with the component is established, else
            False.
        """
        with self._communication_state_lock:
            communication_state = CommunicationStatus[
                self._communication_state.name
            ]
        if communication_state == CommunicationStatus.ESTABLISHED:
            return True

        # Identify the caller from the interpreter stack for the warning.
        # currentframe() may return None on interpreters without frame support.
        frame = currentframe()
        try:
            caller_name = (
                frame.f_back.f_code.co_name
                if frame is not None and frame.f_back is not None
                else "<unknown>"
            )
        finally:
            # Break the frame reference cycle promptly
            del frame
        self.logger.warning(
            f"{caller_name} failed: CommunicationStatus.{communication_state.name}"
        )
        return False

    @property
    def is_communicating_disabled(self: CbfComponentManager) -> bool:
        """
        Return whether communication with the component is disabled.

        SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
        for this, but this is still needed as a base classes hook.

        :return: True if communication with the component is disabled, else
            False.
        """
        with self._communication_state_lock:
            communication_state = CommunicationStatus[
                self._communication_state.name
            ]
        return communication_state == CommunicationStatus.DISABLED

    @property
    def power_state(self: CbfComponentManager) -> Optional[PowerState]:
        """
        Return the power state of this component manager.

        :return: the power state of this component manager, if known.
        """
        return self._component_state["power"]

    @property
    def faulty(self: CbfComponentManager) -> Optional[bool]:
        """
        Return whether this component manager is currently experiencing a fault.

        :return: True if this component manager is currently experiencing a
            fault, else False.
        """
        return cast(bool, self._component_state["fault"])