Source code for ska_mid_cbf_mcs.subarray.subarray_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

import os
from collections.abc import Iterable
from itertools import chain
from queue import Full, Queue
from threading import Event, Lock, Thread
from typing import Any, Callable, Dict, Generator, List, Optional

import orjson
import tango
from ska_control_model import (
    AdminMode,
    CommunicationStatus,
    ObsMode,
    ObsState,
    PowerState,
    ResultCode,
    TaskStatus,
)
from ska_mid_cbf_common.events.event_handler import EventHandler
from ska_schemas.schema import validate as schema_validate
from ska_tango_testing import context

from ska_mid_cbf_mcs.base.obs.obs_component_manager import (
    CbfObsComponentManager,
)
from ska_mid_cbf_mcs.commons.dish_utils import DISHUtils
from ska_mid_cbf_mcs.commons.global_enum import (
    const,
    freq_band_dict,
    obs_mode_key_to_name,
    param_keys,
    supported_values,
)
from ska_mid_cbf_mcs.commons.validate_interface import (
    CBF_ABORT_VER1_0,
    CBF_GOTOIDLE_VER1_0,
    CBF_OBSRESET_VER1_0,
    CBF_RESTART_VER1_0,
    validate_interface,
)
from ska_mid_cbf_mcs.commons.vcc_unit_utils import VCCUnitUtil
from ska_mid_cbf_mcs.subarray.fsp_configuration.generator import fsp_config_gen
from ska_mid_cbf_mcs.subarray.scan_configuration_validator.validator import (
    SubarrayScanConfigurationValidator,
)
from ska_mid_cbf_mcs.subarray.vcc_all_bands_configuration.generator import (
    vcc_config_gen,
)

__all__ = ["CbfSubarrayComponentManager"]

COMMAND_ASSIGN_RESOURCES = "AssignResources"
COMMAND_RELEASE_RESOURCES = "ReleaseResources"
COMMAND_RELEASE_ALL_RESOURCES = "ReleaseAllResources"
COMMAND_RESTART = "Restart"


