# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from typing import Any, List
import orjson
# tango imports
import tango
from ska_control_model import AdminMode, HealthState, ObsMode, PowerState
from ska_tango_testing import context
from ska_mid_cbf_mcs.base.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
__all__ = ["FspUnitComponentManager"]
[docs]
class FspUnitComponentManager(CbfComponentManager):
"""
Component manager for Fsp Unit.
"""
def __init__(
self: FspUnitComponentManager,
*args: Any,
fsp_fqdn: str,
fhs_host_controller_fqdns: List[str],
device_id: int,
**kwargs: Any,
) -> None:
"""
Initialize a new instance.
:param fsp_fqdn: FQDN of the Fsp device
:param fsp_mode_controller_fqdns: FQDN of the FSPModeManagementController devices
:param fhs_host_controller_fqdns: FQDNs of the FHSHostController devices
"""
# supply operating state machine trigger keywords
super().__init__(*args, **kwargs)
# --- Attribute Values --- #
# HealthState of the hardware
self.health_state_hw = HealthState.UNKNOWN
self._fsp_fqdn = fsp_fqdn
self._fhs_host_controller_fqdns = fhs_host_controller_fqdns
self._device_id = device_id
self.proxy_dict = {
const.FHS_HOST_CONTROLLER: {},
const.FSP: {},
}
def _create_device_proxy(
self: FspUnitComponentManager,
) -> None:
"""Initialize device proxies."""
self.logger.info(f"Creating proxy to {self._fsp_fqdn}")
try:
self.proxy_dict[const.FSP] = context.DeviceProxy(
device_name=self._fsp_fqdn
)
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failed to connect to {self._fsp_fqdn}; {dev_failed}"
)
return False
# TODO: FHS host devices are not implemented nor are they simulated
# they are not in scope for M1 as they are hardware monitoring and control
# Therefore, the connection is temporary commented out
#
# for device_id, fqdn in enumerate(self._fhs_host_controller_fqdns, 1):
# try:
# proxy = context.DeviceProxy(device_name=fqdn)
# self.proxy_dict[const.FHS_HOST_CONTROLLER][device_id] = proxy
# except tango.DevFailed as dev_failed:
# self.logger.error(f"Failed to connect to {fqdn}; {dev_failed}")
# return False
return True
[docs]
def start_communicating(
self: FspUnitComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Establish communication with the component, then start monitoring.
"""
self.logger.info(
"Entering fsp_unit_component_manager.start_communicating"
)
success = self._create_device_proxy()
if not success:
self.logger.error("Failed to initialize proxies.")
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
try:
self.proxy_dict[const.FSP].adminMode = admin_mode
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
)
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
super().start_communicating(admin_mode=admin_mode)
self._update_component_state(power=PowerState.ON)
[docs]
def stop_communicating(
self: FspUnitComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for stop_communicating operation.
"""
try:
self.proxy_dict[const.FSP].adminMode = admin_mode
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failed to set {self._fsp_fqdn} to {admin_mode}; {dev_failed}"
)
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
super().stop_communicating(admin_mode=admin_mode)
def _fhs_host_health_state_callback(
self: FspUnitComponentManager,
health_state: HealthState,
) -> None:
# TODO: Will be implemented in a future story
self.logger.info("Handle fhs host health state")
def _handle_admin_mode_callback(
self: FspUnitComponentManager,
admin_mode: AdminMode,
) -> None:
# TODO: Will be implemented in a future story
self.logger.info("Handle fhs host admin mode")
# --- Resource Status -- #
@property
def resource_status(self) -> str:
"""
Getter Function for resource_status
:return: A JSON string representation of FSP's Resource Status
:rtype: str
"""
return self.get_resource_status()
[docs]
def get_resource_status(self: FspUnitComponentManager) -> str:
"""
Retrieves the the resource status from all available FSP devices and
format it into a ResourceStatus VCC Status Object JSON string.
Retrieves the following from the FSP devices:
* used_by_subarrays: The subarrays that is/are using the FSP
* health_state: HealthState of the FSP
* admin_mode: AdminMode of the FSP
* vcc_unit_id: This VCC Unit's ID
:return: a JSON string
:rtype: str
"""
fsp_rs_dict = {}
fsp_proxy = self.proxy_dict[const.FSP]
fsp_id = int(fsp_proxy.dev_name().split("/")[-1])
fsp_rs_dict[fsp_id] = {}
# The attribute return a ndarray of int64, which is not compatible with
# orjson.dumps
subarray_membership = [int(i) for i in fsp_proxy.subarrayMembership]
fsp_rs_dict[fsp_id]["used_by_subarrays"] = subarray_membership
fsp_rs_dict[fsp_id]["fsp_mode"] = ObsMode(fsp_proxy.obsMode).name
fsp_rs_dict[fsp_id]["health_state"] = HealthState(
fsp_proxy.healthState
).name
fsp_rs_dict[fsp_id]["admin_mode"] = AdminMode(fsp_proxy.adminMode).name
fsp_rs_dict[fsp_id]["fsp_unit_id"] = self._device_id
return orjson.dumps(
fsp_rs_dict, option=orjson.OPT_NON_STR_KEYS
).decode()