# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
"""
CbfObsDevice
Generic observing device for Mid.CBF
"""
from __future__ import annotations
from typing import Optional
from ska_control_model import ObsState, ObsStateModel, PowerState
from ska_tango_base.long_running_commands import LRCReqType
from ska_tango_base.software_bus import AttrSignal, attribute_from_signal
from ska_tango_base.type_hints import DevVarLongStringArrayType
from tango import AttrWriteType
from tango.server import command
from ska_mid_cbf_mcs.base.base_device import CbfDevice
from ska_mid_cbf_mcs.base.obs.obs_state_machine import (
CbfSubElementObsStateMachine,
)
[docs]
class CbfObsDevice(CbfDevice):
"""
A generic base observing device for Mid.CBF.
Extends CbfDevice to implement key values for observing devices.
"""
# This part of the class declares the observing-device Tango attributes (each
# one backed by an AttrSignal) and the command-name constants used by the
# command methods.
# ----------------------------------------------------------------------- #
# Attributes #
# ----------------------------------------------------------------------- #
# --- Device Attributes & Signals --- #
# Each signal below is declared with stored=True and an initial value, and is
# wrapped further down by attribute_from_signal to expose a Tango attribute.
# NOTE(review): all of those attributes use AttrWriteType.READ, so the
# "whenever a client changes the attribute" wording in the signal docstrings
# looks inaccurate; presumably values are emitted when the device itself
# assigns the signal. Confirm against ska_tango_base.software_bus.
obs_state_signal: AttrSignal[ObsState] = AttrSignal(
stored=True, initial_value=ObsState.IDLE
)
"""
Signal for the obsState attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
scan_id_signal: AttrSignal[int] = AttrSignal(stored=True, initial_value=0)
"""
Signal for the scanID attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
configuration_id_signal: AttrSignal[str] = AttrSignal(
stored=True, initial_value=""
)
"""
Signal for the configurationID attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
delay_model_signal: AttrSignal[str] = AttrSignal(
stored=True, initial_value=""
)
"""
Signal for the delayModel attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
last_scan_configuration_signal: AttrSignal[str] = AttrSignal(
stored=True, initial_value=""
)
"""
Signal for the lastScanConfiguration attribute.
Values are emitted for this signal whenever a client changes the attribute.
"""
# Read-only enum attribute; its labels are the ObsState member names, taken in
# the order the enum iterates them.
obsState: attribute_from_signal = attribute_from_signal(
obs_state_signal,
access=AttrWriteType.READ,
dtype="DevEnum",
enum_labels=[state.name for state in ObsState],
description=(
"The observing state of the device. "
"Subarray overrides CbfObsDevice to implement ObsState.EMPTY."
),
)
"""obsState device attribute"""
scanID: attribute_from_signal = attribute_from_signal(
scan_id_signal,
access=AttrWriteType.READ,
dtype=int,
description="The current scan ID of the device.",
# NOTE(review): rel_change on an integer attribute; presumably the relative
# change-event threshold. Confirm this is intended for scan IDs.
rel_change=0.1,
)
"""scanID device attribute"""
configurationID: attribute_from_signal = attribute_from_signal(
configuration_id_signal,
access=AttrWriteType.READ,
dtype=str,
description="The current configuration ID of the device.",
)
"""configurationID device attribute"""
delayModel: attribute_from_signal = attribute_from_signal(
delay_model_signal,
access=AttrWriteType.READ,
dtype=str,
description="The current delay model used by the device.",
)
"""delayModel device attribute"""
lastScanConfiguration: attribute_from_signal = attribute_from_signal(
last_scan_configuration_signal,
access=AttrWriteType.READ,
dtype=str,
description=(
"The last valid scan configuration of the device. "
"Deliberately left in Tango layer, outside of component manager."
),
)
"""lastScanConfiguration device attribute"""
# --- Device Commands --- #
# Command names: passed as command_name to submit_long_running_command and
# used in the log messages of the command allowance checks.
# NOTE(review): no ConfigureScan command is defined in this section of the
# class; configure_scan_name is presumably consumed by subclasses. Confirm.
configure_scan_name = "ConfigureScan"
scan_name = "Scan"
end_scan_name = "EndScan"
go_to_idle_name = "GoToIdle"
obs_reset_name = "ObsReset"
abort_name = "Abort"
# ----------------------------------------------------------------------- #
# Methods #
# ----------------------------------------------------------------------- #
# --- Device Commands --- #
[docs]
def check_obs_command_allowed(
    self: CbfObsDevice,
    command_name: str,
    allowed_obs_states: list[ObsState],
) -> bool:
    """
    Check if an observing command is allowed.

    The command is rejected when the component manager is not communicating,
    or when the current observing state is not one of ``allowed_obs_states``.
    Every rejection is logged as a warning together with its reason.

    :param command_name: name of command to check
    :param allowed_obs_states: list of allowed observing states
    :return: True if allowed, else False.
    """
    self.logger.info(f"Checking if {command_name} is allowed.")

    if not self.component_manager.is_communicating:
        # Log why the command is refused instead of rejecting it silently
        self.logger.warning(
            f"{command_name} not allowed; component manager is not "
            "communicating."
        )
        return False

    current_obs_state = self.obs_state_signal
    if current_obs_state not in allowed_obs_states:
        # Log the member name: an integer-based enum member placed directly
        # in an f-string is rendered as its numeric value
        self.logger.warning(
            f"{command_name} not allowed in ObsState "
            f"{ObsState(current_obs_state).name}"
        )
        return False

    return True
