# Source code for ska_mid_cbf_mcs.base.base_device

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

"""
CbfDevice

Generic Tango device for Mid.CBF
"""

from __future__ import annotations

import os
import threading
from typing import Any, Callable

import psutil
from ska_control_model import AdminMode, TaskStatus
from ska_tango_base.base.base_component_manager import JSONData
from ska_tango_base.base.base_device import SKABaseDevice
from ska_tango_base.base.base_interface import BaseInterface
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from ska_tango_base.type_hints import DevVarLongStringArrayType
from tango import AttrWriteType
from tango.server import device_property

# Base device timeouts, used primarily for DeviceProxy accesses.
# DEFAULT_TIMEOUT_LONG also supplies the default value (as a string) of the
# LRCTimeout device property.
# NOTE(review): units are presumably seconds -- confirm against callers.
DEFAULT_TIMEOUT_SHORT = 60
DEFAULT_TIMEOUT_LONG = 180

# Use these to configure optional CPU and memory diagnostics.
# When ENABLE_DIAGNOSTICS is True, init_device() starts a background thread
# that logs resource usage and delete_device() signals it to stop.
ENABLE_DIAGNOSTICS = False
# Sampling interval, in seconds, passed to psutil's Process.cpu_percent().
CPU_MONITORING_INTERVAL = 1.0
# Process CPU percentage at or above which resource usage is logged.
CPU_MONITORING_THRESHOLD = 80.0


