Source code for ska_mid_cbf_mcs.fsp.fsp_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from functools import partial
from queue import Full, Queue
from threading import Event, Thread
from typing import Any, Callable, List, Optional

import orjson
import tango
from ska_control_model import AdminMode, ObsMode, PowerState, TaskStatus
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_tango_base.commands import ResultCode
from ska_tango_testing import context

from ska_mid_cbf_mcs.base.component_manager import (
    CbfComponentManager,
    CommunicationStatus,
)
from ska_mid_cbf_mcs.commons.global_enum import const

__all__ = ["FspComponentManager"]


[docs] class FspComponentManager(CbfComponentManager): """ A component manager for the Fsp device. """ # -------------- # Initialization # -------------- def __init__( self: FspComponentManager, *args: Any, fsp_id: int, all_fsp_corr_subarray_fqdn: List[str], all_fsp_pst_subarray_fqdn: List[str], fhs_fsp_mode_controller_fqdns: List[str], fhs_fsp_corr_controller_fqdns: List[str], **kwargs: Any, ) -> None: """ Initialise a new instance. :param logger: a logger for this object to use :param fsp_unit_fpga_idx: the Index of the FPGA in the FSP-Unit (1-8) :param all_fsp_corr_subarray_fqdn: list of all fsp corr subarray fqdns :param all_fsp_pst_subarray_fqdn: list of all fsp pst subarray fqdns :param fhs_fsp_mode_controller_fqdns: FQDN of the FHS FSP controller device """ super().__init__(*args, **kwargs) self._fsp_id = fsp_id self._all_fsp_corr_subarray_fqdn = all_fsp_corr_subarray_fqdn self._all_fsp_pst_subarray_fqdn = all_fsp_pst_subarray_fqdn self._fhs_fsp_mode_controller_fqdns = fhs_fsp_mode_controller_fqdns self._fhs_fsp_corr_controller_fqdns = fhs_fsp_corr_controller_fqdns # Corner turner synchronization self._has_initial_write_timestamp = { fpga_id: 0 for fpga_id in range(1, const.NUM_FPGA_PER_FSP_UNIT + 1) } self._initial_write_timestamp_queue = Queue( maxsize=const.NUM_FPGA_PER_FSP_UNIT * const.MAX_SUBARRAY ) self._initial_write_timestamp_thread = None # ------------- # Communication # -------------
[docs] def start_communicating( self: FspComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Thread for start_communicating operation. """ for obs_mode, fqdn_list in ( (ObsMode.IMAGING.name, self._all_fsp_corr_subarray_fqdn), (ObsMode.PULSAR_TIMING.name, self._all_fsp_pst_subarray_fqdn), ): self.proxy_dict[obs_mode] = {} for subarray_id, fqdn in enumerate(fqdn_list, start=1): try: self.proxy_dict[obs_mode][subarray_id] = ( context.DeviceProxy(fqdn) ) except tango.DevFailed as dev_failed: self.logger.error( f"Failure in connecting to {fqdn}; {dev_failed}" ) self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) return for fhs_device_type, fqdn_list in ( ( const.FHS_FSP_MODE_CONTROLLER, self._fhs_fsp_mode_controller_fqdns, ), ( const.FHS_FSP_CORR_CONTROLLER, self._fhs_fsp_corr_controller_fqdns, ), ): self.proxy_dict[fhs_device_type] = {} for fpga_id, fqdn in enumerate(fqdn_list, start=1): proxy = context.DeviceProxy(device_name=fqdn) self.proxy_dict[fhs_device_type][fpga_id] = proxy self._event_handler.subscribe_events( proxy=proxy, change_event_callbacks={ "longRunningCommandResult": self.results_callback, }, ) if fhs_device_type == const.FHS_FSP_CORR_CONTROLLER: self._has_initial_write_timestamp[fpga_id] = 0 # Try to connect to FHS devices, which are deployed during the # CbfController OnCommand sequence # TODO CIP-3600 super().start_communicating(admin_mode=admin_mode) self._update_component_state(power=PowerState.ON)
[docs] def stop_communicating( self: FspComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ): """ Thread for stop_communicating operation. """ # Halt all event monitoring self._event_handler.unsubscribe_events() super().stop_communicating(admin_mode=admin_mode)
# -------- # Commands # -------- def _validate_update_obs_mode( self: FspComponentManager, obs_mode_config: str ) -> tuple[Any]: """ Validate UpdateObsMode argument. Will succeed if FSP can add the provided subarray ID to its membership and process the requested ObsMode. Validation will fail if: - UpdateObsMode argument is an invalid JSON-formatted string - UpdateObsMode argument data cannot be parsed - subarray_id value is outside the range [1, 16] - obs_mode value is not supported - ObsMode.IDLE was requested, but the provided subarray ID is not already in the subarray membership - obs_mode value is not either IDLE or the current ObsMode, as FSPs may only transition to new observing modes from IDLE :param obs_mode_config: UpdateObsMode JSON-formatted string argument :return: If validation succeeds, return the subarray ID to be added and the requested ObsMode; if validation fails, return None for the ID and an error message instead of ObsMode """ try: config = orjson.loads(obs_mode_config) subarray_id = config["subarray_id"] obs_mode = config["obs_mode"] except (orjson.JSONDecodeError, KeyError) as exc: message = f"{obs_mode_config} not a valid JSON; {exc}." self.logger.error(message) return (None, message) # Validate subarray ID if subarray_id not in range(1, const.MAX_SUBARRAY + 1): message = f"Subarray {subarray_id} invalid; must be in range [1, {const.MAX_SUBARRAY}]" self.logger.error(message) return (None, message) # Validate observing mode # TODO: remove these conditions as new observing modes are implemented if obs_mode not in ObsMode._member_names_ or obs_mode in [ "PULSAR_SEARCH", "VLBI", ]: message = f"FSP ObsMode {obs_mode} not currently supported." self.logger.error(message) return (None, message) # Observing mode can only be changed to IDLE by member subarrays if ( obs_mode == "IDLE" and subarray_id not in self._device.subarray_membership_signal ): message = f"Subarray {subarray_id} not in subarrayMembership, cannot set FSP to {obs_mode}." self.logger.error(message) return (None, message) # Observing mode can only be changed in IDLE, or can remain unchanged if obs_mode != "IDLE" and self._device.obs_mode_signal.name not in [ "IDLE", obs_mode, ]: message = f"FSP in {self._device.obs_mode_signal.name}, cannot be set to {obs_mode}." self.logger.error(message) return (None, message) return (subarray_id, obs_mode)
[docs] def initial_write_timestamp_callback( self: FspComponentManager, event_data: Optional[tango.EventData], fhs_fsp_corr_id: int, ): """ Callback for FSPCorrController initialWriteTimestamp change events :param event_data: the received change event data :param fhs_fsp_corr_id: The ID of the FSPCorrController that sent the initialWriteTimestamp change event """ # NOTE: event callbacks must be as short as possible # Here we simply queue event data and handle possible exceptions try: self._initial_write_timestamp_queue.put_nowait( (fhs_fsp_corr_id, event_data) ) except Full: # Start dropping data if queue is full self._initial_write_timestamp_queue.get_nowait() self._initial_write_timestamp_queue.task_done() self._initial_write_timestamp_queue.put(event_data)
def _sync_initial_write_timestamp( self: FspComponentManager, ) -> None: """ Thread to synchronize the HBM Corner Turner initialWriteTimestamp across all FSPCorrController devices. """ with tango.EnsureOmniThread(): self.logger.info( "Started processing initialWriteTimestamp events." ) while True: timestamp_data = self._initial_write_timestamp_queue.get() self._initial_write_timestamp_queue.task_done() # NOTE: this callback queues a tuple of arguments if timestamp_data[1] is EventHandler.EXIT_CONDITION: # This ends the thread break fhs_fsp_corr_id = timestamp_data[0] timestamp_value = timestamp_data[1].attr_value.value self._has_initial_write_timestamp[fhs_fsp_corr_id] = ( timestamp_value ) # There is a valid timestamp as long as at least one value is non-zero has_valid_timestamp = any( self._has_initial_write_timestamp.values() ) # If corner turners were not previously started and valid timestamp # was received, synchronize the corner turners now. if ( self._device.corner_turner_started_signal is False and has_valid_timestamp is True ): # NOTE: additional attribute checks for will not currently # work with simulators as non-state-model attribute values # are not updated in LRC simulation. command_results = self.issue_group_command( command_name="ConfigureCornerTurner", proxies=self.proxy_dict[ const.FHS_FSP_CORR_CONTROLLER ].values(), command_args=orjson.dumps( {"initial_read_timestamp": timestamp_value} ).decode(), check_on_fail={ "initialWriteTimestamp": lambda attr_val: attr_val == timestamp_value, }, ) # If synchronization was successful, indicate corner turner # is started and update device attribute signal. if all( result_code == ResultCode.OK for result_code in command_results.values() ): self.logger.info( "Corner turner synchronized successfully." ) self._device.corner_turner_started_signal = True # If corner turners were already started and valid timestamp was # received, do nothing. elif ( self._device.corner_turner_started_signal and has_valid_timestamp is True ): pass # If corner turners were previously started and zero timestamp was # received, update attribute signal to indicate corner turners have # now stopped. elif ( self._device.corner_turner_started_signal and has_valid_timestamp is False ): self.logger.info("Corner turner stopped.") self._device.corner_turner_started_signal = False else: # corner_turner_started is False and has_valid_timestamp is False self.logger.warning( f"Corner turner not started but invalid timestamp ({timestamp_value}) " "received, it is likely the subscription has just been initialized " "but possible an error has occurred." ) # If thread is ending, FSP is transitioning to IDLE, so ensure corner # turner is re-synchronized by setting attribute signal to False. self._device.corner_turner_started_signal = False self.logger.info( "Stopped processing initialWriteTimestamp events; FSP reset to IDLE, " "corner turner must be re-synchronized for subsequent scan configuration." ) def _configure_fhs_obs_mode( self: FspComponentManager, obs_mode: str ) -> bool: """ If configuring from IDLE to active observing mode, set the FHS devices to the requested observing mode and perform the HBM corner turner synchronization. If configuring from active observing mode to IDLE, set the FHS devices to IDLE and reset corner turner synchronization thread for future configurations. :param obs_mode: observing mode string (validated during update_obs_mode) :return: True if FHS observing mode configuration was successful, otherwise False """ # TODO: SetFspMode should be using ObsMode enum for argument # NOTE: obsMode attribute check_on_fail will not work with simulators # as non-state-model attribute values are not updated. command_results = self.issue_group_command( command_name="SetFspMode", proxies=self.proxy_dict[const.FHS_FSP_MODE_CONTROLLER].values(), command_args=ObsMode[obs_mode].value, # check_on_fail= ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): return False # Initialize corner turner synchronization if going from IDLE to correlation. if obs_mode == "IMAGING": # Subscribe to initialWriteTimestamp change events for fpga_id, fhs_proxy in self.proxy_dict[ const.FHS_FSP_CORR_CONTROLLER ].items(): self._event_handler.subscribe_events( proxy=fhs_proxy, change_event_callbacks={ "initialWriteTimestamp": partial( self.initial_write_timestamp_callback, fhs_fsp_corr_id=fpga_id, ) }, ) # Start processing initialWriteTimestamp values. if self._initial_write_timestamp_thread is None: self._initial_write_timestamp_thread = Thread( target=self._sync_initial_write_timestamp, name="Fsp._sync_initial_write_timestamp", daemon=True, ) self._initial_write_timestamp_thread.start() # Reset corner turner synchronization if going from correlation to IDLE. elif obs_mode == "IDLE": # Unsubscribe from initialWriteTimestamp change events for fpga_id, fhs_proxy in self.proxy_dict[ const.FHS_FSP_CORR_CONTROLLER ].items(): self._event_handler.unsubscribe_events( proxy=fhs_proxy, attr_names=["initialWriteTimestamp"], ) # Stop processing initialWriteTimestamp values. if self._initial_write_timestamp_thread is not None: self.initial_write_timestamp_callback( event_data=EventHandler.EXIT_CONDITION, fhs_fsp_corr_id=0 ) self._initial_write_timestamp_thread.join() self._initial_write_timestamp_thread = None # Clear queue; thread may have exited before reaching EXIT_CONDITION self._clear_queue( self._initial_write_timestamp_queue, "initialWriteTimestamp", ) return True
[docs] def update_obs_mode( self: FspComponentManager, obs_mode_config: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Used by subarray devices to update the observing mode and subarrayMembership of this FSP. If the requested ObsMode is not IDLE, observing mode will only be set if this FSP is currently in ObsMode.IDLE or already in the requested ObsMode. Subarray ID will only be added to subarrayMembership if the requesting subarray has not already been added. If IDLE is requested, the observing mode will only be set if this FSP does not belong to any other subarrays. Whether or not the observing mode is set to IDLE, the subarray ID will be removed from subarrayMembership. :param obs_mode_config: a JSON-formatted string containing a subarray ID to be added to membership and the ObsMode of the requested configuration. :param task_callback: Callback function to update task status :param task_abort_event: Event to signal task abort. """ task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "UpdateObsMode", task_callback, task_abort_event ): return # Validate UpdateObsMode input argument # _validate_update_obs_mode will return an error message instead of an # ObsMode string if invalid subarray_id, obs_mode = self._validate_update_obs_mode(obs_mode_config) if subarray_id is None: task_callback( result=( ResultCode.FAILED, obs_mode, ), status=TaskStatus.COMPLETED, ) return # Set up FHS FSP requested observing mode and corner turner synchronization. if ( len(self._device.subarray_membership_signal) == 0 and obs_mode != "IDLE" ) or ( len(self._device.subarray_membership_signal) == 1 and obs_mode == "IDLE" ): configure_fhs_success = self._configure_fhs_obs_mode(obs_mode) if not configure_fhs_success: task_callback( result=( ResultCode.FAILED, f"Failed to configure FHS devices for ObsMode.{obs_mode}", ), status=TaskStatus.COMPLETED, ) return # Updating to non-IDLE ObsMode # TODO: observing modes other than correlation if obs_mode != "IDLE": # Set requested observing mode subarray device ONLINE if OFFLINE obs_mode_proxy = self.proxy_dict[obs_mode][subarray_id] try: if obs_mode_proxy.adminMode == AdminMode.OFFLINE: obs_mode_proxy.adminMode = AdminMode.ONLINE except tango.DevFailed as dev_failed: message = f"Failed to set {obs_mode_proxy.dev_name()} ONLINE; {dev_failed}." self.logger.error(message) task_callback( result=(ResultCode.FAILED, message), status=TaskStatus.COMPLETED, ) return # Update obsMode and subarrayMembership attribute signals self._device.obs_mode_signal = ObsMode[obs_mode] self._device.subarray_membership_signal = sorted( [subarray_id] + self._device.subarray_membership_signal ) self.logger.info( f"ObsMode set to {obs_mode}; added subarray {subarray_id} to membership." ) # Updating to ObsMode.IDLE # Set requested observing mode subarray device OFFLINE if ONLINE elif obs_mode == "IDLE": proxy = self.proxy_dict[self._device.obs_mode_signal.name][ subarray_id ] try: if proxy.adminMode == AdminMode.ONLINE: proxy.adminMode = AdminMode.OFFLINE except tango.DevFailed as dev_failed: message = ( f"Failed to set {proxy.dev_name()} OFFLINE; {dev_failed}." ) self.logger.error(message) task_callback( result=(ResultCode.FAILED, message), status=TaskStatus.COMPLETED, ) return # Update subarrayMembership attribute signal updated_membership = set(self._device.subarray_membership_signal) updated_membership.discard(subarray_id) self._device.subarray_membership_signal = sorted( updated_membership ) self.logger.info( f"Removed subarray {subarray_id} from membership." ) # Can only be set to IDLE if not belonging to any other subarray if len(self._device.subarray_membership_signal) == 0: # Update obsMode attribute signal self._device.obs_mode_signal = ObsMode.IDLE for fhs_proxy in self.proxy_dict[ const.FHS_FSP_CORR_CONTROLLER ].values(): self._event_handler.unsubscribe_events(fhs_proxy) self.logger.info( "ObsMode set to IDLE, cleared out initialWriteTimestamp events." ) task_callback( result=( ResultCode.OK, "UpdateObsMode completed OK", ), status=TaskStatus.COMPLETED, ) return