# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from typing import cast
from ska_control_model import ObsMode
from ska_tango_base.base.base_device import DevVarLongStringArrayType
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from tango import AttrWriteType
from tango.server import command, device_property
from ska_mid_cbf_mcs.base.base_device import CbfDevice
from ska_mid_cbf_mcs.commons.global_enum import const
from ska_mid_cbf_mcs.fsp.fsp_component_manager import FspComponentManager
class Fsp(CbfDevice):
    """
    Fsp TANGO device class for the prototype.

    Declares the FQDN device properties for this FSP, exposes the read-only
    ``obsMode``, ``subarrayMembership`` and ``cornerTurnerStarted`` attributes
    (each backed by an ``AttrSignal``), and provides the ``UpdateObsMode``
    long-running command, whose work is delegated to the component manager
    returned by :meth:`create_component_manager`.
    """

    # ----------------------------------------------------------------------- #
    #                               Attributes                                #
    # ----------------------------------------------------------------------- #

    # --- Device Properties --- #

    FspCorrSubarrayFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the MCS FspCorrSubarray devices for this FSP.",
        dtype=[str],
        mandatory=True,
    )

    FspPstSubarrayFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the MCS FspPstSubarray devices for this FSP.",
        dtype=[str],
        mandatory=True,
    )

    FhsFspModeControllerFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the FHS FSPModeManagementController devices for this FSP.",
        dtype=[str],
        mandatory=True,
    )

    # NOTE(review): unlike the three properties above, this one is not declared
    # with mandatory=True -- confirm whether that is intentional.
    FhsFspCorrControllerFQDNs = device_property(
        doc="Fully Qualified Domain Names (FQDNs) for all of the FHS FspCorrController devices for this FSP.",
        dtype=[str],
    )

    # --- Device Attributes & Signals --- #

    obs_mode_signal: AttrSignal[ObsMode] = AttrSignal(
        stored=True, initial_value=ObsMode.IDLE
    )
    """
    Signal backing the obsMode attribute.

    Declared with ``stored=True`` and an initial value of ``ObsMode.IDLE``.
    The attribute is exposed read-only, so clients cannot write it.
    """

    subarray_membership_signal: AttrSignal[list[int]] = AttrSignal(
        stored=True, initial_value=[]
    )
    """
    Signal backing the subarrayMembership attribute.

    Declared with ``stored=True`` and an empty list as initial value.
    The attribute is exposed read-only, so clients cannot write it.
    """

    corner_turner_started_signal: AttrSignal[bool] = AttrSignal(
        stored=True, initial_value=False
    )
    """
    Signal backing the cornerTurnerStarted attribute.

    Declared with ``stored=True`` and an initial value of ``False``.
    The attribute is exposed read-only, so clients cannot write it.
    """

    obsMode: attribute_from_signal = attribute_from_signal(
        obs_mode_signal,
        access=AttrWriteType.READ,
        dtype="DevEnum",
        enum_labels=[mode.name for mode in ObsMode],
        description="The observing mode of the device.",
    )
    """obsMode device attribute"""

    subarrayMembership: attribute_from_signal = attribute_from_signal(
        subarray_membership_signal,
        access=AttrWriteType.READ,
        dtype=[int],
        description="Sorted list of member subarrays.",
        max_dim_x=const.MAX_SUBARRAY,
        rel_change=0.1,
    )
    """subarrayMembership device attribute"""

    cornerTurnerStarted: attribute_from_signal = attribute_from_signal(
        corner_turner_started_signal,
        access=AttrWriteType.READ,
        dtype=bool,
        description="Boolean has Corner Turner Started.",
    )
    """cornerTurnerStarted device attribute"""

    # ----------------------------------------------------------------------- #
    #                                 Methods                                 #
    # ----------------------------------------------------------------------- #

    # --- Device Commands --- #

    def is_UpdateObsMode_allowed(
        self: Fsp, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
    ) -> bool:
        """
        Check if UpdateObsMode is allowed.

        :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
            submitted to the LRC queue, and this method will always return True;
            otherwise, task has been popped off the LRC queue, and the command
            allowance can be checked.
        :return: True if allowed, else False.
        """
        # Enqueueing is always permitted; the real check happens at dequeue.
        if request_type is LRCReqType.ENQUEUE_REQ:
            return True

        self.logger.debug("Checking if UpdateObsMode is allowed")
        # Allowed only while the component manager reports it is communicating.
        return bool(self.component_manager.is_communicating)

    @command(
        dtype_in="str",
        dtype_out="DevVarLongStringArray",
        doc_in="JSON-formatted string containing the subarray ID and the requested ObsMode",
    )
    def UpdateObsMode(
        self: Fsp, obs_mode_config: str
    ) -> DevVarLongStringArrayType:
        """
        Used by subarray devices to update the observing mode and
        subarrayMembership of this FSP.

        If the requested ObsMode is not IDLE, observing mode will only be set
        if this FSP is currently in ObsMode.IDLE or already in the requested
        ObsMode. Subarray ID will only be added to subarrayMembership if the
        requesting subarray has not already been added.

        If IDLE is requested, the observing mode will only be set if this FSP
        does not belong to any other subarrays. Whether or not the observing
        mode is set to IDLE, the subarray ID will be removed from
        subarrayMembership.

        This method only submits the request as a long-running command; the
        work itself is performed by ``component_manager.update_obs_mode``.

        :param obs_mode_config: a JSON-formatted string containing a subarray
            ID to be added to membership and the ObsMode of the requested
            configuration.
        :return: tuple containing a return code and a unique command identifier
        """
        return self.submit_long_running_command(
            command_name="UpdateObsMode",
            task=self.component_manager.update_obs_mode,
            kwargs={"obs_mode_config": obs_mode_config},
        )

    # --- Device Initialization --- #

    def create_component_manager(self: Fsp) -> FspComponentManager:
        """
        Create and return a component manager for this device.

        The manager receives this device, its ``DeviceID``, the four FQDN
        device properties, the device logger and the device's state callbacks.

        :return: a component manager for this device.
        """
        self.logger.debug("Entering create_component_manager()")

        return FspComponentManager(
            device=self,
            fsp_id=self.DeviceID,
            all_fsp_corr_subarray_fqdn=self.FspCorrSubarrayFQDNs,
            all_fsp_pst_subarray_fqdn=self.FspPstSubarrayFQDNs,
            fhs_fsp_mode_controller_fqdns=self.FhsFspModeControllerFQDNs,
            fhs_fsp_corr_controller_fqdns=self.FhsFspCorrControllerFQDNs,
            logger=self.logger,
            health_state_callback=self._update_health_state,
            communication_state_callback=self._communication_state_changed,
            component_state_callback=self._component_state_changed,
            admin_mode_callback=self._admin_mode_perform_action,
        )
# --- Run Device Server --- #
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module: run the Fsp class as a device server.

    :param args: positional arguments forwarded to ``Fsp.run_server``
    :param kwargs: keyword arguments forwarded to ``Fsp.run_server``
    :return: exit code
    """
    # An empty argument tuple is forwarded as None rather than ()
    # (presumably so run_server applies its own default -- confirm against
    # the PyTango documentation).
    server_args = args if args else None
    outcome = Fsp.run_server(args=server_args, **kwargs)
    return cast(int, outcome)
# Run the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()