[docs] class CbfSubarrayComponentManager(CbfObsComponentManager): """ A component manager for the CbfSubarray device. """ def __init__( self: CbfSubarrayComponentManager, *args: Any, subarray_id: int, controller: str, vcc_all_bands: List[str], fsp: List[str], fsp_corr_sub: List[str], **kwargs: Any, ) -> None: """ Initialise a new instance. :param subarray_id: ID of subarray :param controller: FQDN of controller device :param vcc_all_bands: FQDNs of subordinate VCCAllBands devices :param fsp: FQDNs of subordinate FSP devices :param fsp_corr_sub: FQDNs of subordinate FSP CORR subarray devices """ super().__init__(*args, **kwargs) # Configuration management self._assigned_dish_ids = set() self._subarray_id = subarray_id self._dish_utils = None self._vcc_unit_util = None # Device FQDNs self._fqdn_controller = controller self._fqdn_vcc_all_bands = vcc_all_bands self._fqdn_fsp = fsp self._fqdn_fsp_corr = fsp_corr_sub # Delay model management # TODO: maxsize could be just 1 or 2? self._delay_model_queue = Queue(maxsize=64) self._delay_model_thread: Thread = None self._fsp_subarray_scan_start_time_events = {} self._scan_start_time_rounded_queue = Queue( maxsize=const.MAX_FSP * const.MAX_SUBARRAY ) self._scan_start_time_rounded_thread = None # for easy device-reference self._rfi_flagging_mask = {} self._frequency_band_offset_stream1 = 0 self._frequency_band_offset_stream2 = 0 self._stream_tuning = [0, 0] # DeviceProxy management self._assigned_vcc_ids = set() self._configured_vcc_ids = set() self._assigned_fsp_ids = set() self._configured_fsp_mode_ids = { ObsMode.IMAGING.name: set(), ObsMode.PULSAR_TIMING.name: set(), ObsMode.PULSAR_SEARCH.name: set(), } # Mutex Lock for _scan_started, to sync FSP Output Time self._scan_started_lock = Lock() self._scan_started = False # ------------------ # Mutex Lock Setters # ------------------ def _update_scan_started(self: CbfSubarrayComponentManager, value: bool): """ Sets the _scan_started flag while invoking a mutex lock :param value: The boolean value to set _scan_started """ with self._scan_started_lock: self._scan_started = value def _read_scan_started(self: CbfSubarrayComponentManager) -> bool: """ Reads the _scan_started flag while invoking a mutex lock :param value: The boolean value to set _scan_started """ with self._scan_started_lock: return self._scan_started # ------------- # Communication # ------------- def _init_proxies(self: CbfSubarrayComponentManager) -> bool: """ Initialize proxies to FSP and VCC subelements :return: True if proxy initialization succeed, otherwise False """ self.proxy_dict = { const.CONTROLLER: None, const.VCC_ALL_BANDS: {}, const.FSP: {}, ObsMode.IMAGING.name: {}, ObsMode.PULSAR_TIMING.name: {}, ObsMode.PULSAR_SEARCH.name: {}, } try: self.proxy_dict[const.CONTROLLER] = context.DeviceProxy( device_name=self._fqdn_controller, ) for vcc_id, fqdn in enumerate(self._fqdn_vcc_all_bands, 1): self.proxy_dict[const.VCC_ALL_BANDS][vcc_id] = ( context.DeviceProxy( device_name=fqdn, ) ) for fsp_id, (fsp_fqdn, fsp_corr_fqdn) in enumerate( zip(self._fqdn_fsp, self._fqdn_fsp_corr), 1 ): self.proxy_dict[const.FSP][fsp_id] = context.DeviceProxy( device_name=fsp_fqdn ) self.proxy_dict[ObsMode.IMAGING.name][fsp_id] = ( context.DeviceProxy(device_name=fsp_corr_fqdn) ) except tango.DevFailed as dev_failed: self.logger.error(f"{dev_failed}") return False return True def _get_vcc_unit_config(self: CbfSubarrayComponentManager) -> bool: """ Retrieve vcc unit configuration data from the CBF Controller """ try: # get_property returns a dict of requested properties vcc_id_to_vcc_unit_id_mapping = self.proxy_dict[ const.CONTROLLER ].get_property("VCCIDToVCCUnitIDMapping")[ "VCCIDToVCCUnitIDMapping" ] except tango.DevFailed as dev_failed: self.logger.error( f"failed to get VCC ID to VCC Unit ID mapping from the controller; {dev_failed}" ) return False self._vcc_unit_util = VCCUnitUtil(vcc_id_to_vcc_unit_id_mapping) return True
[docs] def start_communicating( self: CbfSubarrayComponentManager, *args: Any, admin_mode: AdminMode, **kwargs: Any, ) -> None: """ Thread for start_communicating operation. """ success = self._init_proxies() if not success: self.logger.error("Failed to initialize proxies.") self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) return success = self._get_vcc_unit_config() if not success: self.logger.error("Failed to retrieve vcc unit configuration") self._update_communication_state( communication_state=CommunicationStatus.NOT_ESTABLISHED ) return super().start_communicating(admin_mode=admin_mode) self._update_component_state(power=PowerState.ON)
# -------------------- # Delay Model Handling # --------------------
[docs] def delay_model_event_callback( self: CbfSubarrayComponentManager, event_data: tango.EventData ) -> None: """ " Callback for delayModel change event subscription. :param event_data: the received change event data """ # NOTE (CIP-3559): event callbacks must be as short as possible # Here we simply queue event data and handle possible exceptions try: self._delay_model_queue.put_nowait(event_data) except Full: # Start dropping data if queue is full self._delay_model_queue.get_nowait() self._delay_model_queue.task_done() self._delay_model_queue.put(event_data)
def _update_delay_model(self: CbfSubarrayComponentManager) -> None: """ Update FSP delay models. This method is always started in a separate thread. """ # Indicate that this thread performs Tango operations with tango.EnsureOmniThread(): self.logger.info("Starting delay model processing thread.") while True: event_data = self._delay_model_queue.get() self._delay_model_queue.task_done() if event_data is EventHandler.EXIT_CONDITION: # This ends the thread break # No need to check event data validity as this is done in EventTracer, # just check if subarray is in the proper state to ingest the data. model = event_data.attr_value.value current_model = str(self._device.delay_model_signal) if ( not self.is_communicating or model == "" or model == current_model or self._device.obs_state_signal not in [ ObsState.CONFIGURING, ObsState.READY, ObsState.SCANNING, ] ): self.logger.warning(f"Ignoring delay data; {model}") continue # Validate delay model against telmodel schema try: delay_model_json = orjson.loads(model) self.logger.debug( f"Attempting to validate the following delay model JSON against the telescope model: {delay_model_json}" ) schema_validate( version=delay_model_json["interface"], config=delay_model_json, strictness=1, ) self.logger.debug("Delay model is valid!") except (orjson.JSONDecodeError, ValueError) as error: self.logger.error( f"Delay model JSON validation against the telescope model schema failed, ignoring delay model; {error}." ) continue # Validate start_validity_sec current_start_validity_sec = orjson.loads(current_model)[ "start_validity_sec" ] new_start_validity_sec = delay_model_json["start_validity_sec"] if new_start_validity_sec < current_start_validity_sec: self.logger.warning( f"Ignoring delay model; start_validity_sec ({new_start_validity_sec}) is earlier than current delay model ({current_start_validity_sec})." ) continue # Validate DISH IDs, then convert them to VCC ID integers for FSPs valid_dish_ids = True for delay_detail in delay_model_json["receptor_delays"]: dish_id = delay_detail["receptor"] if dish_id not in self._assigned_dish_ids: self.logger.warning( f"Delay model contains data for DISH ID {dish_id} not belonging to this subarray, ignoring delay model." ) valid_dish_ids = False break delay_detail["receptor"] = ( self._dish_utils.dish_id_to_vcc_id[dish_id] ) # If valid, issue delay model to assigned resources if valid_dish_ids: self.logger.info( f"Updating delay model; {delay_model_json}" ) # We don't check result codes as we simply log a warning in # issue_group_command and then continue if there is a failure # in updating a delay model delay_model_arg = orjson.dumps(delay_model_json).decode() self.issue_group_command( command_name="UpdateDelayModel", command_args=delay_model_arg, proxies=self.get_configured_fsp_mode_proxies( ObsMode.IMAGING ), is_lrc=False, # UpdateDelayModel is a fast command check_on_fail={ "delayModel": lambda attr_val: attr_val == delay_model_arg }, ) # Update delayModel attribute signal self._device.delay_model_signal = model # ------------------- # Resourcing Commands # ------------------- @property def assigned_fsp_proxies( self: CbfSubarrayComponentManager, ) -> Generator[tango.DeviceProxy]: """Return an iterable containing all assigned Fsp device proxies.""" return ( self.proxy_dict[const.FSP][fsp_id] for fsp_id in self._assigned_fsp_ids )
[docs] def get_configured_fsp_mode_proxies( self: CbfSubarrayComponentManager, obs_mode: ObsMode ) -> Generator[tango.DeviceProxy]: """Return an iterable containing all configured device proxies for a given ObsMode.""" return ( self.proxy_dict[obs_mode.name][fsp_id] for fsp_id in self._configured_fsp_mode_ids[obs_mode.name] )
@property def configured_fsp_mode_proxies( self: CbfSubarrayComponentManager, ) -> Generator[tango.DeviceProxy]: """Return an iterable containing all configured FSP ObsMode subarray device proxies.""" return chain( self.get_configured_fsp_mode_proxies(ObsMode.IMAGING), self.get_configured_fsp_mode_proxies(ObsMode.PULSAR_TIMING), self.get_configured_fsp_mode_proxies(ObsMode.PULSAR_SEARCH), ) @property def configured_vcc_proxies( self: CbfSubarrayComponentManager, ) -> Generator[tango.DeviceProxy]: """Return a list of proxies to all currently configured VCCAllBandsController devices.""" return ( self.proxy_dict[const.VCC_ALL_BANDS][vcc_id] for vcc_id in self._configured_vcc_ids ) @property def configured_obs_proxies( self: CbfSubarrayComponentManager, ) -> Generator[tango.DeviceProxy]: """Return a list of proxies to all currently assigned observing devices.""" return chain( self.configured_vcc_proxies, self.configured_fsp_mode_proxies ) # --- AssignResources Command --- # def _fqdn_contains_vcc_id( self: CbfSubarrayComponentManager, fqdn: str, vcc_id: int ): return vcc_id == int(fqdn.split("/")[2])
[docs] def assign_vcc( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Assign resources to subarray. :param params: The JSON object containing a list of DISH (receptor) IDs to be assigned :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( COMMAND_ASSIGN_RESOURCES, task_callback, task_abort_event ): return # Validate Interface valid, param_dict = self._handle_command_validation( COMMAND_ASSIGN_RESOURCES, params, task_callback ) if not valid: return self._set_transaction_id(param_dict) self._log_transaction_id(COMMAND_ASSIGN_RESOURCES, param_dict) if self._dish_utils is None: try: self._dish_utils = DISHUtils( orjson.loads(self._device.sys_param_signal) ) except (KeyError, orjson.JSONDecodeError) as exc: self.logger.error( f"Failed to update DISH ID to VCC ID and frequency offset k mapping, " f"current system parameters: {self._device.sys_param_signal}; {exc}" ) task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to initialize system parameters.", ), ) return # Build a dict of VCCs to assign. vcc_to_assign = {} for dish_id in param_dict[param_keys.RECEPTOR_IDS]: self.logger.debug(f"Attempting to assign DISH ID {dish_id}") try: if dish_id in self._assigned_dish_ids: raise KeyError("already assigned to this subarray.") vcc_id = self._dish_utils.dish_id_to_vcc_id[dish_id] if 0 >= vcc_id > len(self._fqdn_vcc_all_bands): raise KeyError( f"VCC {vcc_id} not in current capabilities." ) # Select VCC proxy to assign, subscribe to LRC results vcc_proxy = self.proxy_dict[const.VCC_ALL_BANDS][vcc_id] vcc_to_assign[vcc_proxy.dev_name()] = { "dish_id": dish_id, "vcc_id": vcc_id, } self._event_handler.subscribe_events( proxy=vcc_proxy, change_event_callbacks={ "longRunningCommandResult": self.results_callback, }, ) except (KeyError, tango.DevFailed) as error: self.logger.warning( f"Failed to assign DISH ID {dish_id}; {error}" ) continue # First check to see if any valid data added to vcc_to_assign. if len(vcc_to_assign) == 0: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to assign any DISH IDs.", ), ) return # UpdateSubarrayMembership allowed to partially succeed to attempt # resource assignment on best-effort basis. command_results = self.issue_group_command( command_name="UpdateSubarrayMembership", command_args=self._subarray_id, proxies=( self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]] for vcc in vcc_to_assign.values() ), partial_success=True, check_on_fail={ "subarrayID": lambda attr_val: attr_val == self._subarray_id, "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) for dev_name, result_code in command_results.items(): if result_code != ResultCode.OK: vcc = vcc_to_assign.pop(dev_name) self._event_handler.unsubscribe_events( proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]] ) self.logger.warning( f"Failed to assign DISH ID {vcc['dish_id']}" ) # Second check to see if any values in vcc_to_assign remain after # checking command results. if len(vcc_to_assign) == 0: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to assign any DISH IDs.", ), ) return # If there are VCCs to assign, Update obsState machine via callback if previously # unresourced. if len(self._assigned_dish_ids) == 0: self._update_component_state(resourced=True) # Update instance attributes and device attribute signals dish_ids = sorted(vcc["dish_id"] for vcc in vcc_to_assign.values()) self._assigned_dish_ids.update(dish_ids) self._device.dish_ids_signal = sorted(self._assigned_dish_ids) self._assigned_vcc_ids.update( vcc["vcc_id"] for vcc in vcc_to_assign.values() ) self._device.vcc_ids_signal = sorted(self._assigned_vcc_ids) self.logger.info( f"DISH IDs after assignment: {self._assigned_dish_ids}" ) task_callback( result=( ResultCode.OK, ( "AssignResources completed OK; successfully assigned " + ", ".join(dish_ids) ), ), status=TaskStatus.COMPLETED, ) return
# --- ReleaseResources Command --- # def _release_vcc_resources( self: CbfSubarrayComponentManager, dish_ids: Iterable[str] ) -> List[str]: """ Main loop for use in releasing VCC resources, shared between resource-releasing commands and Restart command :param dish_ids: list of DISH IDs :return: list of successfully removed DISH IDs """ # Build a dict of VCCs to release. vcc_to_release = {} for dish_id in dish_ids: self.logger.debug(f"Attempting to remove {dish_id}") if dish_id not in self._assigned_dish_ids: self.logger.warning( f"Skipping receptor {dish_id} as it is not currently assigned to this subarray." ) continue vcc_id = self._dish_utils.dish_id_to_vcc_id[dish_id] vcc_to_release[ self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name() ] = { "dish_id": dish_id, "vcc_id": vcc_id, } # First check to see if any valid data added to vcc_to_release. if len(vcc_to_release) == 0: self.logger.error("Failed to release any DISH IDs.") return [] # UpdateSubarrayMembership allowed to partially succeed to attempt # resource release on best-effort basis. command_results = self.issue_group_command( command_name="UpdateSubarrayMembership", command_args=0, proxies=( self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]] for vcc in vcc_to_release.values() ), partial_success=True, check_on_fail={ "subarrayID": lambda attr_val: attr_val == 0, "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) for dev_name, result_code in command_results.items(): if result_code != ResultCode.OK: vcc = vcc_to_release.pop(dev_name) self.logger.warning( f"Failed to release DISH ID {vcc['dish_id']}" ) # Second check to see if any values in vcc_to_release remain after # checking command results. if len(vcc_to_release) == 0: self.logger.error("Failed to release any DISH IDs.") return [] # Unsubscribe from VCC LRC results. for vcc in vcc_to_release.values(): self._event_handler.unsubscribe_events( proxy=self.proxy_dict[const.VCC_ALL_BANDS][vcc["vcc_id"]] ) # Update instance attributes and device attribute signals dish_ids = sorted(vcc["dish_id"] for vcc in vcc_to_release.values()) self._assigned_dish_ids.difference_update(dish_ids) self._device.dish_ids_signal = sorted(self._assigned_dish_ids) self._assigned_vcc_ids.difference_update( vcc["vcc_id"] for vcc in vcc_to_release.values() ) self._device.vcc_ids_signal = sorted(self._assigned_vcc_ids) # Update ObsState machine via callback and reset system parameters if now unresourced if len(self._assigned_dish_ids) == 0: self._update_component_state(resourced=False) self._dish_utils = None self.logger.info(f"DISH IDs after removal: {self._assigned_dish_ids}") return dish_ids
[docs] def release_vcc( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Release dishes from subarray. :param params: JSON string with the list of DISH (receptor) IDs to be removed :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( COMMAND_RELEASE_RESOURCES, task_callback, task_abort_event ): return # Validate Command valid, param_dict = self._handle_command_validation( COMMAND_RELEASE_RESOURCES, params, task_callback ) if not valid: return self._set_transaction_id(param_dict) self._log_transaction_id(COMMAND_RELEASE_RESOURCES, param_dict) dish_ids = param_dict[param_keys.RECEPTOR_IDS] input_dish_valid, msg = self._dish_utils.are_Valid_DISH_Ids(dish_ids) if not input_dish_valid: task_callback( status=TaskStatus.COMPLETED, result=(ResultCode.FAILED, msg), ) return dish_ids_released = self._release_vcc_resources(dish_ids=dish_ids) if len(dish_ids_released) == 0: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to release any DISH IDs.", ), ) return task_callback( result=( ResultCode.OK, "ReleaseResources completed OK; successfully released " + ", ".join(dish_ids_released), ), status=TaskStatus.COMPLETED, ) return
[docs] def release_all_vcc( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Release all resources/dishes from subarray. :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( COMMAND_RELEASE_RESOURCES, task_callback, task_abort_event ): return # Validate the input valid, release_all_vcc_params_dict = self._handle_command_validation( COMMAND_RELEASE_ALL_RESOURCES, params, task_callback, ) if not valid: return self._set_transaction_id(release_all_vcc_params_dict) self._log_transaction_id( COMMAND_RELEASE_ALL_RESOURCES, release_all_vcc_params_dict ) self._release_vcc_resources( dish_ids=list(self._device.dish_ids_signal) ) if len(self._device.dish_ids_signal) != 0: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to remove all DISH IDs.", ), ) return task_callback( result=(ResultCode.OK, "ReleaseAllResources completed OK"), status=TaskStatus.COMPLETED, ) return
# ------------- # Scan Commands # ------------- # --- ConfigureScan Command --- # def _validate_configure_scan_input( self: CbfSubarrayComponentManager, config: str ) -> tuple[bool, dict]: """ Validate scan configuration JSON. :param config: The configuration as JSON formatted string. :return: False and None if validation failed, otherwise True and valid decoded dict """ self.logger.info("Validating ConfigureScan input JSON...") # Validate supported interface passed in the JSON string. # If invalid, validate_interface will return a message instead of the # param dict. (valid, config_dict) = validate_interface(config, "configurescan") if not valid: self.logger.error(config_dict) return False, None # Validate full_configuration against the telescope model try: # Workaround to enable strict validation here. # Need to set both PYTEST_VERSION to something and # SKA_SCHEMAS_ALLOW_STRICT_VALIDATION to True pytest_version = os.environ.get("PYTEST_VERSION") if pytest_version is None: os.environ["PYTEST_VERSION"] = "7.2.0" os.environ["SKA_SCHEMAS_ALLOW_STRICT_VALIDATION"] = "True" schema_validate( version=config_dict["interface"], config=config_dict, strictness=2, ) self.logger.info("Scan configuration is valid against telmodel!") except ValueError as value_error: self.logger.error( f"ConfigureScan JSON validation against the telescope model schema failed; {value_error}." ) return False, None # At this point, validate FSP, VCC, subscription parameters validate_scan_config = self.proxy_dict[ const.CONTROLLER ].validateSupportedConfiguration # MCS Scan Configuration Validation if validate_scan_config is True: validator = SubarrayScanConfigurationValidator( scan_configuration=config_dict, dish_ids=list(self._assigned_dish_ids), subarray_id=self._subarray_id, logger=self.logger, count_fsp=len(self._fqdn_fsp), ) success, msg = validator.validate_input() if success: self.logger.info(msg) else: self.logger.error(msg) config_dict = None return success, config_dict else: self.logger.info( "Skipping MCS supported configuration validation." ) return True, config_dict def _vcc_configure_scan( self: CbfSubarrayComponentManager, configuration: Dict[Any, Any], ) -> bool: """ Issue Vcc ConfigureScan command :param configuration: scan configuration dict :return: True if VCC ConfigureScan was successful, otherwise False """ self.logger.info( "Issuing ConfigureScan command to FHS VCC all bands controllers." ) command_results = self.issue_group_command( command_name="ConfigureScan", command_args=vcc_config_gen( configuration=configuration, vcc_ids=self._assigned_vcc_ids, vcc_unit_utils=self._vcc_unit_util, dish_utils=self._dish_utils, ), proxies=( self.proxy_dict[const.VCC_ALL_BANDS][vcc_id] for vcc_id in self._assigned_vcc_ids ), check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.READY, }, ) configure_scan_failure = False for vcc_id in self._assigned_vcc_ids: if ( command_results[ self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name() ] == ResultCode.OK ): self._configured_vcc_ids.add(vcc_id) else: configure_scan_failure = True if configure_scan_failure: return False return True def _get_obs_mode_fsp_ids( self: CbfSubarrayComponentManager, configuration: Dict[Any, Any], ) -> bool: """ Return mapping of FSP IDs to observing modes requested in ConfigureScan. :param configuration: ConfigureScan dict :return: dict containing observing mode key names to FSP IDs """ obs_mode_to_fsp_ids = {} for key in configuration["midcbf"].keys(): if ( key in obs_mode_key_to_name and ObsMode[obs_mode_key_to_name[key]] in supported_values["observing_modes"] ): obs_mode_to_fsp_ids[key] = [] for processing_region in configuration["midcbf"][key][ "processing_regions" ]: obs_mode_to_fsp_ids[key].extend( processing_region["fsp_ids"] ) return obs_mode_to_fsp_ids def _fsp_configure_scan( self: CbfSubarrayComponentManager, configuration: Dict[Any, Any], ) -> bool: """ Issue FSP function mode subarray ConfigureScan command :param configuration: ConfigureScan dict :return: True if successfully configured all FSP devices, otherwise False """ self.logger.info("Configuring FSPs for scan...") # Configuration has been validated at this point so we don't need to worry # about repeated FSP IDs across processing regions. obs_mode_fsp_ids = self._get_obs_mode_fsp_ids(configuration) if len(obs_mode_fsp_ids) == 0: self.logger.error( "Failed to parse configuration for valid processing regions, " "may contain unsupported observing mode data; supported modes: " f"{supported_values['observing_modes']}" ) return False for obs_mode_key, fsp_ids in obs_mode_fsp_ids.items(): obs_mode = obs_mode_key_to_name[obs_mode_key] # TODO: remove as observing modes become supported if obs_mode in ( ObsMode.PULSAR_TIMING.name, ObsMode.PULSAR_SEARCH.name, ObsMode.VLBI.name, ): self.logger.error(f"ObsMode.{obs_mode} currently unsupported.") continue # --- FSP UpdateObsMode command --- # # Subscribe to FSP device LRC change events for fsp_id in fsp_ids: self._event_handler.subscribe_events( proxy=self.proxy_dict[const.FSP][fsp_id], change_event_callbacks={ "longRunningCommandResult": self.results_callback }, ) # Issue UpdateObsMode command to all FSPs in the same mode. update_obs_mode_arg = orjson.dumps( { "subarray_id": self._subarray_id, "obs_mode": obs_mode, } ).decode() command_results = self.issue_group_command( command_name="UpdateObsMode", command_args=update_obs_mode_arg, proxies=( self.proxy_dict[const.FSP][fsp_id] for fsp_id in fsp_ids ), check_on_fail={ "subarrayMembership": lambda attr_val: self._subarray_id in attr_val, "obsMode": lambda attr_val: attr_val == ObsMode[obs_mode], }, ) update_obs_mode_failure = False for fsp_id in fsp_ids: dev_name = self.proxy_dict[const.FSP][fsp_id].dev_name() if command_results[dev_name] == ResultCode.OK: self._assigned_fsp_ids.add(fsp_id) self.logger.info(f"FSP {fsp_id} successfully assigned.") else: update_obs_mode_failure = True self.logger.error(f"Failed to assign FSP {fsp_id}.") if update_obs_mode_failure: return False # --- FSP ObsMode Subarray ConfigureScan command --- # # Subscribe to FSP ObsMode Subarray device LRC change events for fsp_id in fsp_ids: self._event_handler.subscribe_events( proxy=self.proxy_dict[obs_mode][fsp_id], change_event_callbacks={ "longRunningCommandResult": self.results_callback }, ) command_results = self.issue_group_command( command_name="ConfigureScan", command_args=fsp_config_gen( obs_mode_key=obs_mode_key, configuration=configuration, dish_utils=self._dish_utils, subarray_dish_ids=self._assigned_dish_ids, wideband_shift=0, ), proxies=( self.proxy_dict[obs_mode][fsp_id] for fsp_id in fsp_ids ), check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.READY, }, ) configure_failure = False for fsp_id in fsp_ids: dev_name = self.proxy_dict[obs_mode][fsp_id].dev_name() if command_results[dev_name] == ResultCode.OK: self.logger.info(f"{dev_name} successfully configured.") self._configured_fsp_mode_ids[obs_mode].add(fsp_id) else: self.logger.error(f"Failed to configure {dev_name}.") self._event_handler.unsubscribe_events( proxy=self.proxy_dict[obs_mode][fsp_id] ) configure_failure = True if configure_failure: return False return True def _subscribe_tm_event( self: CbfSubarrayComponentManager, subscription_point: str, callback: Callable, ) -> bool: """ Subscribe to change events on TM-published data subscription point :param subscription_point: FQDN of TM data subscription point :param callback: callback for event subscription :return: False if VCC ConfigureScan command failed, otherwise True """ self.logger.info(f"Attempting subscription to {subscription_point}") # split delay_model_subscription_point between device FQDN and attribute name subscription_point_split = subscription_point.split("/") fqdn = "/".join(subscription_point_split[:-1]) attr_name = subscription_point_split[-1] try: self.proxy_dict[const.TM_LEAF_NODE] = context.DeviceProxy( device_name=fqdn ) self._event_handler.subscribe_events( proxy=self.proxy_dict[const.TM_LEAF_NODE], change_event_callbacks={attr_name: callback}, ) except tango.DevFailed as dev_failed: self.logger.error( f"Failed to subscribe to change events for {subscription_point}; {dev_failed}" ) return False return True def _deconfigure_vcc( self: CbfSubarrayComponentManager, transaction_id: str ) -> bool: """ Return VCC to IDLE state if possible :param transaction_id: optional transaction ID :return: False if failed to release FSP device, True otherwise """ if len(self._configured_vcc_ids) == 0: return True self.logger.info( f"Deconfiguring the following VCCs: {self._configured_vcc_ids}" ) # Deconfigure any successfully configured VCC proxies. go_to_idle_params = orjson.dumps( { "interface": CBF_GOTOIDLE_VER1_0, "transaction_id": transaction_id, } ).decode() vcc_failure = False command_results = self.issue_group_command( command_name="GoToIdle", command_args=go_to_idle_params, proxies=self.configured_vcc_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) deconfigured_vcc_ids = [] for vcc_id in self._configured_vcc_ids: if ( command_results[ self.proxy_dict[const.VCC_ALL_BANDS][vcc_id].dev_name() ] == ResultCode.OK ): deconfigured_vcc_ids.append(vcc_id) else: vcc_failure = True self._configured_vcc_ids.difference_update(deconfigured_vcc_ids) if vcc_failure: self.logger.error( f"Failed to deconfigure the following VCCs: {self._configured_vcc_ids}" ) return False self.logger.info("Successfully deconfigured all VCCs.") return True def _release_all_fsp( self: CbfSubarrayComponentManager, transaction_id: str ) -> bool: """ Remove subarray membership and return FSP to IDLE state if possible :param transaction_id: optional transaction ID :return: False if failed to release FSP device, True otherwise """ if len(self._assigned_fsp_ids) == 0: return True self.logger.info( f"Releasing all FSP from subarray; current assigned FSPs: {self._assigned_fsp_ids}" ) # Deconfigure any successfully configured FSP mode proxies. go_to_idle_params = orjson.dumps( { "interface": CBF_GOTOIDLE_VER1_0, "transaction_id": transaction_id, } ).decode() fsp_failure = False command_results = self.issue_group_command( command_name="GoToIdle", command_args=go_to_idle_params, proxies=self.configured_fsp_mode_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) deconfigured_fsp_ids = [] for obs_mode, fsp_ids in self._configured_fsp_mode_ids.items(): for fsp_id in fsp_ids: fsp_mode_proxy = self.proxy_dict[obs_mode][fsp_id] if command_results[fsp_mode_proxy.dev_name()] == ResultCode.OK: self._event_handler.unsubscribe_events( proxy=fsp_mode_proxy ) deconfigured_fsp_ids.append((obs_mode, fsp_id)) else: fsp_failure = True for obs_mode, fsp_id in deconfigured_fsp_ids: self._configured_fsp_mode_ids[obs_mode].remove(fsp_id) # Remove subarray membership from successfully assigned FSP self.logger.info( f"Setting FSPs {self._assigned_fsp_ids} to ObsMode.IDLE." ) update_obs_mode_params = orjson.dumps( { "subarray_id": self._subarray_id, "obs_mode": "IDLE", } ).decode() command_results = self.issue_group_command( command_name="UpdateObsMode", command_args=update_obs_mode_params, proxies=self.assigned_fsp_proxies, # Not checking if obsMode == ObsMode.IDLE as FSP may still be in use # by other subarrays. check_on_fail={ "subarrayMembership": lambda attr_val: self._subarray_id not in attr_val, }, ) fsp_ids_to_remove = [] for fsp_id in self._assigned_fsp_ids: fsp_proxy = self.proxy_dict[const.FSP][fsp_id] if command_results[fsp_proxy.dev_name()] == ResultCode.OK: self._event_handler.unsubscribe_events(proxy=fsp_proxy) fsp_ids_to_remove.append(fsp_id) else: fsp_failure = True self._assigned_fsp_ids.difference_update(fsp_ids_to_remove) if fsp_failure: self.logger.error( f"Failed to release all FSPs; assigned FSP IDs: {self._assigned_fsp_ids}; configured FSP modes: {self._configured_fsp_mode_ids}" ) return False self.logger.info("Successfully released all FSPs.") return True def _deconfigure( self: CbfSubarrayComponentManager, transaction_id: str = "" ) -> bool: """ Completely deconfigure the subarray; all initialization performed by the ConfigureScan command must be 'undone' here. This method is invoked by GoToIdle, ConfigureScan, ObsReset and Restart in CbfSubarray :param transaction_id: optional transaction ID :return: False if failed to deconfigure, otherwise True """ self.logger.info("Deconfiguring subarray...") # Update device attribute signals self._device.scan_id_signal = 0 self._device.configuration_id_signal = "" self._device.frequency_band_signal = 0 # Unsubscribe from TMC events. try: if self.proxy_dict.get(const.TM_LEAF_NODE) is not None: self.logger.info("Unsubscribing from TM events.") self._event_handler.unsubscribe_events( proxy=self.proxy_dict[const.TM_LEAF_NODE] ) self.proxy_dict[const.TM_LEAF_NODE] = None except tango.DevFailed as dev_failed: self.logger.error( f"Error in unsubscribing from TM events; {dev_failed}" ) # Event processing thread exits when this is popped from the queue if self._delay_model_thread is not None: self.delay_model_event_callback(EventHandler.EXIT_CONDITION) self._delay_model_thread.join() self._delay_model_thread = None # Clear queue; thread may have exited before reaching EXIT_CONDITION self._clear_queue(self._delay_model_queue, "delayModel") # Update delayModel attribute signal self._device.delay_model_signal = orjson.dumps( {"start_validity_sec": 0} ).decode() fsp_success = self._release_all_fsp(transaction_id) if not fsp_success: return False vcc_success = self._deconfigure_vcc(transaction_id) if not vcc_success: return False return True
[docs] def configure_scan( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Execute configure scan operation. :param params: JSON string with the configure scan parameters :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "ConfigureScan", task_callback, task_abort_event ): return # NOTE: this should be the only dict of the full scan configuration loaded # into memory from here on out; it is important that it is not modified # by any methods that require it. # Loading many copies of this potentially very large data structure into # memory can lead to performance issues - hence why we use generators for # the VCC and FSP argument builders. valid, scan_configuration = self._validate_configure_scan_input(params) if not valid: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to validate ConfigureScan input JSON", ), ) return self._set_transaction_id(scan_configuration) self._log_transaction_id("ConfigureScan", scan_configuration) transaction_id = scan_configuration[param_keys.TRANSACTION_ID] self.logger.debug( f"Subarray ConfigureScan Configuration: {scan_configuration}" ) # deconfigure to reset assigned FSPs and unsubscribe from events. deconfigure_success = self._deconfigure(transaction_id) if not deconfigure_success: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to deconfigure subarray", ), ) return # --- Configure VCC --- # vcc_configure_scan_success = self._vcc_configure_scan( configuration=scan_configuration, ) if not vcc_configure_scan_success: self._deconfigure(transaction_id) task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to issue ConfigureScan command to VCC", ), ) return # --- Configure FSP --- # fsp_configure_scan_success = self._fsp_configure_scan( scan_configuration ) if not fsp_configure_scan_success: self._deconfigure(transaction_id) task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to issue ConfigureScan command to FSP", ), ) return # --- Delay Models --- # # Subscribe to delay model subscription point delay_model_success = self._subscribe_tm_event( subscription_point=scan_configuration["midcbf"][ "delay_model_subscription_point" ], callback=self.delay_model_event_callback, ) if not delay_model_success: self.logger.error("Failed to subscribe to TM events.") # If unsuccessful, reset all assigned FSP devices, unsubscribe from events self._deconfigure(transaction_id) task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to subscribe to delayModel attribute", ), ) return # Start delay model update thread if self._delay_model_thread is None: self._delay_model_thread = Thread( target=self._update_delay_model, name="CbfSubarray._update_delay_model", daemon=True, ) self._delay_model_thread.start() # Update device attribute signals self._device.configuration_id_signal = str( scan_configuration["common"]["config_id"] ) self._device.frequency_band_signal = freq_band_dict[ scan_configuration["common"]["frequency_band"] ]["band_index"] # Update ObsState machine via callback self._update_component_state(configured=True) task_callback( result=(ResultCode.OK, "ConfigureScan completed OK"), status=TaskStatus.COMPLETED, ) return
# --- Scan Command --- #
[docs] def scan_start_time_rounded_callback( self: CbfSubarrayComponentManager, event_data: tango.EventData ): """ Callback for FspModeSubarray's scanStartTimeRounded attribute change. If _scan_started is False, set _scanStarted to True and sync the received scanStartTimeRounded as firstOutputTime to all FspModeSubarray devices in the subarray. :param event_data: Tango attribute change event data :type event_data: Optional[tango.EventData] """ # NOTE (CIP-3559): event callbacks must be as short as possible # Here we simply queue event data and handle possible exceptions try: self._scan_start_time_rounded_queue.put(event_data) except Full: # Start dropping data if queue is full self._scan_start_time_rounded_queue.get_nowait() self._scan_start_time_rounded_queue.task_done() self._scan_start_time_rounded_queue.put(event_data)
def _set_first_output_time( self: CbfSubarrayComponentManager, ): """ Process FspModeSubarray's scanStartTimeRounded attribute change events and synchronize first output timestamps. If _scan_started is False, set _scanStarted to True and sync the received scanStartTimeRounded as firstOutputTime to all FspModeSubarray device in the subarray. :param event_data: Tango attribute change event data :type event_data: Optional[tango.EventData] """ with tango.EnsureOmniThread(): self.logger.info("Entering scanStartTimeRounded Thread...") # Subscribe to scanStartTimeRounded change events, then begin processing # events queue. for fsp_mode_proxy in self.configured_fsp_mode_proxies: self._event_handler.subscribe_events( proxy=fsp_mode_proxy, change_event_callbacks={ "scanStartTimeRounded": self.scan_start_time_rounded_callback }, ) while True: event_data = self._scan_start_time_rounded_queue.get() self._scan_start_time_rounded_queue.task_done() if event_data is EventHandler.EXIT_CONDITION: # This ends the thread self.logger.error( "scanStartTimeRounded event received is EventHandler.EXIT_CONDITION" ) break # Skip zero timestamps. timestamp = int(event_data.attr_value.value) if timestamp == 0: continue current_scan_started = self._read_scan_started() # Next sync the event value (startScanTimeRounded) to FspModeSubarrays's firstOutputTime if ( current_scan_started is False and self._device.obs_state_signal == ObsState.SCANNING ): command_results = self.issue_group_command( command_name="SetFirstOutputTime", proxies=self.configured_fsp_mode_proxies, command_args=timestamp, check_on_fail={ "firstOutputTime": lambda attr_val: attr_val == timestamp }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): self.logger.error("Failed to sync firstOutputTime.") # TODO: break here, but should we continue to try with # the next scanStartTimeRounded event? break # Indicate the scan start time has been synchronized. self._update_scan_started(True) break elif ( current_scan_started is True and self._device.obs_state_signal == ObsState.SCANNING ): self.logger.debug( "Scan already started and firstOutputTime has already been set, skipping syncing firstOutputTime" ) break elif ( current_scan_started is True and self._device.obs_state_signal != ObsState.SCANNING ): self.logger.debug( f"Current Subarray is in ObsState.{self._device.obs_state_signal.name} " "but _scan_started flag is set to True. Skipping syncing firstOutputTime" ) break elif ( current_scan_started is False and self._device.obs_state_signal != ObsState.SCANNING ): self.logger.warning( f"Scan has not been started, skipping syncing firstOutputTime of value {event_data.attr_value.value}; " f"scan_started: {current_scan_started}; " f"observing state: {self._device.obs_state_signal}" ) # the elif case covers all permutations, no else case needed # Since we only need one timestamp, we can unsubscribe from all events here. for proxy in self.configured_fsp_mode_proxies: self._event_handler.unsubscribe_events( proxy=proxy, attr_names=["scanStartTimeRounded"], ) self.logger.info("Exiting scanStartTimeRounded Thread.")
[docs] def scan( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Start subarray Scan operation. :param params: JSON formatted string with the scan id :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "Scan", task_callback, task_abort_event ): return # Validate the input valid, scan_params_dict = self._handle_command_validation( const.COMMAND_SCAN, params, task_callback ) if not valid: return self._set_transaction_id(scan_params_dict) self._log_transaction_id(const.COMMAND_SCAN, scan_params_dict) # Flag for scanStartTimeRounded callbacks to determine if the Subarray has already sync the value to the FSPs # TODO: Need to update comment when FspPstSubarray is implemented? # Is scan consider started when either FspModeSubarray sends the first scanStartTimeRounded? self._update_scan_started(False) if self._scan_start_time_rounded_thread is None: self._scan_start_time_rounded_thread = Thread( target=self._set_first_output_time, name="CbfSubarray._set_first_output_time", daemon=True, ) self._scan_start_time_rounded_thread.start() # Issue Scan command to assigned resources. command_results = self.issue_group_command( command_name="Scan", command_args=orjson.dumps(scan_params_dict).decode(), proxies=self.configured_obs_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.SCANNING, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to issue Scan command to VCC/FSP", ), ) return # Update scanID attribute signal self._device.scan_id_signal = int(scan_params_dict["scan_id"]) # Update ObsState machine via callback self._update_component_state(scanning=True) task_callback( result=(ResultCode.OK, "Scan completed OK"), status=TaskStatus.COMPLETED, ) return
# --- EndScan Command --- #
[docs] def end_scan( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ End scan operation. :param params: JSON string with endscan input parameters :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "EndScan", task_callback, task_abort_event ): return # Validate the input valid, endscan_params_dict = self._handle_command_validation( const.COMMAND_ENDSCAN, params, task_callback ) if not valid: return self._set_transaction_id(endscan_params_dict) self._log_transaction_id(const.COMMAND_ENDSCAN, endscan_params_dict) # Issue EndScan to assigned resources. command_results = self.issue_group_command( command_name="EndScan", command_args=params, proxies=self.configured_obs_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.READY, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to issue EndScan command to VCC/FSP", ), ) return if self._scan_start_time_rounded_thread is not None: self.scan_start_time_rounded_callback(EventHandler.EXIT_CONDITION) self._scan_start_time_rounded_thread.join() self._scan_start_time_rounded_thread = None # Clear queue; thread may have exited before reaching EXIT_CONDITION self._clear_queue( self._scan_start_time_rounded_queue, "scanStartTimeRounded" ) self._update_scan_started(False) # Update ObsState machine via callback self._update_component_state(scanning=False) task_callback( result=(ResultCode.OK, "EndScan completed OK"), status=TaskStatus.COMPLETED, ) return
# --- GoToIdle Command --- #
[docs] def go_to_idle( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Execute observing state transition from READY to IDLE. :param params: JSON string with endscan input parameters :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating Abort has been issued """ # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "GoToIdle", task_callback, task_abort_event ): return # Validate the input valid, go_to_idle_params_dict = self._handle_command_validation( const.COMMAND_GOTOIDLE, params, task_callback ) if not valid: return self._set_transaction_id(go_to_idle_params_dict) self._log_transaction_id( const.COMMAND_GOTOIDLE, go_to_idle_params_dict ) transaction_id = go_to_idle_params_dict[param_keys.TRANSACTION_ID] # deconfigure to reset assigned FSPs and unsubscribe from events. deconfigure_success = self._deconfigure(transaction_id) if not deconfigure_success: task_callback( status=TaskStatus.COMPLETED, result=( ResultCode.FAILED, "Failed to deconfigure subarray", ), ) return # Update ObsState machine via callback self._update_component_state(configured=False) task_callback( result=(ResultCode.OK, "GoToIdle completed OK"), status=TaskStatus.COMPLETED, ) return
# -------------- # Abort Commands # -------------- # --- Abort Command --- #
[docs] def abort( self: CbfSubarrayComponentManager, params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Abort the current scan operation. Due to the obsState model trigger "abort_completed" always transitioning out of ABORTING to ABORTED, this thread must not return until the very end, otherwise the device could successfully transition to ABORTED without issuing a scan abort to sub-devices (refactor in response to SKB-796). :param params: JSON string with endscan input parameters :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating AbortCommands has been issued :return: None """ result = (ResultCode.OK, "Abort completed OK") status = TaskStatus.COMPLETED # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "Abort", task_callback, task_abort_event ): message = "Abort command aborted by task executor abort event" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Validate the input # Use _validate_command over _handle_command_validation because: # 1) We don't want to set the task_callback (see method docstring) # 2) We want the invalid reason to put into the result valid, param_dict, invalid_reason = self._validate_command( const.COMMAND_ABORT, params ) if not valid: result = (ResultCode.FAILED, invalid_reason) status = TaskStatus.COMPLETED param_dict = { param_keys.INTERFACE: CBF_ABORT_VER1_0, param_keys.TRANSACTION_ID: "", } self._set_transaction_id(param_dict) self._log_transaction_id(const.COMMAND_ABORT, param_dict) # TODO: Determine subarray Abort command control of VCCAllBandsController. # Currently VCCAllBandsController inherits from ska-tango-base v1.0.0, # implementing only the AbortCommands command, which has a different # use procedure than the way Abort was implemented here previously. # Issue Abort to assigned resources. command_results = self.issue_group_command( command_name="Abort", proxies=self.configured_obs_proxies, command_args=orjson.dumps(param_dict).decode(), check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.ABORTED, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): message = "Failed to issue Abort command to VCC/FSP" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Update ObsState machine via callback self._update_component_state(scanning=False) task_callback(result=result, status=status) self._update_scan_started(False) return
# --- ObsReset Command --- #
[docs] def obs_reset( self: CbfSubarrayComponentManager, obsreset_params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Reset the scan operation to IDLE from ABORTED or FAULT. Due to the obsState model trigger "obsreset_invoked" only allowing a transition out of ABORTED and FAULT states, and "obsreset_completed" always transitioning out of RESETTING to IDLE, this thread must not return until the very end, otherwise the device could successfully transition to IDLE without properly deconfiguring (refactor in response to SKB-796). :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating AbortCommands has been issued :return: None """ result = (ResultCode.OK, "ObsReset completed OK") status = TaskStatus.COMPLETED # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "ObsReset", task_callback, task_abort_event ): message = "ObsReset command aborted by task executor abort event" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Validate the input # Use _validate_command over _handle_command_validation because: # 1) We don't want to set the task_callback (see method docstring) # 2) We want the invalid reason to put into the result valid, param_dict, invalid_reason = self._validate_command( const.COMMAND_OBSRESET, obsreset_params ) if not valid: result = (ResultCode.FAILED, invalid_reason) status = TaskStatus.COMPLETED param_dict = { param_keys.INTERFACE: CBF_OBSRESET_VER1_0, param_keys.TRANSACTION_ID: "", } self._set_transaction_id(param_dict) self._log_transaction_id(const.COMMAND_OBSRESET, param_dict) transaction_id = param_dict[param_keys.TRANSACTION_ID] # if subarray is in FAULT, we must first abort VCC and FSP operation # this will allow us to call ObsReset on them even if they are not in FAULT if self._component_state["obsfault"]: # build abort command parameters abort_params = { "interface": CBF_ABORT_VER1_0, "transaction_id": transaction_id, } command_results = self.issue_group_command( command_name="Abort", proxies=self.configured_obs_proxies, command_args=orjson.dumps(abort_params).decode(), check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.ABORTED, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): message = "Failed to issue Abort command to VCC/FSP" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED command_results = self.issue_group_command( command_name="ObsReset", command_args=orjson.dumps(param_dict).decode(), proxies=self.configured_obs_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): message = "Failed to issue ObsReset command to VCC/FSP" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # We might have interrupted a long-running command such as a Configure # or a Scan, so we need to clean up from that. # deconfigure to reset assigned FSPs and unsubscribe from events. deconfigure_success = self._deconfigure(transaction_id) if not deconfigure_success: message = "Failed to deconfigure subarray" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Update ObsState machine via callback # There is no obsfault == False action implemented, however, # we reset it to False so that obsfault == True may be triggered in the future, # by updating the component state dict in BaseComponentManager. self._update_component_state(configured=False, obsfault=False) task_callback(result=result, status=status) return
# --- Restart Command --- #
[docs] def restart( self: CbfSubarrayComponentManager, restart_params: str, task_callback: Optional[Callable] = None, task_abort_event: Optional[Event] = None, ) -> None: """ Reset the scan operation to EMPTY from ABORTED or FAULT. Due to the obsState model trigger "restart_invoked" only allowing a transition out of ABORTED and FAULT states, and "restart_completed" always transitioning out of RESTARTING to EMPTY, this thread must not return until the very end, otherwise the device could successfully transition to EMPTY without properly deconfiguring (refactor in response to SKB-796). :param task_callback: callback for driving status of task executor's current LRC task :param task_abort_event: event indicating AbortCommands has been issued :return: None """ result = (ResultCode.OK, "Restart completed OK") status = TaskStatus.COMPLETED # set task status in progress, check for abort event task_callback(status=TaskStatus.IN_PROGRESS) if self.task_abort_event_is_set( "Restart", task_callback, task_abort_event ): message = "Restart command aborted by task executor abort event" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Validate the input # Use _validate_command over _handle_command_validation because: # 1) We don't want to set the task_callback (see method docstring) # 2) We want the invalid reason to put into the result valid, param_dict, invalid_reason = self._validate_command( COMMAND_RESTART, restart_params ) if not valid: result = (ResultCode.FAILED, invalid_reason) status = TaskStatus.COMPLETED param_dict = { param_keys.INTERFACE: CBF_RESTART_VER1_0, param_keys.TRANSACTION_ID: "", } self._set_transaction_id(param_dict) self._log_transaction_id(COMMAND_RESTART, param_dict) transaction_id = param_dict[param_keys.TRANSACTION_ID] # if subarray is in FAULT, we must first abort VCC and FSP operation # this will allow us to call ObsReset on them even if they are not in FAULT if self._component_state["obsfault"]: # build abort command parameters abort_params = orjson.dumps( { "interface": CBF_ABORT_VER1_0, "transaction_id": transaction_id, } ).decode() command_results = self.issue_group_command( command_name="Abort", proxies=self.configured_obs_proxies, command_args=abort_params, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.ABORTED, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): message = "Failed to issue Abort command to VCC/FSP" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # build ObsReset command parameters obsreset_params = orjson.dumps( { "interface": CBF_OBSRESET_VER1_0, "transaction_id": param_dict[param_keys.TRANSACTION_ID], } ).decode() command_results = self.issue_group_command( command_name="ObsReset", command_args=obsreset_params, proxies=self.configured_obs_proxies, check_on_fail={ "obsState": lambda attr_val: attr_val == ObsState.IDLE, }, ) if any( result_code != ResultCode.OK for result_code in command_results.values() ): message = "Failed to issue ObsReset command to VCC/FSP" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # We might have interrupted a long-running command such as a Configure # or a Scan, so we need to clean up from that. # deconfigure to reset assigned FSPs and unsubscribe from events. deconfigure_success = self._deconfigure(transaction_id) if not deconfigure_success: message = "Failed to deconfigure subarray" self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Update ObsState machine via callback self._update_component_state(configured=False) # remove all assigned VCCs to return to EMPTY self._release_vcc_resources(dish_ids=list(self._assigned_dish_ids)) if len(self._assigned_dish_ids) != 0: message = "Failed to release all DISH resources." self.logger.error(message) result = (ResultCode.FAILED, message) status = TaskStatus.COMPLETED # Update ObsState machine via callback # There is no obsfault == False action implemented, however, # we reset it to False so that obsfault == True may be triggered in the future, # by updating the component state dict in BaseComponentManager. self._update_component_state(obsfault=False) task_callback(result=result, status=status) return