# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from collections.abc import Iterable
from queue import Empty, Full, Queue
from sys import exc_info
from threading import Event, Lock
from time import sleep
from typing import Any, Callable, Dict, Optional, cast
import backoff
import tango
from ska_control_model import (
AdminMode,
CommunicationStatus,
HealthState,
PowerState,
ResultCode,
TaskStatus,
)
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_tango_base import SKABaseDevice
from ska_tango_base.executor.executor_component_manager import (
TaskExecutorComponentManager,
)
from ska_mid_cbf_mcs.base.base_device import (
DEFAULT_TIMEOUT_LONG,
DEFAULT_TIMEOUT_SHORT,
)
__all__ = ["CbfComponentManager"]
[docs]
class CbfComponentManager(TaskExecutorComponentManager):
"""
A base component manager for SKA Mid.CBF MCS
This class exists to modify the interface of the
:py:class:`ska_tango_base.executor.executor_component_manager.TaskExecutorComponentManager`.
The ``TaskExecutorComponentManager`` accepts ``max_queue_size`` keyword argument
to determine limits on worker queue length, for the management of LRC threads.
Additionally, this provides optional arguments for attribute change event and
HealthState updates, for a device to pass in its callbacks for push change events.
Finally, the ``TaskExecutorComponentManager`` inherits from BaseComponentManager,
which accepts the keyword arguments communication_state_callback and
component_state_callback, each with an analogous callback method in the
SKABaseDevice (namely _communication_state_changed and _component_state_changed)
used to drive the operational state (opState) model from the component manager.
"""
def __init__(
self: CbfComponentManager,
*args: Any,
device: SKABaseDevice,
proxy_timeout: int = DEFAULT_TIMEOUT_SHORT,
state_change_timeout: int = DEFAULT_TIMEOUT_SHORT,
health_state_callback: Callable[[HealthState], None] | None = None,
admin_mode_callback: Callable[[str], None] | None = None,
**kwargs: Any,
) -> None:
"""
Initialise a new CbfComponentManager instance.
max_queue_size of the parent is set to match the MAX_QUEUED_COMMANDS
of the base device class, as this constant is also used to limit the
dimensions of the longRunningCommandsInQueue, longRunningCommandIDsInQueue,
longRunningCommandStatus and longRunningCommandProgress attributes used
to track LRCs, a current limitation of the SKABaseDevice class.
:param device: device object; used to trigger attribute signals
:param proxy_timeout: timeout (in seconds) for DeviceProxy control
:param state_change_timeout: timeout (in seconds) when waiting for device
state attribute change
:param health_state_callback: callback to be called when the
HealthState of the component changes
"""
# Initialize operating state machine trigger keywords "fault" and "power"
super().__init__(
*args,
fault=None,
power=None,
**kwargs,
)
# Keep a reference to the device object for triggering attribute signals
self._device = device
# Device state change callbacks
self.device_admin_mode_callback = admin_mode_callback
self._device_health_state_callback = health_state_callback
# HealthState management
self._health_state_lock = Lock()
self._health_state = HealthState.UNKNOWN
# State management
self._op_states = {}
self._op_states_queue: Queue = None # instantiated in child classes
self._state_change_timeout = state_change_timeout
# DeviceProxy and change event management
self.proxy_dict = dict()
self._event_handler = EventHandler(self.logger)
# Long-running command (LRC) management
self._proxy_timeout = proxy_timeout
self._results_queue = Queue(maxsize=EventHandler.MAX_QUEUE_SIZE)
# -------------
# Communication
# -------------
[docs]
def start_communicating(
self: CbfComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for start_communicating operation.
"""
self._update_communication_state(
communication_state=CommunicationStatus.ESTABLISHED
)
if admin_mode == AdminMode.ENGINEERING:
self.device_admin_mode_callback("to_engineering")
elif admin_mode == AdminMode.ONLINE:
self.device_admin_mode_callback("to_online")
self.logger.info(
f"CbfComponentManager.start_communicating({admin_mode.name}) completed OK"
)
[docs]
def stop_communicating(
self: CbfComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for stop_communicating operation.
"""
if admin_mode == AdminMode.OFFLINE:
self._update_component_state(power=PowerState.UNKNOWN)
self._update_communication_state(
communication_state=CommunicationStatus.DISABLED
)
self.device_admin_mode_callback("to_offline")
elif admin_mode == AdminMode.NOT_FITTED:
self.device_admin_mode_callback("to_notfitted")
self.logger.info(
f"CbfComponentManager.stop_communicating({admin_mode.name}) completed OK"
)
[docs]
def task_abort_event_is_set(
self: CbfComponentManager,
command_name: str,
task_callback: Callable,
task_abort_event: Event,
) -> bool:
"""
Helper method for checking task abort event during command thread.
:param command_name: name of command for result message
:param task_callback: command tracker update_command_info callback
:param task_abort_event: task executor abort event
:return: True if abort event is set, otherwise False
"""
if task_abort_event.is_set():
task_callback(
status=TaskStatus.ABORTED,
result=(
ResultCode.ABORTED,
f"{command_name} command aborted by task executor abort event.",
),
)
return True
return False
# ---------------
# Command Methods
# ---------------
[docs]
def results_callback(
self: CbfComponentManager, event_data: Optional[tango.EventData]
) -> None:
"""
Callback for LRC command result events.
All subdevices that may block our LRC thread with their own LRC execution
have their `longRunningCommandResult` attribute subscribed to with this method
as the change event callback.
:param event_data: Tango attribute change event data
"""
# NOTE: event callbacks must be as short as possible
# Here we simply queue event data and handle possible exceptions
try:
self._results_queue.put_nowait(event_data)
except Full:
# Start dropping data if queue is full
self._results_queue.get_nowait()
self._results_queue.task_done()
self._results_queue.put(event_data)
def _clear_queue(
self: CbfComponentManager, queue_to_clear: Queue, queue_name: str
) -> None:
"""
Clear a queue; use 1ms sleep to alleviate CPU usage.
:param queue_to_clear: queue object to empty
:param queue_name: name of queue for logging purposes
"""
try:
while True:
queue_to_clear.get_nowait()
queue_to_clear.task_done()
sleep(0.001)
except Empty:
self.logger.info(f"Cleared {queue_name} queue.")
def _wait_for_blocking_results(
self: CbfComponentManager,
blocking_command_ids: Dict[str, str],
) -> Dict[str, ResultCode]:
"""
Wait for the number of anticipated results to be pushed by subordinate devices.
When issuing an LRC (or multiple) on subordinate devices from an LRC thread,
command result events will be stored in self._results_queue; use this
method to wait for all blocking command ID `longRunningCommandResult` events.
All subdevices that may block our LRC thread with their own LRC execution
have the `results_callback` method above provided as the change event callback
for their `longRunningCommandResult` attribute subscription, which will store
command IDs and results as change events are received.
:param blocking_command_ids: dict of blocking command IDs keyed by device name
:return: dict containing command_id and result codes keyed by device name
"""
self.logger.info(
f"wait_for_blocking_results invoked; blocking command IDs: {blocking_command_ids}"
)
# Loop exits when no blocking command IDs remain, or upon timeout when
# trying to get new results from the queue
command_results = {}
while len(blocking_command_ids) != 0:
try:
event_data = self._results_queue.get(
block=True, timeout=self._proxy_timeout
)
self._results_queue.task_done()
except Empty:
self.logger.error(
f"Event data queue is still empty after {self._proxy_timeout} seconds, blocking command IDs remaining: {blocking_command_ids}"
)
for dev_name in blocking_command_ids.values():
command_results[dev_name] = ResultCode.UNKNOWN
break
# EventHandler will validate event data, so we can immediately parse it here.
value = tuple(event_data.attr_value.value)
if "" in value:
self.logger.debug(
f"Skipping empty event; event data: {event_data}"
)
continue
# longRunningCommandResult attribute format: ("command_id", "[result_code, message]")
try:
command_id = value[0]
result = value[1]
result_code = int(result.split(",")[0].split("[")[1])
except IndexError as index_error:
self.logger.error(
f"IndexError in parsing EventData; {index_error}"
)
continue
# Remove any successfully parsed results from received events
if command_id in blocking_command_ids:
dev_name = blocking_command_ids.pop(command_id)
if result_code != ResultCode.OK:
self.logger.error(
f"{dev_name} blocking command failure; {command_id}: {result}"
)
else:
self.logger.debug(
f"{dev_name} blocking command completed OK: {command_id}, {result}"
)
command_results[dev_name] = result_code
self._clear_queue(self._results_queue, "LRC results")
return command_results
@backoff.on_exception(
wait_gen=backoff.expo,
exception=tango.DevFailed,
max_time=DEFAULT_TIMEOUT_LONG,
)
def _invoke_command_backoff(
self: CbfComponentManager,
proxy: Any,
command_arg: Any,
command_name: str,
) -> tuple[Any]:
"""
Helper function to issue a command to a DeviceProxy with backoff and retry.
:param proxy: proxy target for command
:param command_arg: optional command argument (can be None for no argument)
:param command_name: command to be issued
:return: command result (if any) with device name
"""
[[result_code], [cmd_id_or_msg]] = (
proxy.command_inout(command_name, command_arg)
if command_arg is not None
else proxy.command_inout(command_name)
)
return (proxy.dev_name(), result_code, cmd_id_or_msg)
def _invoke_command(
self: CbfComponentManager,
proxy: Any,
command_arg: Any,
command_name: str,
) -> tuple[Any]:
"""
Helper function to issue a command to a DeviceProxy.
NOTE: the ordering of parameters here affects this method's use in _issue_group_command.
:param proxy: proxy target for command
:param command_arg: optional command argument (can be None for no argument)
:param command_name: command to be issued
:return: command result (if any) with device name
"""
# NOTE: logging here was removed for performance reasons
try:
return self._invoke_command_backoff(
proxy=proxy,
command_arg=command_arg,
command_name=command_name,
)
except tango.DevFailed as dev_failed:
dev_name = proxy.dev_name()
return (
dev_name,
ResultCode.UNKNOWN,
f"Error issuing {dev_name}/{command_name}; {dev_failed}",
)
[docs]
def issue_group_command(
self: CbfComponentManager,
command_name: str,
proxies: Iterable[tango.DeviceProxy],
command_args: Any | Iterable[Any] | None = None,
is_lrc: bool | None = True,
partial_success: bool | None = False,
check_on_fail: dict[str, callable] | None = None,
) -> dict[str, ResultCode]:
"""
Helper function to issue commands to groups of devices.
Important: all proxies provided must be able to execute the requested command.
:param command_name: name of command to be issued
:param proxies: iterable of device proxies
:param command_args: command argument to be supplied to all commands, or
an iterable of separate command arguments for each command (defaults
to None for commands with no arguments); requires separate arguments
be provided for commands that take in array-like arguments, otherwise
it will map a single iterable argument to multiple commands
:param is_lrc: True if command is long-running, otherwise False
:param partial_success: defaults to False, in which case an error will
be logged if any command fails; set to True to only log a warning
if any command fails but at least 1 succeeds
:param check_on_fail: dict containing attribute name(s) and evaluating
function(s) to validate command success via attribute read should
the command result fail initial validation
:return: dict of proxy command returns
"""
# NOTE: take care to not log proxies or command args as they may be
# single-use iterators.
self.logger.info(
f"Issuing {command_name} command to sub-device group."
)
# Store a dict of proxies for reuse if additional validation will be
# required should the initial command validation fail.
proxy_dict = None
if check_on_fail is not None:
proxy_dict = {}
for proxy in proxies:
proxy_dict[proxy.dev_name()] = proxy
# Store proxies again in case it was provided via generator
proxies = proxy_dict.values()
# Set up map target and build argument generators. Argument ordering is
# important here; in the _invoke_command definition, we have the keyword
# arguments positioned after the arguments that are mapped.
#
# NOTE: Here we use separate command arguments if provided; otherwise,
# the same argument is used for all commands. We determine if arguments
# are separate by checking if command_args is of an iterable type but is
# not a string.This method therefore requires separate arguments be provided
# for commands that take in array-like arguments, otherwise it will map
# a single iterable argument to multiple commands.
#
map_target = lambda args: self._invoke_command(
proxy=args[0],
command_arg=args[1],
command_name=command_name,
)
map_args = None
if isinstance(command_args, Iterable) and not isinstance(
command_args, str
):
map_args = (
(proxy, arg) for proxy, arg in zip(proxies, command_args)
)
else:
map_args = ((proxy, command_args) for proxy in proxies)
# Issue commands in a loop using map.
# If LRC, get blocking command IDs and wait for command results.
# If fast command, populate command results immediately.
blocking_command_ids = {}
command_results = {}
for dev_name, result_code, cmd_id_or_msg in map(map_target, map_args):
command_results[dev_name] = result_code
if is_lrc:
if result_code != ResultCode.QUEUED:
self.logger.warning(cmd_id_or_msg)
else:
blocking_command_ids[cmd_id_or_msg] = dev_name
elif result_code != ResultCode.OK:
self.logger.warning(cmd_id_or_msg)
# If LRC, update results after waiting for change events.
if is_lrc:
command_results |= self._wait_for_blocking_results(
blocking_command_ids
)
if len(command_results) == 0:
self.logger.warning("No commands were issued.")
# If any command returned a failure code but all of the additional attribute
# validation checks succeed, set that command's result to ResultCode.OK.
if check_on_fail is not None:
for dev_name, result_code in command_results.items():
try:
if result_code != ResultCode.OK and all(
evaluator(getattr(proxy_dict[dev_name], attr_name))
for attr_name, evaluator in check_on_fail.items()
):
command_results[dev_name] = ResultCode.OK
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failed to perform additional validation checks on {dev_name}; {dev_failed}"
)
# Check command return values and log for various success cases.
if all(
result_code == ResultCode.OK
for result_code in command_results.values()
):
self.logger.info(f"{command_name} command success.")
elif partial_success and any(
result_code == ResultCode.OK
for result_code in command_results.values()
):
self.logger.warning(f"Partial {command_name} command success.")
else:
self.logger.error(
f"{command_name} command failure; command results:{command_results}"
)
return command_results
# ----------------
# Callback Methods
# ----------------
def _push_health_state_update(
self: CbfComponentManager, health_state: HealthState
) -> None:
"""
Push a health state update to the device.
:param health_state: the new health state of the component manager.
"""
if self._device_health_state_callback is not None:
self._device_health_state_callback(health_state)
[docs]
def update_device_health_state(
self: CbfComponentManager,
health_state: HealthState,
) -> None:
"""
Handle a health state change.
This is a helper method for use by subclasses.
:param health_state: the new health state of the component manager.
"""
with self._health_state_lock:
if self._health_state != health_state:
self.logger.info(f"Updating healthState to {health_state}")
self._health_state = health_state
self._push_health_state_update(self._health_state)
[docs]
def op_state_callback(
self: CbfComponentManager, event_data: Optional[tango.EventData]
) -> None:
"""
Callback for state attribute events.
:param event_data: Tango attribute change event data
"""
# NOTE (CIP-3559): event callbacks must be as short as possible
# Here we simply queue event data and handle possible exceptions
try:
self._op_states_queue.put_nowait(event_data)
except Full:
# Start dropping data if queue is full
self._op_states_queue.get_nowait()
self._op_states_queue.task_done()
self._op_states_queue.put(event_data)
# ----------------
# Op State Changes
# ----------------
[docs]
def wait_for_blocking_states(
self: CbfComponentManager,
target_state: tango.DevState,
task_abort_event: Optional[Event] = None,
) -> bool:
"""
Wait for the number of anticipated state changes to be pushed by subordinate devices.
:param target_state: target state to wait for in all monitored op states
:param task_abort_event: Check for abort, defaults to None
:return: True if all state changes were successfully recorded, otherwise False
"""
blocking_devices = set(self._op_states.keys())
self.logger.debug(
f"wait_for_blocking_states invoked; devices monitored: {blocking_devices}; target state: {target_state}"
)
# Loop exits upon timeout when no further new states available in queue
while True:
if task_abort_event and task_abort_event.is_set():
self.logger.warning(
"wait_for_blocking_states aborted while waiting for blocking state changes."
)
return False
try:
event_data = self._op_states_queue.get(
block=True, timeout=self._state_change_timeout
)
self._op_states_queue.task_done()
except Empty:
break
# Check event error and quality factor first
if event_data.err:
self.logger.error(f"Event error: {event_data.errors}")
continue
if event_data.attr_value.quality is tango.AttrQuality.ATTR_INVALID:
self.logger.error(
f"Event quality factor invalid: {event_data}"
)
continue
value = event_data.attr_value.value
dev_name = event_data.device.dev_name()
self.logger.info(
f"Received new value for {dev_name}/state: {value}"
)
self._op_states[dev_name] = value
for dev_name, state in self._op_states.items():
if state == target_state and dev_name in blocking_devices:
blocking_devices.remove(dev_name)
if len(blocking_devices) > 0:
self.logger.error(
f"The following devices failed to transition to {target_state}: {blocking_devices}"
)
return False
self.logger.info(
f"All monitored devices successfully transitioned to {target_state}."
)
return True
@property
def is_communicating(self: CbfComponentManager) -> bool:
"""
Return whether communication with the component is established.
SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
for this, but this is still needed as a base classes hook.
:return: True if communication with the component is established, else False.
"""
with self._communication_state_lock:
communication_state = CommunicationStatus[
self._communication_state.name
]
if communication_state == CommunicationStatus.ESTABLISHED:
return True
# If communication_state is not ESTABLISHED, log a warning and use exception
# traceback to get caller
try:
raise Exception
except Exception:
self.logger.warning(
f"{exc_info()[2].tb_frame.f_back.f_code.co_name} failed: CommunicationStatus.{communication_state.name}"
)
return False
@property
def is_communicating_disabled(self: CbfComponentManager) -> bool:
"""
Return whether communication with the component is disabled.
SKA Mid.CBF MCS uses the more expressive :py:attr:`communication_status`
for this, but this is still needed as a base classes hook.
:return: True if communication with the component is established, else False.
"""
with self._communication_state_lock:
communication_state = CommunicationStatus[
self._communication_state.name
]
if communication_state == CommunicationStatus.DISABLED:
return True
return False
@property
def power_state(self: CbfComponentManager) -> Optional[PowerState]:
"""
Return the power state of this component manager.
:return: the power state of this component manager, if known.
"""
return self._component_state["power"]
@property
def faulty(self: CbfComponentManager) -> Optional[bool]:
"""
Return whether this component manager is currently experiencing a fault.
:return: True if this component manager is currently experiencing a fault, else False.
"""
return cast(bool, self._component_state["fault"])