class CbfDevice(SKABaseDevice):
    """
    A generic base device for Mid.CBF.

    Extends SKABaseDevice to override certain key values.
    """

    # ----------------------------------------------------------------------- #
    # Attributes
    # ----------------------------------------------------------------------- #

    # Overriding base interface to acknowledge command object deprecation and
    # prevent unnecessary initialization; see SKABaseDevice.
    InitCommand = None
    On = None
    Off = None
    Standby = None
    Reset = None
    is_On_allowed = None
    is_Off_allowed = None
    is_Standby_allowed = None
    is_Reset_allowed = None
    execute_On = BaseInterface.execute_On
    execute_Off = BaseInterface.execute_Off
    execute_Standby = BaseInterface.execute_Standby
    execute_Reset = BaseInterface.execute_Reset

    # --- Device Properties --- #

    DeviceID = device_property(
        doc="Integer value for identifying CbfDevice-inheriting devices.",
        dtype=int,
        mandatory=True,
    )

    LRCTimeout = device_property(
        doc="Timeout to wait for blocking long-running commands (LRCs).",
        dtype=str,
        default_value=str(DEFAULT_TIMEOUT_LONG),
    )

    # --- Device Attributes & Signals --- #

    health_info_signal: AttrSignal[str] = AttrSignal(
        stored=True, initial_value=""
    )
    """
    Signal for the healthInfo attribute.

    A stored signal with an empty-string initial value; the read-only
    healthInfo attribute is derived from it.
    """

    healthInfo: attribute_from_signal = attribute_from_signal(
        health_info_signal,
        access=AttrWriteType.READ,
        dtype=str,
        description=(
            "HealthState info messages for this device. Reports all errors collected "
            "during health state aggregation in a JSON-formatted string."
        ),
    )
    """healthInfo device attribute"""

    # ----------------------------------------------------------------------- #
    # Methods
    # ----------------------------------------------------------------------- #

    # --- Device Initialization --- #

    def _monitor_resource_usage(self: CbfDevice) -> None:
        """
        Log CPU and memory usage when the process is above a certain CPU
        threshold, using `psutil` to calculate CPU usage per thread and
        `threading` to identify threads by name.

        Runs until the stop event that was current when this thread started
        is set; intended to be the target of the monitoring thread created in
        init_device().
        """
        # Capture the event once: delete_device() does not join this thread,
        # so a subsequent init_device() may rebind the attribute to a fresh,
        # unset event before this loop next checks it.
        stop_event = self._resource_monitor_event
        process = psutil.Process(os.getpid())

        while not stop_event.is_set():
            # Blocks for the sampling interval while measuring CPU usage.
            total_cpu_percent = process.cpu_percent(CPU_MONITORING_INTERVAL)
            if total_cpu_percent < CPU_MONITORING_THRESHOLD:
                continue

            # Only the process's own user + system time is comparable with
            # the per-thread counters; the children_* (and iowait) fields of
            # cpu_times() must not be part of the denominator.
            cpu_times = process.cpu_times()
            total_cpu_time = cpu_times.user + cpu_times.system

            thread_id_to_name = {
                thread.native_id: thread.name
                for thread in threading.enumerate()
            }

            # Approximate each thread's share of the sampled CPU percentage
            # from its share of accumulated CPU time.
            thread_usage = []
            for thread_info in process.threads():
                thread_cpu_time = (
                    thread_info.system_time + thread_info.user_time
                )
                if total_cpu_time > 0:
                    thread_percent = total_cpu_percent * (
                        thread_cpu_time / total_cpu_time
                    )
                else:
                    thread_percent = 0.0
                thread_usage.append(
                    (
                        thread_id_to_name.get(thread_info.id, "UNKNOWN"),
                        thread_percent,
                    )
                )

            # Sort on the numeric value, then format for the log message.
            thread_usage.sort(key=lambda entry: entry[1], reverse=True)
            thread_top = [
                (name, f"{percent}%") for name, percent in thread_usage
            ]

            self.logger.info(
                f"Total resource usage ({CPU_MONITORING_INTERVAL}s interval): "
                f"{total_cpu_percent}% CPU, "
                f"{process.memory_info().rss / 1024**2} MiB memory. "
                f"CPU usage by thread (in descending order): {thread_top}"
            )

    def init_device(self: CbfDevice) -> None:
        """Initialise the tango device after startup."""
        super().init_device()
        self.init_completed()
        self.logger.info("Device initialization complete.")

        # Start monitoring resource usage in separate thread.
        if ENABLE_DIAGNOSTICS:
            # The event must exist before the thread starts, as the thread
            # reads it on entry.
            self._resource_monitor_event = threading.Event()
            self._resource_monitor_thread = threading.Thread(
                target=self._monitor_resource_usage,
                name="CbfDevice._monitor_resource_usage",
                daemon=True,
            )
            self._resource_monitor_thread.start()

    def delete_device(self: CbfDevice) -> None:
        """Clean up device at shutdown."""
        # Stop resource usage monitoring thread; do not use join() to prevent
        # blocking device shutdown.
        if ENABLE_DIAGNOSTICS:
            # The event is missing if init_device() failed before creating it.
            stop_event = getattr(self, "_resource_monitor_event", None)
            if stop_event is not None:
                stop_event.set()
            self._resource_monitor_thread = None

        super().delete_device()

    # --- Device Attributes --- #

    def write_adminMode(self: CbfDevice, value: AdminMode) -> None:
        """
        Set the Admin Mode of the device.

        Overriding SKABaseDevice to submit AdminMode-related component manager
        methods to task executor.

        :param value: Admin Mode of the device.

        :raises ValueError: for disallowed or unknown AdminMode value
        """
        # The data type of the value parameter is treated as an int by Tango when
        # this function is entered. Therefore, the value parameter cannot be directly
        # passed to the task without first casting it to AdminMode.
        value = AdminMode(value)

        if value in (AdminMode.ENGINEERING, AdminMode.ONLINE):
            if (
                self._admin_mode == value
                and self.component_manager.is_communicating
            ):
                raise ValueError("Communications already online.")

            task_status, message = self.submit_lrc_task(
                name="start_communicating",
                task=self.component_manager.start_communicating,
                kwargs={"admin_mode": value},
            )
            if task_status == TaskStatus.REJECTED:
                raise ValueError(
                    f"start_communicating {value.name} thread rejected; {message}"
                )

        elif value in (AdminMode.NOT_FITTED, AdminMode.OFFLINE):
            if (
                self._admin_mode == value
                and value == AdminMode.OFFLINE
                and self.component_manager.is_communicating_disabled
            ):
                raise ValueError("Communications already offline.")

            # NOT_FITTED may not be requested while communications are up,
            # whatever the current admin mode is; the device has to be taken
            # offline first.
            if (
                value == AdminMode.NOT_FITTED
                and self.component_manager.is_communicating
            ):
                raise ValueError(
                    "Communications are online; must be set offline first to set component not fitted"
                )

            task_status, message = self.submit_lrc_task(
                name="stop_communicating",
                task=self.component_manager.stop_communicating,
                kwargs={"admin_mode": value},
            )
            if task_status == TaskStatus.REJECTED:
                raise ValueError(
                    f"stop_communicating {value.name} thread rejected; {message}"
                )

        else:
            raise ValueError(f"Unsupported adminMode {value}")

    # --- Callbacks --- #

    def _admin_mode_perform_action(self: CbfDevice, action: str) -> None:
        """
        Callback provided to the component manager to perform an action on the
        AdminMode state model.

        :param action: an action, as given in the transitions table
        """
        self.admin_mode_model.perform_action(action)

    def _update_command_result(
        self: CbfDevice,
        command_id: str,
        command_result: JSONData,
    ) -> None:
        """
        Override of SKABaseDevice._update_command_result for logging purposes.

        :param command_id: unique command identifier
        :param command_result: command result code and message
        """
        self.logger.info(
            f"Updating result for command ID: {command_id}, {command_result}"
        )
        super()._update_command_result(command_id, command_result)

    def submit_long_running_command(
        self: CbfDevice,
        command_name: str,
        task: Callable[..., Any],
        args: list[Any] | None = None,
        kwargs: dict[str, Any] | None = None,
        started_callback: Callable[..., Any] | None = None,
        completed_callback: Callable[..., Any] | None = None,
    ) -> DevVarLongStringArrayType:
        """
        Allocate a long-running command and submit the task to the task executor.

        :param command_name: LRC name
        :param task: LRC task, typically a method of the component manager
        :param args: optional list of positional arguments for the task
        :param kwargs: optional dict of keyword arguments for the task
        :param started_callback: optional callback to call when the task's
            status is set to TaskStatus.IN_PROGRESS
        :param completed_callback: optional callback to call when the task's
            status is set to TaskStatus.COMPLETED

        :return: tuple containing a result code and either a unique command
            identifier or a message
        """
        # Allocation yields the command ID and the callback through which the
        # task reports its progress.
        command_id, task_callback = self.allocate_lrc(
            name=command_name,
            started_callback=started_callback,
            completed_callback=completed_callback,
        )
        status, message = self.submit_lrc_task(
            name=command_name,
            task=task,
            args=args,
            kwargs=kwargs,
            task_callback=task_callback,
        )
        return self.convert_submission_result_to_lrc_return(
            command_id, status, message
        )
# --- Run Device Server --- #


def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    CbfDevice is only a base class for other devices, so it cannot be run as
    a device server; calling this function always raises.

    :param args: positional arguments
    :param kwargs: keyword arguments

    :return: exit code (never actually returned)

    :raises TypeError: always, as CbfDevice cannot be run as a device server
    """
    raise TypeError(
        "CbfDevice is an Abstract Class; device server cannot be run."
    )


if __name__ == "__main__":
    main()