# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
"""
CbfDevice
Generic Tango device for Mid.CBF
"""
from __future__ import annotations

import os
import threading
from collections.abc import Callable
from typing import Any

import psutil
from ska_control_model import AdminMode, TaskStatus
from ska_tango_base.base.base_component_manager import JSONData
from ska_tango_base.base.base_device import SKABaseDevice
from ska_tango_base.base.base_interface import BaseInterface
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from ska_tango_base.type_hints import DevVarLongStringArrayType
from tango import AttrWriteType
from tango.server import device_property
# Timeouts shared by Mid.CBF base devices; mostly used for DeviceProxy access.
# DEFAULT_TIMEOUT_LONG also seeds the default of the LRCTimeout property.
DEFAULT_TIMEOUT_SHORT: int = 60
DEFAULT_TIMEOUT_LONG: int = 180

# Optional CPU / memory diagnostics (off by default).
# When enabled, a background thread samples process CPU usage every
# CPU_MONITORING_INTERVAL and logs a per-thread breakdown whenever the total
# CPU percentage reaches CPU_MONITORING_THRESHOLD.
ENABLE_DIAGNOSTICS: bool = False
CPU_MONITORING_INTERVAL: float = 1.0
CPU_MONITORING_THRESHOLD: float = 80.0
class CbfDevice(SKABaseDevice):
    """
    A generic base device for Mid.CBF.

    Extends SKABaseDevice to override certain key values.
    """

    # ----------------------------------------------------------------------- #
    # Attributes                                                              #
    # ----------------------------------------------------------------------- #

    # Overriding base interface to acknowledge command object deprecation and
    # prevent unnecessary initialization; see SKABaseDevice.
    InitCommand = None
    On = None
    Off = None
    Standby = None
    Reset = None
    is_On_allowed = None
    is_Off_allowed = None
    is_Standby_allowed = None
    is_Reset_allowed = None
    execute_On = BaseInterface.execute_On
    execute_Off = BaseInterface.execute_Off
    execute_Standby = BaseInterface.execute_Standby
    execute_Reset = BaseInterface.execute_Reset

    # --- Device Properties --- #

    DeviceID = device_property(
        doc="Integer value for identifying CbfDevice-inheriting devices.",
        dtype=int,
        mandatory=True,
    )

    LRCTimeout = device_property(
        doc="Timeout to wait for blocking long-running commands (LRCs).",
        dtype=str,
        default_value=str(DEFAULT_TIMEOUT_LONG),
    )

    # --- Device Attributes & Signals --- #

    health_info_signal: AttrSignal[str] = AttrSignal(
        stored=True, initial_value=""
    )
    """
    Signal for the healthInfo attribute.

    The healthInfo attribute is built from this signal via
    attribute_from_signal and is read-only to clients.
    """

    healthInfo: attribute_from_signal = attribute_from_signal(
        health_info_signal,
        access=AttrWriteType.READ,
        dtype=str,
        description=(
            "HealthState info messages for this device. Reports all errors collected "
            "during health state aggregation in a JSON-formatted string."
        ),
    )
    """healthInfo device attribute"""

    # ----------------------------------------------------------------------- #
    # Methods                                                                 #
    # ----------------------------------------------------------------------- #

    # --- Device Initialization --- #

    def _monitor_resource_usage(self: CbfDevice) -> None:
        """
        Log CPU and memory usage while the process is above a CPU threshold.

        Loops until ``self._resource_monitor_event`` is set. Each iteration
        samples process CPU usage over ``CPU_MONITORING_INTERVAL`` seconds
        (``psutil.Process.cpu_percent`` blocks for that interval).

        If the sample meets or exceeds ``CPU_MONITORING_THRESHOLD``:

        - the total is apportioned across threads in proportion to each
          thread's user + system CPU time;
        - the breakdown is logged with the process resident memory.

        Thread names come from ``threading``, matched on native thread IDs.
        Threads not created through ``threading`` are reported as "UNKNOWN".

        NOTE: psutil thread times are cumulative since each thread started.
        The per-thread breakdown therefore reflects lifetime share, not the
        share within the sampling interval.
        """
        process = psutil.Process(os.getpid())
        while not self._resource_monitor_event.is_set():
            total_cpu_percent = process.cpu_percent(CPU_MONITORING_INTERVAL)
            if total_cpu_percent < CPU_MONITORING_THRESHOLD:
                continue

            thread_id_to_name = {
                thread.native_id: thread.name
                for thread in threading.enumerate()
            }
            thread_infos = process.threads()

            # Use the threads' own CPU times as the denominator so the
            # per-thread shares add up to total_cpu_percent. Process.cpu_times()
            # also includes children (and, on Linux, iowait) times that no
            # thread accounts for.
            total_thread_time = sum(
                info.user_time + info.system_time for info in thread_infos
            )
            scale = (
                total_cpu_percent / total_thread_time
                if total_thread_time > 0
                else 0.0
            )
            usage = [
                (
                    thread_id_to_name.get(info.id, "UNKNOWN"),
                    scale * (info.user_time + info.system_time),
                )
                for info in thread_infos
            ]

            # Sort on the numeric value first, then format; this avoids
            # re-parsing the formatted percentage strings.
            usage.sort(key=lambda entry: entry[1], reverse=True)
            thread_top = [(name, f"{percent}%") for name, percent in usage]

            self.logger.info(
                f"Total resource usage ({CPU_MONITORING_INTERVAL}s interval): "
                f"{total_cpu_percent}% CPU, "
                f"{process.memory_info().rss / 1024**2} MiB memory. "
                f"CPU usage by thread (in descending order): {thread_top}"
            )

    def init_device(self: CbfDevice) -> None:
        """
        Initialise the tango device after startup.

        Runs the base-class initialisation and marks initialisation complete.
        If ``ENABLE_DIAGNOSTICS`` is set, it then starts a daemon thread that
        runs ``_monitor_resource_usage``.
        """
        super().init_device()
        self.init_completed()
        self.logger.info("Device initialization complete.")

        # Start monitoring resource usage in a separate daemon thread so it
        # never keeps the process alive on shutdown.
        if ENABLE_DIAGNOSTICS:
            self._resource_monitor_event = threading.Event()
            self._resource_monitor_thread = threading.Thread(
                target=self._monitor_resource_usage,
                name="CbfDevice._monitor_resource_usage",
                daemon=True,
            )
            self._resource_monitor_thread.start()

    def delete_device(self: CbfDevice) -> None:
        """
        Clean up device at shutdown.

        Signals the resource monitor thread to stop (if diagnostics are
        enabled), then delegates to the base class.
        """
        # Stop the resource usage monitoring thread. join() is deliberately
        # not used, to avoid blocking device shutdown. getattr() guards
        # against init_device() having failed before the monitor was created.
        if ENABLE_DIAGNOSTICS:
            monitor_event = getattr(self, "_resource_monitor_event", None)
            if monitor_event is not None:
                monitor_event.set()
            self._resource_monitor_thread = None
        super().delete_device()

    # --- Device Attributes --- #

    def write_adminMode(self: CbfDevice, value: AdminMode) -> None:
        """
        Set the Admin Mode of the device.

        Overrides SKABaseDevice to submit the AdminMode-related component
        manager methods (start_communicating / stop_communicating) to the
        task executor.

        :param value: Admin Mode of the device.
        :raises ValueError: for a disallowed or unknown AdminMode value.
            Also raised when the requested mode and communication state are
            already in effect, when NOT_FITTED is requested while
            communications are online, or when the task submission is
            rejected.
        """
        # Tango treats the value as an int when this method is entered, so it
        # must be cast to AdminMode before being passed to the task.
        value = AdminMode(value)

        if value in [AdminMode.ENGINEERING, AdminMode.ONLINE]:
            if (
                self._admin_mode == value
                and self.component_manager.is_communicating
            ):
                raise ValueError("Communications already online.")
            task_status, message = self.submit_lrc_task(
                name="start_communicating",
                task=self.component_manager.start_communicating,
                kwargs={"admin_mode": value},
            )
            if task_status == TaskStatus.REJECTED:
                raise ValueError(
                    f"start_communicating {value.name} thread rejected; {message}"
                )
        elif value in [AdminMode.NOT_FITTED, AdminMode.OFFLINE]:
            if (
                value == AdminMode.OFFLINE
                and self._admin_mode == value
                and self.component_manager.is_communicating_disabled
            ):
                raise ValueError("Communications already offline.")
            # NOT_FITTED may only be set once communications are offline,
            # whatever the current admin mode is. Previously this check only
            # ran when the mode was already NOT_FITTED, so ONLINE/ENGINEERING
            # -> NOT_FITTED requests slipped through while communicating.
            if (
                value == AdminMode.NOT_FITTED
                and self.component_manager.is_communicating
            ):
                raise ValueError(
                    "Communications are online; must be set offline first to set component not fitted"
                )
            task_status, message = self.submit_lrc_task(
                name="stop_communicating",
                task=self.component_manager.stop_communicating,
                kwargs={"admin_mode": value},
            )
            if task_status == TaskStatus.REJECTED:
                raise ValueError(
                    f"stop_communicating {value.name} thread rejected; {message}"
                )
        else:
            raise ValueError(f"Unsupported adminMode {value}")

    # --- Callbacks --- #

    def _admin_mode_perform_action(self: CbfDevice, action: str) -> None:
        """
        Callback provided to the component manager to perform an action on the
        AdminMode state model.

        :param action: an action, as given in the transitions table
        """
        self.admin_mode_model.perform_action(action)

    def _update_command_result(
        self: CbfDevice,
        command_id: str,
        command_result: JSONData,
    ) -> None:
        """
        Override of SKABaseDevice._update_command_result for logging purposes.

        :param command_id: unique command identifier
        :param command_result: command result code and message
        """
        self.logger.info(
            f"Updating result for command ID: {command_id}, {command_result}"
        )
        super()._update_command_result(command_id, command_result)

    def submit_long_running_command(
        self: CbfDevice,
        command_name: str,
        task: Callable[..., Any],
        args: list[Any] | None = None,
        kwargs: dict[str, Any] | None = None,
        started_callback: Callable[..., Any] | None = None,
        completed_callback: Callable[..., Any] | None = None,
    ) -> DevVarLongStringArrayType:
        """
        Allocate a long-running command and submit the task to the task executor.

        :param command_name: LRC name
        :param task: LRC task, typically a method of the component manager
        :param args: optional list of positional arguments for the task
        :param kwargs: optional dict of keyword arguments for the task
        :param started_callback: optional callback to call when the task's
            status is set to TaskStatus.IN_PROGRESS
        :param completed_callback: optional callback to call when the task's
            status is set to TaskStatus.COMPLETED
        :return: tuple containing a result code and either a unique command
            identifier or a message
        """
        command_id, task_callback = self.allocate_lrc(
            name=command_name,
            started_callback=started_callback,
            completed_callback=completed_callback,
        )
        status, message = self.submit_lrc_task(
            name=command_name,
            task=task,
            args=args,
            kwargs=kwargs,
            task_callback=task_callback,
        )
        return self.convert_submission_result_to_lrc_return(
            command_id, status, message
        )
# --- Run Device Server --- #
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    CbfDevice is abstract, so this entry point never starts a device server
    and never returns normally.

    :param args: positional arguments (ignored)
    :param kwargs: keyword arguments (ignored)
    :return: never returns; the ``int`` annotation is kept for interface
        compatibility
    :raises TypeError: always, because CbfDevice cannot be run directly
    """
    raise TypeError(
        "CbfDevice is an Abstract Class; device server cannot be run."
    )


if __name__ == "__main__":
    main()