# -*- coding: utf-8 -*-
#
# This file is part of the ska-mid-cbf-mcs project
#
# Distributed under the terms of the BSD 3-Clause license.
# See LICENSE for more info.
from __future__ import annotations
from threading import Event
from typing import Any, Callable, Optional
from ska_control_model import ResultCode, TaskStatus
from ska_schemas.schema import validate as schema_validate
from ska_mid_cbf_mcs.base.component_manager import CbfComponentManager
from ska_mid_cbf_mcs.commons.global_enum import param_keys
from ska_mid_cbf_mcs.commons.validate_interface import validate_interface
__all__ = ["CbfObsComponentManager"]
[docs]
class CbfObsComponentManager(CbfComponentManager):
"""
A base observing device component manager for SKA Mid.CBF MCS
"""
def __init__(
self: CbfObsComponentManager,
*args: Any,
**kwargs: Any,
) -> None:
"""Initialise a new CbfObsComponentManager instance."""
# Initialize observing state machine trigger keywords "configured",
# "scanning", "resourced" and "obsfault"
super().__init__(
*args,
configured=None,
scanning=None,
resourced=None,
obsfault=None,
**kwargs,
)
# ---------------
# Command Methods
# ---------------
# --- Helper Methods --- #
def _set_transaction_id(
self: CbfObsComponentManager, params: dict
) -> None:
"""
Sets the transaction id of the input parameters if they don't already
exist. Must follow short SKUID format defined in ADR-129
see:
https://confluence.skatelescope.org/display/SWSI/ADR-129+Short+SKUIDs
:param params: dictionary of input command parameters
"""
# TODO: Implement fully
if not params.get(param_keys.TRANSACTION_ID):
# FHS downstream needs the transaction_id key in its input parameters.
params[param_keys.TRANSACTION_ID] = ""
def _handle_command_validation(
self: CbfObsComponentManager,
command: str,
params: str,
task_callback: Optional[Callable] = None,
) -> tuple[bool, dict]:
"""
Handles validating and issuing the task_callback for commands
:param command: TANGO command name
:param params: command input JSON string
:param task_callback: Callback callable, defaults to None
:return: Tuple of (bool, Dict) which is if the parameters are valid for
the command, and the command parameters in a python dictionary
"""
valid, param_dict, invalid_reason = self._validate_command(
command, params
)
if not valid:
message = (
f"{command} input parameters are not valid: {invalid_reason}"
)
self.logger.error(message)
# NOTE: ResultCode.FAILED is pushed to indicated failure due to invalid
# input; ResultCode.REJECTED is reserved for the task executor to indicate
# that a task was not submitted to the queue.
# Additionally, the task executor callback uses TaskStatus.COMPLETED
# to indicate that a task was run to completion, even if the result
# is a failure; TaskStatus.FAILED is reserved for indicating that a
# task failed to run in its entirety.
if task_callback is not None:
task_callback(
status=TaskStatus.COMPLETED,
result=(
ResultCode.FAILED,
message,
),
)
return False, None
self._set_transaction_id(param_dict)
return True, param_dict
def _validate_command(
self: CbfObsComponentManager, command: str, params: str
) -> tuple[bool, dict, str]:
"""
Validates the command parameters. Checks that the command's input
JSON parameters can be parsed, have a valid and supported interface, and
are valid against ska-schemas. Does not perform any MCS specific validation.
:param command: Name of the command to validate.
:param params: The JSON string with the command's input parameters.
:return: Tuple containing: bool if the params are valid, dictionary of
the parsed command parameters, and a reason message if the params
are not valid.
"""
self.logger.info(
f"Validating {command} parameters against Ska-Schemas."
)
# Validate Interface
# If invalid, validate_interface will return a message instead of the
# param dict.
valid, params_dict = validate_interface(params, command.lower())
if not valid:
message = f"Failed to validate {command} interface: {params_dict}"
return (False, None, message)
# Validate Parameters
interface = params_dict[param_keys.INTERFACE]
try:
schema_validate(
version=interface, config=params_dict, strictness=2
)
except ValueError as value_error:
self.logger.error(
f"{command}: {interface} schema validation failed: {value_error}"
) # noqa: E501s
message = "JSON validation against the ska-schemas schema failed."
return (False, None, message)
self.logger.info(
f"Input {command} parameters are valid against Ska-Schemas!"
)
return (True, params_dict, None)
def _log_transaction_id(
self: CbfObsComponentManager, command: str, params: dict
) -> None:
"""
logs the transaction id if available in the command parameters
:param command: Name of the command being processed
:param params: Command input parameters dictionary
"""
transaction_id = params.get(param_keys.TRANSACTION_ID, "")
message = f"Processing {command} command"
if transaction_id != "":
message += f" with transaction ID: {transaction_id}"
self.logger.info(message)
[docs]
def scan(
self: CbfComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Begin scan operation.
:param params: JSON string with input parameters
:raises NotImplementedError: Not implemented in abstract class
"""
raise NotImplementedError("CbfObsComponentManager is abstract.")
[docs]
def end_scan(
self: CbfComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
End scan operation.
:param params: JSON string with input parameters
:raises NotImplementedError: Not implemented in abstract class
"""
raise NotImplementedError("CbfObsComponentManager is abstract.")
[docs]
def go_to_idle(
self: CbfComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Execute observing state transition from READY to IDLE.
:param params: JSON string with input parameters
:raises NotImplementedError: Not implemented in abstract class
"""
raise NotImplementedError("CbfObsComponentManager is abstract.")
[docs]
def abort(
self: CbfComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Abort the current scan operation.
:param params: JSON string with input parameters
:raises NotImplementedError: Not implemented in abstract class
"""
raise NotImplementedError("CbfObsComponentManager is abstract.")
[docs]
def obs_reset(
self: CbfComponentManager,
params: str,
task_callback: Optional[Callable] = None,
task_abort_event: Optional[Event] = None,
) -> None:
"""
Reset observing state from ABORTED or FAULT to IDLE.
:param params: JSON string with input parameters
:raises NotImplementedError: Not implemented in abstract class
"""
raise NotImplementedError("CbfObsComponentManager is abstract.")