CbfObsDevice Class

class ska_mid_cbf_mcs.base.obs.obs_device.CbfObsDevice(*args: Any, **kwargs: Any)

Bases: CbfDevice

A generic base class for Mid.CBF observing devices. Extends CbfDevice with the signals, attributes, and commands common to observing devices.

obs_state_signal: AttrSignal[ObsState]

Signal for the obsState attribute. Values are emitted for this signal whenever a client changes the attribute.

scan_id_signal: AttrSignal[int]

Signal for the scanID attribute. Values are emitted for this signal whenever a client changes the attribute.

configuration_id_signal: AttrSignal[str]

Signal for the configurationID attribute. Values are emitted for this signal whenever a client changes the attribute.

delay_model_signal: AttrSignal[str]

Signal for the delayModel attribute. Values are emitted for this signal whenever a client changes the attribute.

last_scan_configuration_signal: AttrSignal[str]

Signal for the lastScanConfiguration attribute. Values are emitted for this signal whenever a client changes the attribute.

obsState: attribute_from_signal

obsState device attribute

scanID: attribute_from_signal

scanID device attribute

configurationID: attribute_from_signal

configurationID device attribute

delayModel: attribute_from_signal

delayModel device attribute

lastScanConfiguration: attribute_from_signal

lastScanConfiguration device attribute

configure_scan_name = 'ConfigureScan'

scan_name = 'Scan'

end_scan_name = 'EndScan'

go_to_idle_name = 'GoToIdle'

obs_reset_name = 'ObsReset'

abort_name = 'Abort'

check_obs_command_allowed(command_name: str, allowed_obs_states: list[ObsState]) → bool

Check if an observing command is allowed.

Parameters:
  • command_name – name of command to check

  • allowed_obs_states – list of allowed observing states

Returns:

True if allowed, else False.
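A minimal sketch of how such a check could work, assuming it simply tests whether the device's current observing state is in the allowed list. The `ObsState` enum below is a reduced stand-in for the real one, and `current_obs_state` is a hypothetical extra parameter added so the sketch runs on its own; the documented method takes only the two parameters listed above and presumably reads the state from the device.

```python
import logging
from enum import Enum

logger = logging.getLogger(__name__)


class ObsState(Enum):
    """Reduced stand-in for the real ObsState enum (illustrative only)."""

    IDLE = "IDLE"
    READY = "READY"
    SCANNING = "SCANNING"
    ABORTED = "ABORTED"
    FAULT = "FAULT"


def check_obs_command_allowed(
    command_name: str,
    allowed_obs_states: list[ObsState],
    current_obs_state: ObsState,  # hypothetical: not part of the documented signature
) -> bool:
    """Return True if current_obs_state is one of allowed_obs_states."""
    if current_obs_state in allowed_obs_states:
        return True
    # Log the rejection so the caller can see why the command was refused.
    logger.warning(
        "%s not allowed in obsState %s", command_name, current_obs_state.name
    )
    return False
```

For example, `check_obs_command_allowed("Scan", [ObsState.READY], ObsState.IDLE)` returns `False` in this sketch because IDLE is not in the allowed list.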

is_ConfigureScan_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if ConfigureScan is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.
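The two-stage behaviour described for `request_type` can be sketched as follows. This is an illustration under assumptions, not the device's implementation: `LRCReqType` is a stand-in enum in which only `ENQUEUE_REQ` comes from this reference, `DEQUEUED` is a hypothetical member name for the "popped off the queue" case, and `state_check_passes` is a hypothetical parameter standing in for the real observing-state check.

```python
from enum import Enum, auto


class LRCReqType(Enum):
    """Stand-in enum; only ENQUEUE_REQ is named in this reference."""

    ENQUEUE_REQ = auto()
    DEQUEUED = auto()  # hypothetical name for "popped off the LRC queue"


def is_command_allowed(request_type: LRCReqType, state_check_passes: bool) -> bool:
    """Sketch of the shared is_<Command>_allowed pattern."""
    if request_type == LRCReqType.ENQUEUE_REQ:
        # The task is only being queued; defer the real check until it runs.
        return True
    # The task is about to execute, so the observing-state check now applies.
    return state_check_passes
```

Deferring the check this way means a command can be queued while the device is still in a state that would reject it, and is only evaluated against the state the device is in when the task is taken off the queue.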

ConfigureScan(params: str) → tuple[list[ResultCode], list[str]]

Configure the observing device parameters for the current scan.

Returns:

tuple containing a return code and a unique command identifier
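Because the return type is a tuple of two lists, a caller has to unpack it before using the result code or the command identifier. The helper below is a sketch that assumes each list holds exactly one element; `ResultCode` is a stand-in enum with illustrative members, and the identifier string is invented for the example.

```python
from enum import IntEnum


class ResultCode(IntEnum):
    """Stand-in for the real ResultCode enum; members and values are illustrative."""

    OK = 0
    QUEUED = 2


def unpack_command_result(
    result: tuple[list[ResultCode], list[str]],
) -> tuple[ResultCode, str]:
    """Return (result code, command identifier) from a command's return value."""
    codes, command_ids = result
    # Assumption: one result code and one identifier per command invocation.
    return codes[0], command_ids[0]


# Hypothetical return value; the identifier format is made up for illustration.
code, command_id = unpack_command_result(
    ([ResultCode.QUEUED], ["example-id_ConfigureScan"])
)
```

The same unpacking applies to the other commands in this class, which share the return type.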

is_Scan_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if Scan is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.

Scan(params: str) → tuple[list[ResultCode], list[str]]

Start an observing scan.

Returns:

tuple containing a return code and a unique command identifier

is_EndScan_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if EndScan is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.

EndScan(params: str) → tuple[list[ResultCode], list[str]]

End a running scan.

Returns:

tuple containing a return code and a unique command identifier

is_GoToIdle_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if GoToIdle is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.

GoToIdle(params: str) → tuple[list[ResultCode], list[str]]

Transition the device from the READY to the IDLE obsState. To stay consistent with LMC, this command is named “GoToIdle” rather than the SKA base class equivalent, “End”.

Returns:

tuple containing a return code and a unique command identifier

is_ObsReset_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if ObsReset is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.

ObsReset(params: str) → tuple[list[ResultCode], list[str]]

Reset the observing device from a FAULT/ABORTED obsState to IDLE.

Returns:

tuple containing a return code and a unique command identifier

is_Abort_allowed(request_type: LRCReqType = LRCReqType.ENQUEUE_REQ) → bool

Check if Abort is allowed.

Parameters:

request_type – if LRCReqType.ENQUEUE_REQ, the task has only been submitted to the LRC queue, and this method always returns True; otherwise, the task has been popped off the LRC queue, and the command allowance can be checked.

Returns:

True if allowed, else False.

Abort(params: str) → tuple[list[ResultCode], list[str]]

Abort the current observing process and move to ABORTED obsState.

Returns:

tuple containing a return code and a unique command identifier
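The obsState transitions stated above for GoToIdle, ObsReset, and Abort can be collected in a small lookup table. This is only a summary sketch of what this reference states, not the device's state machine: `ObsState` is a reduced stand-in enum, `DOCUMENTED_TRANSITIONS` and `target_state` are hypothetical names, and the starting states for Abort are not listed here, so the sketch leaves them unrestricted.

```python
from enum import Enum
from typing import Optional


class ObsState(Enum):
    """Reduced stand-in for the real ObsState enum (illustrative only)."""

    IDLE = "IDLE"
    READY = "READY"
    ABORTED = "ABORTED"
    FAULT = "FAULT"


# command name -> (allowed starting states, or None if not stated; target state)
DOCUMENTED_TRANSITIONS = {
    "GoToIdle": ({ObsState.READY}, ObsState.IDLE),
    "ObsReset": ({ObsState.FAULT, ObsState.ABORTED}, ObsState.IDLE),
    "Abort": (None, ObsState.ABORTED),
}


def target_state(command_name: str, current: ObsState) -> Optional[ObsState]:
    """Return the documented end state, or None if no transition applies."""
    entry = DOCUMENTED_TRANSITIONS.get(command_name)
    if entry is None:
        return None
    sources, target = entry
    if sources is not None and current not in sources:
        return None
    return target
```

ConfigureScan, Scan, and EndScan are omitted because their start and end states are not given in this reference.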

obs_state_action(action: str) → None

Callback provided to the command tracker to drive the observing state model into its transitional states while the relevant command's submitted thread runs.

Parameters:

action – the observing state model action to perform