# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from functools import partial
from queue import Full, Queue
from threading import Event, Thread
from typing import Any, Callable, List, Optional
import orjson
import tango
from ska_control_model import AdminMode, ObsMode, PowerState, TaskStatus
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_tango_base.commands import ResultCode
from ska_tango_testing import context
from ska_mid_cbf_mcs.base.component_manager import (
CbfComponentManager,
CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const
__all__ = ["FspComponentManager"]
[docs]
class FspComponentManager(CbfComponentManager):
"""
A component manager for the Fsp device.
"""
# --------------
# Initialization
# --------------
def __init__(
self: FspComponentManager,
*args: Any,
fsp_id: int,
all_fsp_corr_subarray_fqdn: List[str],
all_fsp_pst_subarray_fqdn: List[str],
fhs_fsp_mode_controller_fqdns: List[str],
fhs_fsp_corr_controller_fqdns: List[str],
**kwargs: Any,
) -> None:
"""
Initialise a new instance.
:param logger: a logger for this object to use
:param fsp_unit_fpga_idx: the Index of the FPGA in the FSP-Unit (1-8)
:param all_fsp_corr_subarray_fqdn: list of all fsp corr subarray fqdns
:param all_fsp_pst_subarray_fqdn: list of all fsp pst subarray fqdns
:param fhs_fsp_mode_controller_fqdns: FQDN of the FHS FSP controller device
"""
super().__init__(*args, **kwargs)
self._fsp_id = fsp_id
self._all_fsp_corr_subarray_fqdn = all_fsp_corr_subarray_fqdn
self._all_fsp_pst_subarray_fqdn = all_fsp_pst_subarray_fqdn
self._fhs_fsp_mode_controller_fqdns = fhs_fsp_mode_controller_fqdns
self._fhs_fsp_corr_controller_fqdns = fhs_fsp_corr_controller_fqdns
# Corner turner synchronization
self._has_initial_write_timestamp = {
fpga_id: 0 for fpga_id in range(1, const.NUM_FPGA_PER_FSP_UNIT + 1)
}
self._initial_write_timestamp_queue = Queue(
maxsize=const.NUM_FPGA_PER_FSP_UNIT * const.MAX_SUBARRAY
)
self._initial_write_timestamp_thread = None
# -------------
# Communication
# -------------
[docs]
def start_communicating(
self: FspComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
) -> None:
"""
Thread for start_communicating operation.
"""
for obs_mode, fqdn_list in (
(ObsMode.IMAGING.name, self._all_fsp_corr_subarray_fqdn),
(ObsMode.PULSAR_TIMING.name, self._all_fsp_pst_subarray_fqdn),
):
self.proxy_dict[obs_mode] = {}
for subarray_id, fqdn in enumerate(fqdn_list, start=1):
try:
self.proxy_dict[obs_mode][subarray_id] = (
context.DeviceProxy(fqdn)
)
except tango.DevFailed as dev_failed:
self.logger.error(
f"Failure in connecting to {fqdn}; {dev_failed}"
)
self._update_communication_state(
communication_state=CommunicationStatus.NOT_ESTABLISHED
)
return
for fhs_device_type, fqdn_list in (
(
const.FHS_FSP_MODE_CONTROLLER,
self._fhs_fsp_mode_controller_fqdns,
),
(
const.FHS_FSP_CORR_CONTROLLER,
self._fhs_fsp_corr_controller_fqdns,
),
):
self.proxy_dict[fhs_device_type] = {}
for fpga_id, fqdn in enumerate(fqdn_list, start=1):
proxy = context.DeviceProxy(device_name=fqdn)
self.proxy_dict[fhs_device_type][fpga_id] = proxy
self._event_handler.subscribe_events(
proxy=proxy,
change_event_callbacks={
"longRunningCommandResult": self.results_callback,
},
)
if fhs_device_type == const.FHS_FSP_CORR_CONTROLLER:
self._has_initial_write_timestamp[fpga_id] = 0
# Try to connect to FHS devices, which are deployed during the
# CbfController OnCommand sequence
# TODO CIP-3600
super().start_communicating(admin_mode=admin_mode)
self._update_component_state(power=PowerState.ON)
[docs]
def stop_communicating(
self: FspComponentManager,
*args: Any,
admin_mode: AdminMode,
**kwargs: Any,
):
"""
Thread for stop_communicating operation.
"""
# Halt all event monitoring
self._event_handler.unsubscribe_events()
super().stop_communicating(admin_mode=admin_mode)
# --------
# Commands
# --------
def _validate_update_obs_mode(
self: FspComponentManager, obs_mode_config: str
) -> tuple[Any]:
"""
Validate UpdateObsMode argument. Will succeed if FSP can add the provided
subarray ID to its membership and process the requested ObsMode.
Validation will fail if:
- UpdateObsMode argument is an invalid JSON-formatted string
- UpdateObsMode argument data cannot be parsed
- subarray_id value is outside the range [1, 16]
- obs_mode value is not supported
- ObsMode.IDLE was requested, but the provided subarray ID is not already
in the subarray membership
- obs_mode value is not either IDLE or the current ObsMode, as FSPs may
only transition to new observing modes from IDLE
:param obs_mode_config: UpdateObsMode JSON-formatted string argument
:return: If validation succeeds, return the subarray ID to be added and
the requested ObsMode; if validation fails, return None for the ID
and an error message instead of ObsMode
"""
try:
config = orjson.loads(obs_mode_config)
subarray_id = config["subarray_id"]
obs_mode = config["obs_mode"]
except (orjson.JSONDecodeError, KeyError) as exc:
message = f"{obs_mode_config} not a valid JSON; {exc}."
self.logger.error(message)
return (None, message)
# Validate subarray ID
if subarray_id not in range(1, const.MAX_SUBARRAY + 1):
message = f"Subarray {subarray_id} invalid; must be in range [1, {const.MAX_SUBARRAY}]"
self.logger.error(message)
return (None, message)
# Validate observing mode
# TODO: remove these conditions as new observing modes are implemented
if obs_mode not in ObsMode._member_names_ or obs_mode in [
"PULSAR_SEARCH",
"VLBI",
]:
message = f"FSP ObsMode {obs_mode} not currently supported."
self.logger.error(message)
return (None, message)
# Observing mode can only be changed to IDLE by member subarrays
if (
obs_mode == "IDLE"
and subarray_id not in self._device.subarray_membership_signal
):
message = f"Subarray {subarray_id} not in subarrayMembership, cannot set FSP to {obs_mode}."
self.logger.error(message)
return (None, message)
# Observing mode can only be changed in IDLE, or can remain unchanged
if obs_mode != "IDLE" and self._device.obs_mode_signal.name not in [
"IDLE",
obs_mode,
]:
message = f"FSP in {self._device.obs_mode_signal.name}, cannot be set to {obs_mode}."
self.logger.error(message)
return (None, message)
return (subarray_id, obs_mode)
[docs]
def initial_write_timestamp_callback(
self: FspComponentManager,
event_data: Optional[tango.EventData],
fhs_fsp_corr_id: int,
):
"""
Callback for FSPCorrController initialWriteTimestamp change events
:param event_data: the received change event data
:param fhs_fsp_corr_id: The ID of the FSPCorrController that sent the
initialWriteTimestamp change event
"""
# NOTE: event callbacks must be as short as possible
# Here we simply queue event data and handle possible exceptions
try:
self._initial_write_timestamp_queue.put_nowait(
(fhs_fsp_corr_id, event_data)
)
except Full:
# Start dropping data if queue is full
self._initial_write_timestamp_queue.get_nowait()
self._initial_write_timestamp_queue.task_done()
self._initial_write_timestamp_queue.put(event_data)
def _sync_initial_write_timestamp(
self: FspComponentManager,
) -> None:
"""
Thread to synchronize the HBM Corner Turner initialWriteTimestamp across
all FSPCorrController devices.
"""
with tango.EnsureOmniThread():
self.logger.info(
"Started processing initialWriteTimestamp events."
)
while True:
timestamp_data = self._initial_write_timestamp_queue.get()
self._initial_write_timestamp_queue.task_done()
# NOTE: this callback queues a tuple of arguments
if timestamp_data[1] is EventHandler.EXIT_CONDITION:
# This ends the thread
break
fhs_fsp_corr_id = timestamp_data[0]
timestamp_value = timestamp_data[1].attr_value.value
self._has_initial_write_timestamp[fhs_fsp_corr_id] = (
timestamp_value
)
# There is a valid timestamp as long as at least one value is non-zero
has_valid_timestamp = any(
self._has_initial_write_timestamp.values()
)
# If corner turners were not previously started and valid timestamp
# was received, synchronize the corner turners now.
if (
self._device.corner_turner_started_signal is False
and has_valid_timestamp is True
):
# NOTE: additional attribute checks for will not currently
# work with simulators as non-state-model attribute values
# are not updated in LRC simulation.
command_results = self.issue_group_command(
command_name="ConfigureCornerTurner",
proxies=self.proxy_dict[
const.FHS_FSP_CORR_CONTROLLER
].values(),
command_args=orjson.dumps(
{"initial_read_timestamp": timestamp_value}
).decode(),
check_on_fail={
"initialWriteTimestamp": lambda attr_val: attr_val
== timestamp_value,
},
)
# If synchronization was successful, indicate corner turner
# is started and update device attribute signal.
if all(
result_code == ResultCode.OK
for result_code in command_results.values()
):
self.logger.info(
"Corner turner synchronized successfully."
)
self._device.corner_turner_started_signal = True
# If corner turners were already started and valid timestamp was
# received, do nothing.
elif (
self._device.corner_turner_started_signal
and has_valid_timestamp is True
):
pass
# If corner turners were previously started and zero timestamp was
# received, update attribute signal to indicate corner turners have
# now stopped.
elif (
self._device.corner_turner_started_signal
and has_valid_timestamp is False
):
self.logger.info("Corner turner stopped.")
self._device.corner_turner_started_signal = False
else: # corner_turner_started is False and has_valid_timestamp is False
self.logger.warning(
f"Corner turner not started but invalid timestamp ({timestamp_value}) "
"received, it is likely the subscription has just been initialized "
"but possible an error has occurred."
)
# If thread is ending, FSP is transitioning to IDLE, so ensure corner
# turner is re-synchronized by setting attribute signal to False.
self._device.corner_turner_started_signal = False
self.logger.info(
"Stopped processing initialWriteTimestamp events; FSP reset to IDLE, "
"corner turner must be re-synchronized for subsequent scan configuration."
)
def _configure_fhs_obs_mode(
self: FspComponentManager, obs_mode: str
) -> bool:
"""
If configuring from IDLE to active observing mode, set the FHS devices to
the requested observing mode and perform the HBM corner turner synchronization.
If configuring from active observing mode to IDLE, set the FHS devices to
IDLE and reset corner turner synchronization thread for future configurations.
:param obs_mode: observing mode string (validated during update_obs_mode)
:return: True if FHS observing mode configuration was successful,
otherwise False
"""
# TODO: SetFspMode should be using ObsMode enum for argument
# NOTE: obsMode attribute check_on_fail will not work with simulators
# as non-state-model attribute values are not updated.
command_results = self.issue_group_command(
command_name="SetFspMode",
proxies=self.proxy_dict[const.FHS_FSP_MODE_CONTROLLER].values(),
command_args=ObsMode[obs_mode].value,
# check_on_fail=
)
if any(
result_code != ResultCode.OK
for result_code in command_results.values()
):
return False
# Initialize corner turner synchronization if going from IDLE to correlation.
if obs_mode == "IMAGING":
# Subscribe to initialWriteTimestamp change events
for fpga_id, fhs_proxy in self.proxy_dict[
const.FHS_FSP_CORR_CONTROLLER
].items():
self._event_handler.subscribe_events(
proxy=fhs_proxy,
change_event_callbacks={
"initialWriteTimestamp": partial(
self.initial_write_timestamp_callback,
fhs_fsp_corr_id=fpga_id,
)
},
)
# Start processing initialWriteTimestamp values.
if self._initial_write_timestamp_thread is None:
self._initial_write_timestamp_thread = Thread(
target=self._sync_initial_write_timestamp,
name="Fsp._sync_initial_write_timestamp",
daemon=True,
)
self._initial_write_timestamp_thread.start()
# Reset corner turner synchronization if going from correlation to IDLE.
elif obs_mode == "IDLE":
# Unsubscribe from initialWriteTimestamp change events
for fpga_id, fhs_proxy in self.proxy_dict[
const.FHS_FSP_CORR_CONTROLLER
].items():
self._event_handler.unsubscribe_events(
proxy=fhs_proxy,
attr_names=["initialWriteTimestamp"],
)
# Stop processing initialWriteTimestamp values.
if self._initial_write_timestamp_thread is not None:
self.initial_write_timestamp_callback(
event_data=EventHandler.EXIT_CONDITION, fhs_fsp_corr_id=0
)
self._initial_write_timestamp_thread.join()
self._initial_write_timestamp_thread = None
# Clear queue; thread may have exited before reaching EXIT_CONDITION
self._clear_queue(
self._initial_write_timestamp_queue,
"initialWriteTimestamp",
)
return True
[docs]
def update_obs_mode(
self: FspComponentManager,
obs_mode_config: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Used by subarray devices to update the observing mode and subarrayMembership
of this FSP.
If the requested ObsMode is not IDLE, observing mode will only be set if
this FSP is currently in ObsMode.IDLE or already in the requested ObsMode.
Subarray ID will only be added to subarrayMembership if the requesting
subarray has not already been added.
If IDLE is requested, the observing mode will only be set if this FSP does
not belong to any other subarrays.
Whether or not the observing mode is set to IDLE, the subarray ID will be
removed from subarrayMembership.
:param obs_mode_config: a JSON-formatted string containing a subarray ID
to be added to membership and the ObsMode of the requested configuration.
:param task_callback: Callback function to update task status
:param task_abort_event: Event to signal task abort.
"""
task_callback(status=TaskStatus.IN_PROGRESS)
if self.task_abort_event_is_set(
"UpdateObsMode", task_callback, task_abort_event
):
return
# Validate UpdateObsMode input argument
# _validate_update_obs_mode will return an error message instead of an
# ObsMode string if invalid
subarray_id, obs_mode = self._validate_update_obs_mode(obs_mode_config)
if subarray_id is None:
task_callback(
result=(
ResultCode.FAILED,
obs_mode,
),
status=TaskStatus.COMPLETED,
)
return
# Set up FHS FSP requested observing mode and corner turner synchronization.
if (
len(self._device.subarray_membership_signal) == 0
and obs_mode != "IDLE"
) or (
len(self._device.subarray_membership_signal) == 1
and obs_mode == "IDLE"
):
configure_fhs_success = self._configure_fhs_obs_mode(obs_mode)
if not configure_fhs_success:
task_callback(
result=(
ResultCode.FAILED,
f"Failed to configure FHS devices for ObsMode.{obs_mode}",
),
status=TaskStatus.COMPLETED,
)
return
# Updating to non-IDLE ObsMode
# TODO: observing modes other than correlation
if obs_mode != "IDLE":
# Set requested observing mode subarray device ONLINE if OFFLINE
obs_mode_proxy = self.proxy_dict[obs_mode][subarray_id]
try:
if obs_mode_proxy.adminMode == AdminMode.OFFLINE:
obs_mode_proxy.adminMode = AdminMode.ONLINE
except tango.DevFailed as dev_failed:
message = f"Failed to set {obs_mode_proxy.dev_name()} ONLINE; {dev_failed}."
self.logger.error(message)
task_callback(
result=(ResultCode.FAILED, message),
status=TaskStatus.COMPLETED,
)
return
# Update obsMode and subarrayMembership attribute signals
self._device.obs_mode_signal = ObsMode[obs_mode]
self._device.subarray_membership_signal = sorted(
[subarray_id] + self._device.subarray_membership_signal
)
self.logger.info(
f"ObsMode set to {obs_mode}; added subarray {subarray_id} to membership."
)
# Updating to ObsMode.IDLE
# Set requested observing mode subarray device OFFLINE if ONLINE
elif obs_mode == "IDLE":
proxy = self.proxy_dict[self._device.obs_mode_signal.name][
subarray_id
]
try:
if proxy.adminMode == AdminMode.ONLINE:
proxy.adminMode = AdminMode.OFFLINE
except tango.DevFailed as dev_failed:
message = (
f"Failed to set {proxy.dev_name()} OFFLINE; {dev_failed}."
)
self.logger.error(message)
task_callback(
result=(ResultCode.FAILED, message),
status=TaskStatus.COMPLETED,
)
return
# Update subarrayMembership attribute signal
updated_membership = set(self._device.subarray_membership_signal)
updated_membership.discard(subarray_id)
self._device.subarray_membership_signal = sorted(
updated_membership
)
self.logger.info(
f"Removed subarray {subarray_id} from membership."
)
# Can only be set to IDLE if not belonging to any other subarray
if len(self._device.subarray_membership_signal) == 0:
# Update obsMode attribute signal
self._device.obs_mode_signal = ObsMode.IDLE
for fhs_proxy in self.proxy_dict[
const.FHS_FSP_CORR_CONTROLLER
].values():
self._event_handler.unsubscribe_events(fhs_proxy)
self.logger.info(
"ObsMode set to IDLE, cleared out initialWriteTimestamp events."
)
task_callback(
result=(
ResultCode.OK,
"UpdateObsMode completed OK",
),
status=TaskStatus.COMPLETED,
)
return