[docs]
def is_Scan_allowed(
    self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
    """
    Report whether the Scan command may proceed.

    :param request_type: for LRCReqType.ENQUEUE_REQ the task has only been
        submitted to the LRC queue and the answer is always True; for any
        other request type the task has been popped off the LRC queue and
        the command allowance is checked.
    :return: True if allowed, else False.
    """
    # Only a request that is past the enqueue stage is checked against the
    # observing state; Scan requires READY.
    if request_type is not LRCReqType.ENQUEUE_REQ:
        return self.check_obs_command_allowed(
            self.scan_name, [ObsState.READY]
        )
    return True
[docs]
@command(
    dtype_in="DevString",
    doc_in="JSON string conforming to the ska-csp-Scan schema",
    dtype_out="DevVarLongStringArray",
    doc_out=(
        "A tuple containing a return code and a string message "
        "indicating status. The message is for information purpose "
        "only."
    ),
)
def Scan(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType:
    """
    Start an observing scan.

    :param params: JSON string conforming to the ska-csp-Scan schema
    :return: tuple containing a return code and a unique command identifier
    """

    def _scan_started() -> None:
        # Passed as started_callback: performs the "scan_invoked" action
        # on the obs state model
        self.obs_state_action("scan_invoked")

    task_kwargs = {"params": params}
    return self.submit_long_running_command(
        command_name=self.scan_name,
        task=self.component_manager.scan,
        kwargs=task_kwargs,
        started_callback=_scan_started,
    )
[docs]
def is_EndScan_allowed(
    self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
    """
    Report whether the EndScan command may proceed.

    :param request_type: for LRCReqType.ENQUEUE_REQ the task has only been
        submitted to the LRC queue and the answer is always True; for any
        other request type the task has been popped off the LRC queue and
        the command allowance is checked.
    :return: True if allowed, else False.
    """
    # Short-circuit: the observing state (must be SCANNING) is consulted only
    # when the request is not an enqueue request.
    return (
        request_type is LRCReqType.ENQUEUE_REQ
        or self.check_obs_command_allowed(
            self.end_scan_name, [ObsState.SCANNING]
        )
    )
[docs]
@command(
    dtype_in="DevString",
    doc_in="JSON string conforming to the ska-csp-endscan schema",
    dtype_out="DevVarLongStringArray",
    doc_out=(
        "A tuple containing a return code and a string message "
        "indicating status. The message is for information purpose "
        "only."
    ),
)
def EndScan(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType:
    """
    End a running scan.

    :param params: JSON string conforming to the ska-csp-endscan schema
    :return: tuple containing a return code and a unique command identifier
    """
    # The component manager task receives the raw JSON string as "params"
    end_scan_task = self.component_manager.end_scan
    end_scan_kwargs = dict(params=params)

    return self.submit_long_running_command(
        command_name=self.end_scan_name,
        task=end_scan_task,
        kwargs=end_scan_kwargs,
        # Performs the "end_scan_invoked" action on the obs state model
        started_callback=lambda: self.obs_state_action("end_scan_invoked"),
    )
[docs]
def is_GoToIdle_allowed(
    self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
    """
    Report whether the GoToIdle command may proceed.

    :param request_type: for LRCReqType.ENQUEUE_REQ the task has only been
        submitted to the LRC queue and the answer is always True; for any
        other request type the task has been popped off the LRC queue and
        the command allowance is checked.
    :return: True if allowed, else False.
    """
    enqueue_only = request_type is LRCReqType.ENQUEUE_REQ
    if enqueue_only:
        return True

    # GoToIdle is only accepted from the READY observing state
    permitted_states = [ObsState.READY]
    return self.check_obs_command_allowed(
        self.go_to_idle_name, permitted_states
    )
[docs]
@command(
    dtype_in="DevString",
    doc_in="JSON string conforming to the ska-mid-cbf-gotoidle schema",
    dtype_out="DevVarLongStringArray",
    doc_out=(
        "A tuple containing a return code and a string message "
        "indicating status. The message is for information purpose "
        "only."
    ),
)
def GoToIdle(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType:
    """
    Transit the device from READY to IDLE obsState.

    To keep in line with LMC, using "GoToIdle" rather than the SKA base class
    equivalent "End".

    :param params: JSON string conforming to the ska-mid-cbf-gotoidle schema
    :return: tuple containing a return code and a unique command identifier
    """
    lrc_arguments = {
        "command_name": self.go_to_idle_name,
        "task": self.component_manager.go_to_idle,
        "kwargs": {"params": params},
        # The obs state model action keeps the base class name "end_invoked"
        "started_callback": lambda: self.obs_state_action("end_invoked"),
    }
    return self.submit_long_running_command(**lrc_arguments)
[docs]
def is_ObsReset_allowed(
    self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
    """
    Check if ObsReset is allowed.

    ObsReset is only accepted from the ABORTED or FAULT observing state.
    Unlike the generic observing command check, a component manager that is
    not communicating does not reject the command (see SKB-796 below); it is
    only logged.

    :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
        submitted to the LRC queue, and this method will always return True;
        otherwise, task has been popped off the LRC queue, and the command
        allowance can be checked.
    :return: True if allowed, else False.
    """
    if request_type is LRCReqType.ENQUEUE_REQ:
        return True

    self.logger.debug(f"Checking if {self.obs_reset_name} is allowed.")

    # SKB-796: continue with command execution even when CommunicationStatus
    # is not ESTABLISHED, so that the full command can be executed (even if only
    # partially successful) in order to hit all obs state model triggers
    if not self.component_manager.is_communicating:
        # State the reason so the warning is meaningful on its own
        self.logger.warning(
            "Component manager is not communicating; "
            f"continuing with {self.obs_reset_name} command."
        )

    if self.obs_state_signal not in [ObsState.ABORTED, ObsState.FAULT]:
        # Log the member name: an integer-based enum member placed directly
        # in an f-string is rendered as its numeric value
        self.logger.warning(
            f"{self.obs_reset_name} not allowed in ObsState "
            f"{ObsState(self.obs_state_signal).name}"
        )
        return False

    return True
[docs]
@command(
    dtype_in="DevString",
    doc_in="JSON string conforming to the ska-mid-cbf-obsreset schema",
    dtype_out="DevVarLongStringArray",
    doc_out=(
        "A tuple containing a return code and a string message "
        "indicating status. The message is for information purpose "
        "only."
    ),
)
def ObsReset(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType:
    """
    Reset the observing device from a FAULT/ABORTED obsState to IDLE.

    :param params: JSON string conforming to the ska-mid-cbf-obsreset schema
    :return: tuple containing a return code and a unique command identifier
    """

    def _reset_started() -> None:
        # Passed as started_callback: performs "obsreset_invoked"
        self.obs_state_action("obsreset_invoked")

    def _reset_completed() -> None:
        # Passed as completed_callback: performs "obsreset_completed"
        self.obs_state_action("obsreset_completed")

    return self.submit_long_running_command(
        command_name=self.obs_reset_name,
        task=self.component_manager.obs_reset,
        kwargs={"params": params},
        started_callback=_reset_started,
        completed_callback=_reset_completed,
    )
[docs]
def is_Abort_allowed(
    self: CbfObsDevice, request_type: LRCReqType = LRCReqType.ENQUEUE_REQ
) -> bool:
    """
    Check if Abort is allowed.

    Abort is rejected in the EMPTY, FAULT, ABORTED, ABORTING and RESTARTING
    observing states. Unlike the generic observing command check, a component
    manager that is not communicating does not reject the command (see
    SKB-796 below); it is only logged.

    :param request_type: if LRCReqType.ENQUEUE_REQ, the task has only been
        submitted to the LRC queue, and this method will always return True;
        otherwise, task has been popped off the LRC queue, and the command
        allowance can be checked.
    :return: True if allowed, else False.
    """
    if request_type is LRCReqType.ENQUEUE_REQ:
        return True

    self.logger.info(f"Checking if {self.abort_name} is allowed.")

    # SKB-796: continue with command execution even when CommunicationStatus
    # is not ESTABLISHED, so that the full command can be executed (even if only
    # partially successful) in order to hit all obs state model triggers
    if not self.component_manager.is_communicating:
        # State the reason so the warning is meaningful on its own
        self.logger.warning(
            "Component manager is not communicating; "
            f"continuing with {self.abort_name} command."
        )

    if self.obs_state_signal in [
        ObsState.EMPTY,
        ObsState.FAULT,
        ObsState.ABORTED,
        ObsState.ABORTING,
        ObsState.RESTARTING,
    ]:
        # Log the member name: an integer-based enum member placed directly
        # in an f-string is rendered as its numeric value
        self.logger.warning(
            f"{self.abort_name} not allowed in ObsState "
            f"{ObsState(self.obs_state_signal).name}"
        )
        return False

    return True
[docs]
@command(
    dtype_in="DevString",
    doc_in="JSON string conforming to the ska-mid-cbf-abort schema",
    dtype_out="DevVarLongStringArray",
    doc_out=(
        "A tuple containing a return code and a string message "
        "indicating status. The message is for information purpose "
        "only."
    ),
)
def Abort(self: CbfObsDevice, params: str) -> DevVarLongStringArrayType:
    """
    Abort the current observing process and move to ABORTED obsState.

    :param params: JSON string conforming to the ska-mid-cbf-abort schema
    :return: tuple containing a return code and a unique command identifier
    """
    abort_task = self.component_manager.abort
    abort_kwargs = {"params": params}

    # Two obs state model actions bracket the task: "abort_invoked" from the
    # started callback and "abort_completed" from the completed callback.
    result = self.submit_long_running_command(
        command_name=self.abort_name,
        task=abort_task,
        kwargs=abort_kwargs,
        started_callback=lambda: self.obs_state_action("abort_invoked"),
        completed_callback=lambda: self.obs_state_action("abort_completed"),
    )
    return result
# --- Callbacks --- #
[docs]
def obs_state_action(self: CbfObsDevice, action: str) -> None:
    """
    Callback provided to command tracker to drive the obs state model into
    transitioning states during the relevant command's submitted thread.

    Performs ``action`` on the obs state model, then logs the resulting
    observing state by name.

    :param action: the observing state model action to perform
    """
    self.obs_state_model.perform_action(action)
    # Log the member name: an integer-based enum member placed directly in an
    # f-string is rendered as its numeric value
    self.logger.info(
        f"ObsState after {action}: {ObsState(self.obs_state_signal).name}"
    )
def _component_state_changed(
    self: CbfObsDevice,
    fault: Optional[bool] = None,
    power: Optional[PowerState] = None,
    resourced: Optional[bool] = None,
    configured: Optional[bool] = None,
    scanning: Optional[bool] = None,
    obsfault: Optional[bool] = None,
) -> None:
    """
    Translate component state changes into obs state model actions.

    ``fault`` and ``power`` are forwarded to the parent implementation. Each
    remaining argument that is not None triggers the matching obs state
    action, in the order resourced, configured, scanning, obsfault.

    :param fault: forwarded to the parent implementation
    :param power: forwarded to the parent implementation
    :param resourced: truthy performs "component_resourced", falsy performs
        "component_unresourced"
    :param configured: truthy performs "component_configured", falsy performs
        "component_unconfigured"
    :param scanning: truthy performs "component_scanning", falsy performs
        "component_not_scanning"
    :param obsfault: truthy performs "component_obsfault"; falsy performs
        no action
    """
    super()._component_state_changed(fault=fault, power=power)

    # (flag, action when truthy, action when falsy); None means "no action".
    # NOTE: to recover from obsfault, ObsReset or Restart must be invoked,
    # hence there is no action for obsfault being cleared.
    flag_actions = (
        (resourced, "component_resourced", "component_unresourced"),
        (configured, "component_configured", "component_unconfigured"),
        (scanning, "component_scanning", "component_not_scanning"),
        (obsfault, "component_obsfault", None),
    )
    for flag, action_if_set, action_if_cleared in flag_actions:
        if flag is None:
            # Argument not supplied: this aspect of the state did not change
            continue
        selected_action = action_if_set if flag else action_if_cleared
        if selected_action is not None:
            self.obs_state_action(selected_action)
# --- Device Initialization --- #
def _init_state_model(self: CbfObsDevice) -> None:
    """
    Set up the state model for the device.

    Runs the parent set-up, then creates ``self.obs_state_model`` whose
    callback writes each reported value to ``obs_state_signal``.
    """
    super()._init_state_model()

    def _store_obs_state(value) -> None:
        # Copy the value reported by the obs state model into the signal
        # behind the obsState attribute
        self.obs_state_signal = value

    # CbfObsDevice uses the reduced observing state machine imported from
    # ska_mid_cbf_mcs.base.obs.obs_state_machine
    self.obs_state_model = ObsStateModel(
        logger=self.logger,
        callback=_store_obs_state,
        state_machine_factory=CbfSubElementObsStateMachine,
    )
# --- Run Device Server --- #
def main(*args: str, **kwargs: str) -> int:
    """
    Entry point for module.

    This function never returns an exit code: it unconditionally raises,
    because this device class is not meant to be run as a device server.

    :param args: positional arguments (ignored)
    :param kwargs: keyword arguments (ignored)
    :return: exit code (never actually returned)
    :raises TypeError: always
    """
    abstract_class_message = (
        "CbfObsDevice is an Abstract Class; device server cannot be run."
    )
    raise TypeError(abstract_class_message)
# Script entry point: main() always raises TypeError, so running this module
# directly fails with that error rather than starting a device server.
if __name__ == "__main__":
main()