Source code for ska_mid_cbf_mcs.base.obs.obs_component_manager

# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.

from __future__ import annotations

from threading import Event
from typing import Any, Callable, Optional

from ska_control_model import ResultCode, TaskStatus
from ska_schemas.schema import validate as schema_validate

from ska_mid_cbf_mcs.base.component_manager import CbfComponentManager
from ska_mid_cbf_mcs.commons.global_enum import param_keys
from ska_mid_cbf_mcs.commons.validate_interface import validate_interface

__all__ = ["CbfObsComponentManager"]


class CbfObsComponentManager(CbfComponentManager):
    """
    A base observing device component manager for SKA Mid.CBF MCS.

    Provides the shared command-input validation helpers used by observing
    devices, and declares the observing commands that concrete component
    managers are expected to override. Every public command defined here
    raises ``NotImplementedError``.
    """

    def __init__(
        self: CbfObsComponentManager,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Initialise a new CbfObsComponentManager instance.

        :param args: positional arguments forwarded to the parent class
        :param kwargs: keyword arguments forwarded to the parent class
        """
        # Initialize observing state machine trigger keywords "configured",
        # "scanning", "resourced" and "obsfault"
        super().__init__(
            *args,
            configured=None,
            scanning=None,
            resourced=None,
            obsfault=None,
            **kwargs,
        )

    # ---------------
    # Command Methods
    # ---------------

    # --- Helper Methods --- #

    def _set_transaction_id(
        self: CbfObsComponentManager, params: dict
    ) -> None:
        """
        Set the transaction ID of the input parameters if it is missing or
        empty. The dictionary is modified in place.

        Must follow short SKUID format defined in ADR-129
        see: https://confluence.skatelescope.org/display/SWSI/ADR-129+Short+SKUIDs

        :param params: dictionary of input command parameters
        """
        # TODO: Implement fully
        if not params.get(param_keys.TRANSACTION_ID):
            # FHS downstream needs the transaction_id key in its input
            # parameters.
            params[param_keys.TRANSACTION_ID] = ""

    def _handle_command_validation(
        self: CbfObsComponentManager,
        command: str,
        params: str,
        task_callback: Optional[Callable] = None,
    ) -> tuple[bool, Optional[dict]]:
        """
        Validate a command's input and report a failure via task_callback.

        :param command: TANGO command name
        :param params: command input JSON string
        :param task_callback: Callback callable, defaults to None; only
            invoked when validation fails
        :return: Tuple of (bool, dict or None): whether the parameters are
            valid for the command, and the command parameters as a python
            dictionary (None when validation failed)
        """
        valid, param_dict, invalid_reason = self._validate_command(
            command, params
        )
        if not valid:
            message = (
                f"{command} input parameters are not valid: {invalid_reason}"
            )
            self.logger.error(message)

            # NOTE: ResultCode.FAILED is pushed to indicate failure due to
            # invalid input; ResultCode.REJECTED is reserved for the task
            # executor to indicate that a task was not submitted to the
            # queue.
            # Additionally, the task executor callback uses
            # TaskStatus.COMPLETED to indicate that a task was run to
            # completion, even if the result is a failure; TaskStatus.FAILED
            # is reserved for indicating that a task failed to run in its
            # entirety.
            if task_callback is not None:
                task_callback(
                    status=TaskStatus.COMPLETED,
                    result=(
                        ResultCode.FAILED,
                        message,
                    ),
                )
            return False, None

        self._set_transaction_id(param_dict)
        return True, param_dict

    def _validate_command(
        self: CbfObsComponentManager, command: str, params: str
    ) -> tuple[bool, Optional[dict], Optional[str]]:
        """
        Validate the command parameters.

        Checks that the command's input JSON parameters can be parsed, have
        a valid and supported interface, and are valid against ska-schemas.
        Does not perform any MCS specific validation.

        :param command: Name of the command to validate.
        :param params: The JSON string with the command's input parameters.
        :return: Tuple containing: bool if the params are valid, dictionary
            of the parsed command parameters (None when invalid), and a
            reason message if the params are not valid (None when valid).
        """
        self.logger.info(
            f"Validating {command} parameters against Ska-Schemas."
        )

        # Validate Interface
        # If invalid, validate_interface will return a message instead of the
        # param dict.
        valid, params_dict = validate_interface(params, command.lower())
        if not valid:
            message = f"Failed to validate {command} interface: {params_dict}"
            return (False, None, message)

        # Validate Parameters
        interface = params_dict[param_keys.INTERFACE]
        try:
            schema_validate(
                version=interface, config=params_dict, strictness=2
            )
        except ValueError as value_error:
            # The detailed validation error is only logged; the message
            # handed back to the caller is the generic one below.
            self.logger.error(
                f"{command}: {interface} schema validation failed: "
                f"{value_error}"
            )
            message = "JSON validation against the ska-schemas schema failed."
            return (False, None, message)

        self.logger.info(
            f"Input {command} parameters are valid against Ska-Schemas!"
        )
        return (True, params_dict, None)

    def _log_transaction_id(
        self: CbfObsComponentManager, command: str, params: dict
    ) -> None:
        """
        Log the transaction ID if available in the command parameters.

        :param command: Name of the command being processed
        :param params: Command input parameters dictionary
        """
        transaction_id = params.get(param_keys.TRANSACTION_ID, "")
        message = f"Processing {command} command"
        # Truthiness check (matching _set_transaction_id) so that a missing,
        # empty or None ID is never logged as if it were a real ID.
        if transaction_id:
            message += f" with transaction ID: {transaction_id}"
        self.logger.info(message)

    def configure_scan(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Execute configure scan operation.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")

    def scan(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Begin scan operation.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")

    def end_scan(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        End scan operation.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")

    def go_to_idle(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Execute observing state transition from READY to IDLE.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")

    def abort(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Abort the current scan operation.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")

    def obs_reset(
        self: CbfObsComponentManager,
        params: str,
        task_callback: Optional[Callable] = None,
        task_abort_event: Optional[Event] = None,
    ) -> None:
        """
        Reset observing state from ABORTED or FAULT to IDLE.

        :param params: JSON string with input parameters
        :param task_callback: Callback callable, defaults to None; not used
            by this base implementation
        :param task_abort_event: Abort event, defaults to None; not used by
            this base implementation
        :raises NotImplementedError: Not implemented in abstract class
        """
        raise NotImplementedError("CbfObsComponentManager is abstract.")