# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from typing import cast
import orjson
from ska_control_model import ObsState, ObsStateModel
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from tango import AttrWriteType
from tango.server import command, device_property
from ska_mid_cbf_mcs.base.obs.obs_device import CbfObsDevice
from ska_mid_cbf_mcs.commons.global_enum import const, freq_band_dict
from ska_mid_cbf_mcs.subarray.subarray_component_manager import (
CbfSubarrayComponentManager,
)
[docs]
class CbfSubarray(CbfObsDevice):
"""
CbfSubarray TANGO device class for the prototype
"""
# ----------------------------------------------------------------------- #
# Attributes #
# ----------------------------------------------------------------------- #
# --- Device Properties --- #
ControllerFQDN = device_property(
doc="Fully Qualified Domain Name (FQDN) for the MCS CbfController device.",
dtype=str,
mandatory=True,
)
VccAllBandsFQDNs = device_property(
doc="Fully Qualified Domain Names (FQDNs) for all of the FHS VCCAllBandsController devices.",
dtype=[str],
mandatory=True,
)
FspFQDNs = device_property(
doc="Fully Qualified Domain Names (FQDNs) for all of the MCS Fsp devices.",
dtype=[str],
mandatory=True,
)
FspCorrSubarrayFQDNs = device_property(
doc="Fully Qualified Domain Names (FQDNs) for the MCS FspCorrSubarray devices for this subarray.",
dtype=[str],
mandatory=True,
)
# --- Device Attributes & Signals --- #
delay_model_signal: AttrSignal[str] = AttrSignal(
stored=True,
initial_value=orjson.dumps({"start_validity_sec": 0}).decode(),
)
"""
Signal for the delayModel attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
obs_state_signal: AttrSignal[ObsState] = AttrSignal(
stored=True, initial_value=ObsState.EMPTY
)
"""
Signal for the obsState attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
frequency_band_signal: AttrSignal[int] = AttrSignal(
stored=True, initial_value=0
)
"""
Signal for the frequencyBand attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
dish_ids_signal: AttrSignal[list[str]] = AttrSignal(
stored=True, initial_value=[]
)
"""
Signal for the dishIDs attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
vcc_ids_signal: AttrSignal[list[int]] = AttrSignal(
stored=True, initial_value=[]
)
"""
Signal for the assignedVCCs attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
fsp_ids_signal: AttrSignal[list[int]] = AttrSignal(
stored=True, initial_value=[]
)
"""
Signal for the assignedFSPs attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
sys_param_signal: AttrSignal[str] = AttrSignal(
stored=True, initial_value=""
)
"""
Signal for the sysParam attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
frequencyBand: attribute_from_signal = attribute_from_signal(
frequency_band_signal,
access=AttrWriteType.READ,
dtype="DevEnum",
description="Frequency band of the subarray (defaults to 1).",
enum_labels=[key for key in freq_band_dict.keys()],
)
"""frequencyBand device attribute"""
dishIDs: attribute_from_signal = attribute_from_signal(
dish_ids_signal,
access=AttrWriteType.READ,
dtype=[str],
description="Sorted list of DISH IDs assigned to subarray.",
max_dim_x=const.MAX_VCC,
)
"""dishIDs device attribute"""
assignedVCCs: attribute_from_signal = attribute_from_signal(
vcc_ids_signal,
access=AttrWriteType.READ,
dtype=[int],
description="Sorted list of VCC IDs assigned to subarray.",
max_dim_x=const.MAX_VCC,
rel_change=0.1,
)
"""assignedVCCs device attribute"""
assignedFSPs: attribute_from_signal = attribute_from_signal(
fsp_ids_signal,
access=AttrWriteType.READ,
dtype=[int],
description="Sorted list of FSP IDs assigned to subarray.",
max_dim_x=const.MAX_FSP,
rel_change=0.1,
)
"""assignedFSPs device attribute"""
sysParam: attribute_from_signal = attribute_from_signal(
sys_param_signal,
access=AttrWriteType.READ_WRITE,
dtype=str,
description=(
"JSON-formatted string containing system parameters. "
"Should not be written by components external to Mid.CBF. "
"To set the system parameters, refer to the CbfController Tango Commands: "
"https://developer.skao.int/projects/ska-mid-cbf-mcs/en/latest/guide/interfaces/lmc_mcs_interface.html#cbfcontroller-tango-commands or the CbfController api docs at https://developer.skao.int/projects/ska-mid-cbf-mcs/en/latest/api/controller/controller_device.html"
),
memorized=True,
hw_memorized=True,
)
"""sysParam device attribute"""
# --- Device Commands --- #
assign_resources_name = "AssignResources"
release_resources_name = "ReleaseResources"
release_all_resources_name = "ReleaseAllResources"
restart_name = "Restart"
# ----------------------------------------------------------------------- #
# Methods #
# ----------------------------------------------------------------------- #
# --- Device Attributes --- #
[docs]
def write_sysParam(self: CbfSubarray, value: str) -> None:
"""
Update the subarray's system parameters
:param value: JSON-formatted string containing system parameters
"""
self.sys_param_signal = value
self.logger.info(
f"Updated DISH ID to VCC ID and frequency offset k mapping: {value}"
)
sysParam.write(write_sysParam)
# --- Device Commands --- #
[docs]
def is_AssignResources_allowed(
self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
"""
Check if AssignResources is allowed.
:param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
submitted to the LRC queue, and this method will always return True;
otherwise, task has been popped off the LRC queue, and the command
allowance can be checked.
:return: True if allowed, else False.
"""
if request_type is LRCReqType.ENQUEUE_REQ:
return True
return self.check_obs_command_allowed(
self.assign_resources_name, [ObsState.EMPTY, ObsState.IDLE]
)
[docs]
@command(
dtype_in="DevString",
doc_in="JSON string conforming to the ska-mid-cbf-assignresources schema containing a List of DISH (receptor) IDs", # noqa: E501
dtype_out="DevVarLongStringArray",
doc_out=(
"A tuple containing a return code and a string message "
"indicating status. The message is for information purpose "
"only."
),
)
def AssignResources(
self: CbfSubarray, params: str
) -> DevVarLongStringArrayType:
"""
Assign input dishIDs to this subarray.
Set subarray to ObsState.IDLE if no dishIDs were previously assigned,
i.e. subarray was previously in ObsState.EMPTY.
:param params: JSON string conforming to the ska-mid-cbf-assignresources
schema containing a list of DISH (receptor) IDs to add
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
return self.submit_long_running_command(
command_name=self.assign_resources_name,
task=self.component_manager.assign_vcc,
kwargs={"params": params},
started_callback=lambda: self.obs_state_action("assign_invoked"),
completed_callback=lambda: self.obs_state_action(
"assign_completed"
),
)
[docs]
def is_ReleaseResources_allowed(
self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
"""
Check if ReleaseResources is allowed.
:param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
submitted to the LRC queue, and this method will always return True;
otherwise, task has been popped off the LRC queue, and the command
allowance can be checked.
:return: True if allowed, else False.
"""
if request_type is LRCReqType.ENQUEUE_REQ:
return True
return self.check_obs_command_allowed(
self.release_resources_name, [ObsState.IDLE]
)
[docs]
@command(
dtype_in="DevString",
doc_in="JSON string conforming to the ska-mid-cbf-releaseresources schema containing a List of DISH (receptor) IDs", # noqa: E501
dtype_out="DevVarLongStringArray",
doc_out=(
"A tuple containing a return code and a string message "
"indicating status. The message is for information purpose "
"only."
),
)
def ReleaseResources(
self: CbfSubarray, params: str
) -> DevVarLongStringArrayType:
"""
Remove input from list of assigned dishIDs.
Set subarray to ObsState.EMPTY if no dishIDs assigned.
:param params: JSON string conforming to the ska-mid-cbf-assignresources schema
containing a List of DISH (receptor) IDs to release
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
return self.submit_long_running_command(
command_name=self.release_resources_name,
task=self.component_manager.release_vcc,
kwargs={"params": params},
started_callback=lambda: self.obs_state_action("release_invoked"),
completed_callback=lambda: self.obs_state_action(
"release_completed"
),
)
[docs]
def is_ReleaseAllResources_allowed(
self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
"""
Check if ReleaseAllResources is allowed.
:param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
submitted to the LRC queue, and this method will always return True;
otherwise, task has been popped off the LRC queue, and the command
allowance can be checked.
:return: True if allowed, else False.
"""
if request_type is LRCReqType.ENQUEUE_REQ:
return True
return self.check_obs_command_allowed(
self.release_all_resources_name, [ObsState.IDLE]
)
[docs]
@command(
dtype_in="DevString",
doc_in="JSON string conforming to the ska-mid-cbf-releaseallresources schema", # noqa: E501
dtype_out="DevVarLongStringArray",
doc_out=(
"A tuple containing a return code and a string message "
"indicating status. The message is for information purpose "
"only."
),
)
def ReleaseAllResources(
self: CbfSubarray, params: str
) -> DevVarLongStringArrayType:
"""
Remove all assigned dishIDs.
Set subarray to ObsState.EMPTY if no dishIDs assigned.
:param params: JSON string conforming to the ska-mid-cbf-releasedalresources schema
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
return self.submit_long_running_command(
command_name=self.release_all_resources_name,
task=self.component_manager.release_all_vcc,
kwargs={"params": params},
started_callback=lambda: self.obs_state_action("release_invoked"),
completed_callback=lambda: self.obs_state_action(
"release_completed"
),
)
[docs]
def is_Restart_allowed(
self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
"""
Check if Restart is allowed.
:param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
submitted to the LRC queue, and this method will always return True;
otherwise, task has been popped off the LRC queue, and the command
allowance can be checked.
:return: True if allowed, else False.
"""
if request_type is LRCReqType.ENQUEUE_REQ:
return True
# SKB-796: continue with command execution even when CommunicationStatus
# is not ESTABLISHED, so that the full command can be executed (even if only
# partially successful) in order to hit all obs state model triggers
self.logger.debug(f"Checking if {self.restart_name} is allowed.")
if not self.component_manager.is_communicating:
self.logger.warning("Continuing with Restart command.")
if self.obs_state_signal not in [ObsState.ABORTED, ObsState.FAULT]:
self.logger.warning(
f"{self.restart_name} not allowed in ObsState {self.obs_state_signal}"
)
return False
return True
[docs]
@command(
dtype_in="DevString",
doc_in="JSON string conforming to the ska-mid-cbf-restart schema",
dtype_out="DevVarLongStringArray",
)
def Restart(self: CbfSubarray, params: str) -> DevVarLongStringArrayType:
"""
Restart the observing device from a FAULT/ABORTED obsState to EMPTY.
:param params: "JSON string conforming to the ska-mid-cbf-restart schema
:return: A tuple containing a return code and a string message
indicating status. The message is for information purpose
only.
"""
return self.submit_long_running_command(
command_name=self.restart_name,
task=self.component_manager.restart,
kwargs={"params": params},
started_callback=lambda: self.obs_state_action("restart_invoked"),
completed_callback=lambda: self.obs_state_action(
"restart_completed"
),
)
# --- Device Initialization --- #
def _init_state_model(self: CbfSubarray) -> None:
"""Set up the state model for the device."""
super(CbfObsDevice, self)._init_state_model()
# CbfSubarray uses the full observing state model
self.obs_state_model = ObsStateModel(
logger=self.logger,
callback=lambda value: setattr(self, "obs_state_signal", value),
)
[docs]
def create_component_manager(
self: CbfSubarray,
) -> CbfSubarrayComponentManager:
"""
Create and return a subarray component manager.
:return: a subarray component manager
"""
self.logger.debug("Entering CbfSubarray.create_component_manager()")
return CbfSubarrayComponentManager(
device=self,
subarray_id=int(self.DeviceID),
controller=self.ControllerFQDN,
vcc_all_bands=self.VccAllBandsFQDNs,
fsp=self.FspFQDNs,
fsp_corr_sub=self.FspCorrSubarrayFQDNs,
logger=self.logger,
health_state_callback=self._update_health_state,
communication_state_callback=self._communication_state_changed,
component_state_callback=self._component_state_changed,
admin_mode_callback=self._admin_mode_perform_action,
)
# --- Run Device Server --- #
def main(*args: str, **kwargs: str) -> int:
"""
Entry point for module.
:param args: positional arguments
:param kwargs: keyword arguments
:return: exit code
"""
return cast(int, CbfSubarray.run_server(args=args or None, **kwargs))
if __name__ == "__main__":
main()