Source code for ska_mid_cbf_mcs.base.obs.obs_device

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

"""
CbfObsDevice

Generic observing device for Mid.CBF
"""

from __future__ import annotations

from typing import Optional

from ska_control_model import ObsState, ObsStateModel, PowerState
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from ska_tango_base.type_hints import DevVarLongStringArrayType
from tango import AttrWriteType
from tango.server import command

from ska_mid_cbf_mcs.base.base_device import CbfDevice
from ska_mid_cbf_mcs.base.obs.obs_state_machine import (
    CbfSubElementObsStateMachine,
)


[docs] class CbfObsDevice(CbfDevice): """ A generic base observing device for Mid.CBF. Extends CbfDevice to implement key values for observing devices. """ # ----------------------------------------------------------------------- # # Attributes # # ----------------------------------------------------------------------- # # --- Device Attributes & Signals --- # obs_state_signal: AttrSignal[ObsState] = AttrSignal( stored=True, initial_value=ObsState.IDLE ) """ Signal for the obsState attribute. Values are emitted for this signal whenever a client changes the attribute. """ scan_id_signal: AttrSignal[int] = AttrSignal(stored=True, initial_value=0) """ Signal for the scanID attribute. Values are emitted for this signal whenever a client changes the attribute. """ configuration_id_signal: AttrSignal[str] = AttrSignal( stored=True, initial_value="" ) """ Signal for the configurationID attribute. Values are emitted for this signal whenever a client changes the attribute. """ delay_model_signal: AttrSignal[str] = AttrSignal( stored=True, initial_value="" ) """ Signal for the delayModel attribute. Values are emitted for this signal whenever a client changes the attribute. """ last_scan_configuration_signal: AttrSignal[str] = AttrSignal( stored=True, initial_value="" ) """ Signal for the lastScanConfiguration attribute. Values are emitted for this signal whenever a client changes the attribute. """ obsState: attribute_from_signal = attribute_from_signal( obs_state_signal, access=AttrWriteType.READ, dtype="DevEnum", enum_labels=[state.name for state in ObsState], description=( "The observing state of the device. " "Subarray overrides CbfObsDevice to implement ObsState.EMPTY." ), ) """obsState device attribute""" scanID: attribute_from_signal = attribute_from_signal( scan_id_signal, access=AttrWriteType.READ, dtype=int, description="The current scan ID of the device.", rel_change=0.1, ) """scanID device attribute""" configurationID: attribute_from_signal = attribute_from_signal( configuration_id_signal, access=AttrWriteType.READ, dtype=str, description="The current configuration ID of the device.", ) """configurationID device attribute""" delayModel: attribute_from_signal = attribute_from_signal( delay_model_signal, access=AttrWriteType.READ, dtype=str, description="The current delay model used by the device.", ) """delayModel device attribute""" lastScanConfiguration: attribute_from_signal = attribute_from_signal( last_scan_configuration_signal, access=AttrWriteType.READ, dtype=str, description=( "The last valid scan configuration of the device. " "Deliberately left in Tango layer, outside of component manager." ), ) """lastScanConfiguration device attribute""" # --- Device Commands --- # configure_scan_name = "ConfigureScan" scan_name = "Scan" end_scan_name = "EndScan" go_to_idle_name = "GoToIdle" obs_reset_name = "ObsReset" abort_name = "Abort" # ----------------------------------------------------------------------- # # Methods # # ----------------------------------------------------------------------- # # --- Device Commands --- #
[docs] def check_obs_command_allowed( self: CbfObsDevice, command_name: str, allowed_obs_states: list[ObsState], ) -> bool: """ Check if an observing command is allowed. :param command_name: name of command to check :param allowed_obs_states: list of allowed observing states :return: True if allowed, else False. """ self.logger.info(f"Checking if {command_name} is allowed.") if not self.component_manager.is_communicating: return False if self.obs_state_signal not in allowed_obs_states: self.logger.warning( f"{command_name} not allowed in {ObsState(self.obs_state_signal)}" ) return False return True
[docs] def is_ConfigureScan_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if ConfigureScan is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True return self.check_obs_command_allowed( self.configure_scan_name, [ObsState.IDLE, ObsState.READY] )
[docs] @command( dtype_in="DevString", doc_in="JSON formatted string with the scan configuration.", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def ConfigureScan( self: CbfObsDevice, params: str ) -> DevVarLongStringArrayType: """ Configure the observing device parameters for the current scan. :return: tuple containing a return code and a unique command identifier """ # store configuration in Tango layer self.last_scan_configuration_signal = params return self.submit_long_running_command( command_name=self.configure_scan_name, task=self.component_manager.configure_scan, kwargs={"params": params}, started_callback=lambda: self.obs_state_action( "configure_invoked" ), completed_callback=lambda: self.obs_state_action( "configure_completed" ), )
[docs] def is_Scan_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if Scan is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True return self.check_obs_command_allowed(self.scan_name, [ObsState.READY])
[docs] @command( dtype_in="DevString", doc_in="JSON string conforming to the ska-csp-Scan schema", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def Scan(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType: """ Start an observing scan. :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name=self.scan_name, task=self.component_manager.scan, kwargs={"params": params}, started_callback=lambda: self.obs_state_action("scan_invoked"), )
[docs] def is_EndScan_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if EndScan is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True return self.check_obs_command_allowed( self.end_scan_name, [ObsState.SCANNING] )
[docs] @command( dtype_in="DevString", doc_in="JSON string conforming to the ska-csp-endscan schema", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def EndScan(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType: """ End a running scan. :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name=self.end_scan_name, task=self.component_manager.end_scan, kwargs={"params": params}, started_callback=lambda: self.obs_state_action("end_scan_invoked"), )
[docs] def is_GoToIdle_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if GoToIdle is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True return self.check_obs_command_allowed( self.go_to_idle_name, [ObsState.READY] )
[docs] @command( dtype_in="DevString", doc_in="JSON string conforming to the ska-mid-cbf-gotoidle schema", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def GoToIdle(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType: """ Transit the device from READY to IDLE obsState. To keep in line with LMC, using "GoToIdle" rather than the SKA base class equivalent "End". :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name=self.go_to_idle_name, task=self.component_manager.go_to_idle, kwargs={"params": params}, started_callback=lambda: self.obs_state_action("end_invoked"), )
[docs] def is_ObsReset_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if ObsReset is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True # SKB-796: continue with command execution even when CommunicationStatus # is not ESTABLISHED, so that the full command can be executed (even if only # partially successful) in order to hit all obs state model triggers self.logger.debug(f"Checking if {self.obs_reset_name} is allowed.") if not self.component_manager.is_communicating: self.logger.warning( f"Continuing with {self.obs_reset_name} command." ) if self.obs_state_signal not in [ObsState.ABORTED, ObsState.FAULT]: self.logger.warning( f"{self.obs_reset_name} not allowed in ObsState {self.obs_state_signal}" ) return False return True
[docs] @command( dtype_in="DevString", doc_in="JSON string conforming to the ska-mid-cbf-obsreset schema", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def ObsReset(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType: """ Reset the observing device from a FAULT/ABORTED obsState to IDLE. :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name=self.obs_reset_name, task=self.component_manager.obs_reset, kwargs={"params": params}, started_callback=lambda: self.obs_state_action("obsreset_invoked"), completed_callback=lambda: self.obs_state_action( "obsreset_completed" ), )
[docs] def is_Abort_allowed( self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ ) -> bool: """ Check if Abort is allowed. :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method will always return True; otherwise, task has been popped off the LRC queue, and the command allowance can be checked. :return: True if allowed, else False. """ if request_type is LRCReqType.ENQUEUE_REQ: return True self.logger.info(f"Checking if {self.abort_name} is allowed.") # SKB-796: continue with command execution even when CommunicationStatus # is not ESTABLISHED, so that the full command can be executed (even if only # partially successful) in order to hit all obs state model triggers if not self.component_manager.is_communicating: self.logger.warning(f"Continuing with {self.abort_name} command.") if self.obs_state_signal in [ ObsState.EMPTY, ObsState.FAULT, ObsState.ABORTED, ObsState.ABORTING, ObsState.RESTARTING, ]: self.logger.warning( f"{self.abort_name} not allowed in ObsState {self.obs_state_signal}" ) return False return True
[docs] @command( dtype_in="DevString", doc_in="JSON string conforming to the ska-mid-cbf-abort schema", dtype_out="DevVarLongStringArray", doc_out=( "A tuple containing a return code and a string message " "indicating status. The message is for information purpose " "only." ), ) def Abort(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType: """ Abort the current observing process and move to ABORTED obsState. :return: tuple containing a return code and a unique command identifier """ return self.submit_long_running_command( command_name=self.abort_name, task=self.component_manager.abort, kwargs={"params": params}, started_callback=lambda: self.obs_state_action("abort_invoked"), completed_callback=lambda: self.obs_state_action( "abort_completed" ), )
# --- Callbacks --- #
[docs] def obs_state_action(self: CbfObsDevice, action: str) -> None: """ Callback provided to command tracker to drive the obs state model into transitioning states during the relevant command's submitted thread. :param action: the observing state model action to perform """ self.obs_state_model.perform_action(action) self.logger.info( f"ObsState after {action}: {ObsState(self.obs_state_signal)}" )
def _component_state_changed( self: CbfObsDevice, fault: Optional[bool] = None, power: Optional[PowerState] = None, resourced: Optional[bool] = None, configured: Optional[bool] = None, scanning: Optional[bool] = None, obsfault: Optional[bool] = None, ) -> None: super()._component_state_changed(fault=fault, power=power) if resourced is not None: if resourced: self.obs_state_action("component_resourced") else: self.obs_state_action("component_unresourced") if configured is not None: if configured: self.obs_state_action("component_configured") else: self.obs_state_action("component_unconfigured") if scanning is not None: if scanning: self.obs_state_action("component_scanning") else: self.obs_state_action("component_not_scanning") if obsfault is not None: if obsfault: self.obs_state_action("component_obsfault") # NOTE: to recover from obsfault, ObsReset or Restart must be invoked # --- Device Initialization --- # def _init_state_model(self: CbfObsDevice) -> None: """Set up the state model for the device.""" super()._init_state_model() # CbfObsDevice uses the reduced observing state machine defined above self.obs_state_model = ObsStateModel( logger=self.logger, callback=lambda value: setattr(self, "obs_state_signal", value), state_machine_factory=CbfSubElementObsStateMachine, )
# --- Run Device Server --- # def main(*args: str, **kwargs: str) -> int: """ Entry point for module. :param args: positional arguments :param kwargs: keyword arguments :return: exit code """ raise TypeError( "CbfObsDevice is an Abstract Class; device server cannot be run." ) if __name__ == "__main__": main()