# Source code for ska_mid_cbf_mcs.subarray.subarray_device

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from typing import cast

import orjson
from ska_control_model import ObsState, ObsStateModel
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from tango import AttrWriteType
from tango.server import command, device_property

from ska_mid_cbf_mcs.base.obs.obs_device import CbfObsDevice
from ska_mid_cbf_mcs.commons.global_enum import const, freq_band_dict
from ska_mid_cbf_mcs.subarray.subarray_component_manager import (
    CbfSubarrayComponentManager,
)


class CbfSubarray(CbfObsDevice):
    """
    CbfSubarray TANGO device class for the prototype.

    The device exposes the resource-management commands AssignResources,
    ReleaseResources, ReleaseAllResources and Restart. Each command is
    submitted as a long running command whose task is a method of the
    CbfSubarrayComponentManager, and whose started/completed callbacks
    drive the observing state model via ``obs_state_action``.
    """

    # ----------------------------------------------------------------------- #
    # Attributes                                                              #
    # ----------------------------------------------------------------------- #

    # --- Device Properties --- #

    ControllerFQDN = device_property(
        doc="Fully Qualified Domain Name (FQDN) for the MCS CbfController device.",  # noqa: E501
        dtype=str,
        mandatory=True,
    )

    VccAllBandsFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the FHS VCCAllBandsController devices.",  # noqa: E501
        dtype=[str],
        mandatory=True,
    )

    FspFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the MCS Fsp devices.",  # noqa: E501
        dtype=[str],
        mandatory=True,
    )

    FspCorrSubarrayFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for the MCS FspCorrSubarray devices for this subarray.",  # noqa: E501
        dtype=[str],
        mandatory=True,
    )

    # --- Device Attributes & Signals --- #

    delay_model_signal: AttrSignal[str] = AttrSignal(
        stored=True,
        initial_value=orjson.dumps({"start_validity_sec": 0}).decode(),
    )
    """
    Signal for the delayModel attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    obs_state_signal: AttrSignal[ObsState] = AttrSignal(
        stored=True, initial_value=ObsState.EMPTY
    )
    """
    Signal for the obsState attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    frequency_band_signal: AttrSignal[int] = AttrSignal(
        stored=True, initial_value=0
    )
    """
    Signal for the frequencyBand attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    dish_ids_signal: AttrSignal[list[str]] = AttrSignal(
        stored=True, initial_value=[]
    )
    """
    Signal for the dishIDs attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    vcc_ids_signal: AttrSignal[list[int]] = AttrSignal(
        stored=True, initial_value=[]
    )
    """
    Signal for the assignedVCCs attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    fsp_ids_signal: AttrSignal[list[int]] = AttrSignal(
        stored=True, initial_value=[]
    )
    """
    Signal for the assignedFSPs attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    sys_param_signal: AttrSignal[str] = AttrSignal(
        stored=True, initial_value=""
    )
    """
    Signal for the sysParam attribute.
    Values are emitted for this signal whenever a client changes the attribute.
    """

    frequencyBand: attribute_from_signal = attribute_from_signal(
        frequency_band_signal,
        access=AttrWriteType.READ,
        dtype="DevEnum",
        description="Frequency band of the subarray (defaults to 1).",
        # Enum labels are the keys of freq_band_dict, in dict order.
        enum_labels=list(freq_band_dict),
    )
    """frequencyBand device attribute"""

    dishIDs: attribute_from_signal = attribute_from_signal(
        dish_ids_signal,
        access=AttrWriteType.READ,
        dtype=[str],
        description="Sorted list of DISH IDs assigned to subarray.",
        max_dim_x=const.MAX_VCC,
    )
    """dishIDs device attribute"""

    assignedVCCs: attribute_from_signal = attribute_from_signal(
        vcc_ids_signal,
        access=AttrWriteType.READ,
        dtype=[int],
        description="Sorted list of VCC IDs assigned to subarray.",
        max_dim_x=const.MAX_VCC,
        rel_change=0.1,
    )
    """assignedVCCs device attribute"""

    assignedFSPs: attribute_from_signal = attribute_from_signal(
        fsp_ids_signal,
        access=AttrWriteType.READ,
        dtype=[int],
        description="Sorted list of FSP IDs assigned to subarray.",
        max_dim_x=const.MAX_FSP,
        rel_change=0.1,
    )
    """assignedFSPs device attribute"""

    sysParam: attribute_from_signal = attribute_from_signal(
        sys_param_signal,
        access=AttrWriteType.READ_WRITE,
        dtype=str,
        description=(
            "JSON-formatted string containing system parameters. "
            "Should not be written by components external to Mid.CBF. "
            "To set the system parameters, refer to the CbfController Tango Commands: "  # noqa: E501
            "https://developer.skao.int/projects/ska-mid-cbf-mcs/en/latest/guide/interfaces/lmc_mcs_interface.html#cbfcontroller-tango-commands or the CbfController api docs at https://developer.skao.int/projects/ska-mid-cbf-mcs/en/latest/api/controller/controller_device.html"  # noqa: E501
        ),
        # NOTE(review): memorized + hw_memorized presumably makes Tango
        # re-apply the last written value at device startup - confirm.
        memorized=True,
        hw_memorized=True,
    )
    """sysParam device attribute"""

    # --- Device Commands --- #

    # Tango command names, shared by the command methods and their
    # is_<Command>_allowed counterparts.
    assign_resources_name = "AssignResources"
    release_resources_name = "ReleaseResources"
    release_all_resources_name = "ReleaseAllResources"
    restart_name = "Restart"

    # ----------------------------------------------------------------------- #
    # Methods                                                                 #
    # ----------------------------------------------------------------------- #

    # --- Device Attributes --- #

    def write_sysParam(self: CbfSubarray, value: str) -> None:
        """
        Update the subarray's system parameters

        :param value: JSON-formatted string containing system parameters
        """
        self.sys_param_signal = value
        self.logger.info(
            f"Updated DISH ID to VCC ID and frequency offset k mapping: {value}"
        )

    # Register write_sysParam as the write handler of the sysParam attribute.
    sysParam.write(write_sysParam)

    # --- Device Commands --- #

    def is_AssignResources_allowed(
        self: CbfSubarray, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        """
        Check if AssignResources is allowed.

        :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
            submitted to the LRC queue, and this method will always return
            True; otherwise, task has been popped off the LRC queue, and the
            command allowance can be checked.
        :return: True if allowed, else False.
        """
        if request_type is LRCReqType.ENQUEUE_REQ:
            return True
        return self.check_obs_command_allowed(
            self.assign_resources_name, [ObsState.EMPTY, ObsState.IDLE]
        )

    @command(
        dtype_in="DevString",
        doc_in="JSON string conforming to the ska-mid-cbf-assignresources schema containing a List of DISH (receptor) IDs",  # noqa: E501
        dtype_out="DevVarLongStringArray",
        doc_out=(
            "A tuple containing a return code and a string message "
            "indicating status. The message is for information purpose "
            "only."
        ),
    )
    def AssignResources(
        self: CbfSubarray, params: str
    ) -> DevVarLongStringArrayType:
        """
        Assign input dishIDs to this subarray.
        Set subarray to ObsState.IDLE if no dishIDs were previously assigned,
        i.e. subarray was previously in ObsState.EMPTY.

        :param params: JSON string conforming to the
            ska-mid-cbf-assignresources schema containing a list of DISH
            (receptor) IDs to add
        :return: A tuple containing a return code and a string message
            indicating status. The message is for information purpose only.
        :rtype: (ResultCode, str)
        """
        return self.submit_long_running_command(
            command_name=self.assign_resources_name,
            task=self.component_manager.assign_vcc,
            kwargs={"params": params},
            started_callback=lambda: self.obs_state_action("assign_invoked"),
            completed_callback=lambda: self.obs_state_action(
                "assign_completed"
            ),
        )

    def is_ReleaseResources_allowed(
        self: CbfSubarray, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        """
        Check if ReleaseResources is allowed.

        :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
            submitted to the LRC queue, and this method will always return
            True; otherwise, task has been popped off the LRC queue, and the
            command allowance can be checked.
        :return: True if allowed, else False.
        """
        if request_type is LRCReqType.ENQUEUE_REQ:
            return True
        return self.check_obs_command_allowed(
            self.release_resources_name, [ObsState.IDLE]
        )

    @command(
        dtype_in="DevString",
        doc_in="JSON string conforming to the ska-mid-cbf-releaseresources schema containing a List of DISH (receptor) IDs",  # noqa: E501
        dtype_out="DevVarLongStringArray",
        doc_out=(
            "A tuple containing a return code and a string message "
            "indicating status. The message is for information purpose "
            "only."
        ),
    )
    def ReleaseResources(
        self: CbfSubarray, params: str
    ) -> DevVarLongStringArrayType:
        """
        Remove input from list of assigned dishIDs.
        Set subarray to ObsState.EMPTY if no dishIDs assigned.

        :param params: JSON string conforming to the
            ska-mid-cbf-releaseresources schema containing a List of DISH
            (receptor) IDs to release
        :return: A tuple containing a return code and a string message
            indicating status. The message is for information purpose only.
        :rtype: (ResultCode, str)
        """
        return self.submit_long_running_command(
            command_name=self.release_resources_name,
            task=self.component_manager.release_vcc,
            kwargs={"params": params},
            started_callback=lambda: self.obs_state_action("release_invoked"),
            completed_callback=lambda: self.obs_state_action(
                "release_completed"
            ),
        )

    def is_ReleaseAllResources_allowed(
        self: CbfSubarray, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        """
        Check if ReleaseAllResources is allowed.

        :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
            submitted to the LRC queue, and this method will always return
            True; otherwise, task has been popped off the LRC queue, and the
            command allowance can be checked.
        :return: True if allowed, else False.
        """
        if request_type is LRCReqType.ENQUEUE_REQ:
            return True
        return self.check_obs_command_allowed(
            self.release_all_resources_name, [ObsState.IDLE]
        )

    @command(
        dtype_in="DevString",
        doc_in="JSON string conforming to the ska-mid-cbf-releaseallresources schema",  # noqa: E501
        dtype_out="DevVarLongStringArray",
        doc_out=(
            "A tuple containing a return code and a string message "
            "indicating status. The message is for information purpose "
            "only."
        ),
    )
    def ReleaseAllResources(
        self: CbfSubarray, params: str
    ) -> DevVarLongStringArrayType:
        """
        Remove all assigned dishIDs.
        Set subarray to ObsState.EMPTY if no dishIDs assigned.

        :param params: JSON string conforming to the
            ska-mid-cbf-releaseallresources schema
        :return: A tuple containing a return code and a string message
            indicating status. The message is for information purpose only.
        :rtype: (ResultCode, str)
        """
        # Uses the same obs state triggers as ReleaseResources.
        return self.submit_long_running_command(
            command_name=self.release_all_resources_name,
            task=self.component_manager.release_all_vcc,
            kwargs={"params": params},
            started_callback=lambda: self.obs_state_action("release_invoked"),
            completed_callback=lambda: self.obs_state_action(
                "release_completed"
            ),
        )

    def is_Restart_allowed(
        self: CbfSubarray, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        """
        Check if Restart is allowed.

        :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
            submitted to the LRC queue, and this method will always return
            True; otherwise, task has been popped off the LRC queue, and the
            command allowance can be checked.
        :return: True if allowed, else False.
        """
        if request_type is LRCReqType.ENQUEUE_REQ:
            return True

        # SKB-796: continue with command execution even when
        # CommunicationStatus is not ESTABLISHED, so that the full command can
        # be executed (even if only partially successful) in order to hit all
        # obs state model triggers
        self.logger.debug(f"Checking if {self.restart_name} is allowed.")
        if not self.component_manager.is_communicating:
            # Deliberately only a warning: lack of communication does not
            # block Restart (see SKB-796 above).
            self.logger.warning("Continuing with Restart command.")

        if self.obs_state_signal not in [ObsState.ABORTED, ObsState.FAULT]:
            self.logger.warning(
                f"{self.restart_name} not allowed in ObsState {self.obs_state_signal}"
            )
            return False
        return True

    @command(
        dtype_in="DevString",
        doc_in="JSON string conforming to the ska-mid-cbf-restart schema",
        dtype_out="DevVarLongStringArray",
    )
    def Restart(self: CbfSubarray, params: str) -> DevVarLongStringArrayType:
        """
        Restart the observing device from a FAULT/ABORTED obsState to EMPTY.

        :param params: JSON string conforming to the ska-mid-cbf-restart
            schema
        :return: A tuple containing a return code and a string message
            indicating status. The message is for information purpose only.
        """
        return self.submit_long_running_command(
            command_name=self.restart_name,
            task=self.component_manager.restart,
            kwargs={"params": params},
            started_callback=lambda: self.obs_state_action("restart_invoked"),
            completed_callback=lambda: self.obs_state_action(
                "restart_completed"
            ),
        )

    # --- Device Initialization --- #

    def _init_state_model(self: CbfSubarray) -> None:
        """Set up the state model for the device."""
        # super(CbfObsDevice, self) starts the method lookup *after*
        # CbfObsDevice in the MRO, so CbfObsDevice's own _init_state_model is
        # skipped on purpose and the obs state model is installed here.
        super(CbfObsDevice, self)._init_state_model()

        # CbfSubarray uses the full observing state model
        self.obs_state_model = ObsStateModel(
            logger=self.logger,
            # Every obs state change is written to obs_state_signal.
            callback=lambda value: setattr(self, "obs_state_signal", value),
        )

    def create_component_manager(
        self: CbfSubarray,
    ) -> CbfSubarrayComponentManager:
        """
        Create and return a subarray component manager.

        The manager is built from this device's properties (controller, VCC
        and FSP FQDNs) and the device's state-change callbacks.

        :return: a subarray component manager
        """
        self.logger.debug("Entering CbfSubarray.create_component_manager()")

        return CbfSubarrayComponentManager(
            device=self,
            subarray_id=int(self.DeviceID),
            controller=self.ControllerFQDN,
            vcc_all_bands=self.VccAllBandsFQDNs,
            fsp=self.FspFQDNs,
            fsp_corr_sub=self.FspCorrSubarrayFQDNs,
            logger=self.logger,
            health_state_callback=self._update_health_state,
            communication_state_callback=self._communication_state_changed,
            component_state_callback=self._component_state_changed,
            admin_mode_callback=self._admin_mode_perform_action,
        )
# --- Run Device Server --- #


def main(*args: str, **kwargs: str) -> int:
    """
    Run the CbfSubarray device server; entry point for this module.

    :param args: positional arguments forwarded to ``run_server``; when none
        are given, ``None`` is forwarded instead of an empty tuple
    :param kwargs: keyword arguments forwarded unchanged to ``run_server``
    :return: exit code
    """
    server_args = args or None
    exit_code = CbfSubarray.run_server(args=server_args, **kwargs)
    return cast(int, exit_code)


if __name__ == "__main__":
    main()