Source code for sfini.execution.history

# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2018/08/09

"""State-machine execution history events.

Use ``sfini.execution.Execution.format_history`` for nice history
printing.
"""

import json
import typing as T
import logging as lg

from .. import _util

_logger = lg.getLogger(__name__)
_default = _util.DefaultParameter()
_type_keys = {
    "ActivityFailed": "activityFailedEventDetails",
    "ActivityScheduleFailed": "activityScheduleFailedEventDetails",
    "ActivityScheduled": "activityScheduledEventDetails",
    "ActivityStarted": "activityStartedEventDetails",
    "ActivitySucceeded": "activitySucceededEventDetails",
    "ActivityTimedOut": "activityTimedOutEventDetails",
    "ChoiceStateEntered": "stateEnteredEventDetails",
    "ChoiceStateExited": "stateExitedEventDetails",
    "ExecutionFailed": "executionFailedEventDetails",
    "ExecutionStarted": "executionStartedEventDetails",
    "ExecutionSucceeded": "executionSucceededEventDetails",
    "ExecutionAborted": "executionAbortedEventDetails",
    "ExecutionTimedOut": "executionTimedOutEventDetails",
    "FailStateEntered": "stateEnteredEventDetails",
    "LambdaFunctionFailed": "lambdaFunctionFailedEventDetails",
    "LambdaFunctionScheduleFailed": "lambdaFunctionScheduleFailedEventDetails",
    "LambdaFunctionScheduled": "lambdaFunctionScheduledEventDetails",
    "LambdaFunctionStartFailed": "lambdaFunctionStartFailedEventDetails",
    # "LambdaFunctionStarted": "lambdaFunctionStartedEventDetails",
    "LambdaFunctionSucceeded": "lambdaFunctionSucceededEventDetails",
    "LambdaFunctionTimedOut": "lambdaFunctionTimedOutEventDetails",
    "SucceedStateEntered": "stateEnteredEventDetails",
    "SucceedStateExited": "stateExitedEventDetails",
    # "TaskStateAborted": "stateAbortedEventDetails",
    "TaskStateEntered": "stateEnteredEventDetails",
    "TaskStateExited": "stateExitedEventDetails",
    "PassStateEntered": "stateEnteredEventDetails",
    "PassStateExited": "stateExitedEventDetails",
    # "ParallelStateAborted": "stateAbortedEventDetails",
    "ParallelStateEntered": "stateEnteredEventDetails",
    "ParallelStateExited": "stateExitedEventDetails",
    "ParallelStateFailed": "stateFailedEventDetails",
    # "ParallelStateStarted": "stateStartedEventDetails",
    # "ParallelStateSucceeded": "stateSucceededEventDetails",
    # "WaitStateAborted": "stateAbortedEventDetails",
    "WaitStateEntered": "stateEnteredEventDetails",
    "WaitStateExited": "stateExitedEventDetails"}


[docs]class Event: """An execution history event. Args: timestamp (datetime.datetime): event time-stamp event_type: type of event event_id: identifying index of event previous_event_id: identifying index of causal event """ def __init__( self, timestamp, event_type: str, event_id: int, previous_event_id: int = None): self.timestamp = timestamp self.event_type = event_type self.event_id = event_id self.previous_event_id = previous_event_id def __str__(self): fmt = "%s [%s] @ %s" return fmt % (self.event_type, self.event_id, self.timestamp) __repr__ = _util.easy_repr @staticmethod def _get_args( history_event: T.Dict[str, _util.JSONable] ) -> T.Tuple[tuple, T.Dict[str, T.Any], T.Dict[str, _util.JSONable]]: """Get initialisation arguments by parsing history event. Args: history_event: execution history event, provided by AWS API Returns: initialisation positional and keyword arguments, and event details """ # _logger.debug("history_event: %s" % history_event) timestamp = history_event["timestamp"] event_type = history_event["type"] event_id = history_event["id"] args = (timestamp, event_type, event_id) kwargs = {} if "previousEventId" in history_event: kwargs["previous_event_id"] = history_event["previousEventId"] details = history_event.get(_type_keys.get(event_type), {}) return args, kwargs, details
[docs] @classmethod def from_history_event( cls, history_event: T.Dict[str, _util.JSONable] ) -> "Event": """Parse an history event. Args: history_event: execution history event date, provided by AWS API Returns: Event: constructed execution history event """ args, kwargs, _ = cls._get_args(history_event) return cls(*args, **kwargs)
@_util.cached_property def details_str(self) -> str: """Format the event details. Returns: str: event details, formatted as string """ return ""
[docs]class Failed(Event): """An execution history failure event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event previous_event_id: identifying index of causal event error: error type cause: failure details """ def __init__( self, timestamp, event_type, event_id, previous_event_id=None, error: str = None, cause: str = None): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.error = error self.cause = cause @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) if "error" in details: kwargs["error"] = details["error"] if "cause" in details: kwargs["cause"] = details["cause"] return args, kwargs, details @_util.cached_property def details_str(self): return "error: %s" % self.error
[docs]class LambdaFunctionScheduled(Event): """An execution history AWS Lambda task-schedule event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event resource: AWS Lambda function ARN previous_event_id: identifying index of causal event task_input: task input timeout: time-out (seconds) of task execution """ def __init__( self, timestamp, event_type, event_id, resource: str, previous_event_id=None, task_input: _util.JSONable = _default, timeout: int = None): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.resource = resource self.task_input = task_input self.timeout = timeout @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) args += (details["resource"],) if "input" in details: kwargs["task_input"] = json.loads(details["input"]) if "timeoutInSeconds" in details: kwargs["timeout"] = details["timeoutInSeconds"] return args, kwargs, details @_util.cached_property def details_str(self): return "resource: %s" % self.resource
[docs]class ActivityScheduled(LambdaFunctionScheduled): """An execution history activity task-schedule event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event resource: AWS Lambda function ARN previous_event_id: identifying index of causal event task_input: task input timeout: time-out (seconds) of task execution heartbeat: heartbeat time-out (seconds) """ def __init__( self, timestamp, event_type, event_id, resource, previous_event_id=None, task_input=_default, timeout=None, heartbeat: int = None): super().__init__( timestamp, event_type, event_id, resource, previous_event_id=previous_event_id, task_input=task_input, timeout=timeout) self.heartbeat = heartbeat @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) if "heartbeatInSeconds" in details: kwargs["heartbeat"] = details["heartbeatInSeconds"] return args, kwargs, details
[docs]class ActivityStarted(Event): """An execution history activity task-start event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event worker_name: name of activity worker executing activity task previous_event_id: identifying index of causal event """ def __init__( self, timestamp, event_type, event_id, worker_name: str, previous_event_id=None): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.worker_name = worker_name @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) args += (details["workerName"],) return args, kwargs, details @_util.cached_property def details_str(self): return "worker: %s" % self.worker_name
[docs]class ObjectSucceeded(Event): """An execution history succeed event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event previous_event_id: identifying index of causal event output: output of state/execution """ def __init__( self, timestamp, event_type, event_id, previous_event_id=None, output: _util.JSONable = _default): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.output = output @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) if "output" in details: kwargs["output"] = json.loads(details["output"]) return args, kwargs, details
[docs]class ExecutionStarted(Event): """An execution history execution-start event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event previous_event_id: identifying index of causal event execution_input: execution input role_arn: execution AWS IAM role ARN """ def __init__( self, timestamp, event_type, event_id, previous_event_id=None, execution_input: _util.JSONable = _default, role_arn: str = None): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.execution_input = execution_input self.role_arn = role_arn @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) if "input" in details: kwargs["execution_input"] = json.loads(details["input"]) if "roleArn" in details: kwargs["role_arn"] = details["roleArn"] return args, kwargs, details
[docs]class StateEntered(Event): """An execution history state-enter event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event state_name: state name previous_event_id: identifying index of causal event state_input: state input """ def __init__( self, timestamp, event_type, event_id, state_name: str, previous_event_id=None, state_input: _util.JSONable = _default): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.state_name = state_name self.state_input = state_input @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) args += (details["name"],) if "input" in details: kwargs["state_input"] = json.loads(details["input"]) return args, kwargs, details @_util.cached_property def details_str(self): return "name: %s" % self.state_name
[docs]class StateExited(Event): """An execution history state-exit event. Args: timestamp: event time-stamp event_type: type of event event_id: identifying index of event state_name: state name previous_event_id: identifying index of causal event output: state output """ def __init__( self, timestamp, event_type, event_id, state_name: str, previous_event_id=None, output: _util.JSONable = _default): super().__init__( timestamp, event_type, event_id, previous_event_id=previous_event_id) self.state_name = state_name self.output = output @classmethod def _get_args(cls, history_event): args, kwargs, details = super()._get_args(history_event) args += (details["name"],) if "output" in details: kwargs["output"] = json.loads(details["output"]) return args, kwargs, details @_util.cached_property def details_str(self): return "name: %s" % self.state_name
_type_classes = { "ActivityFailed": Failed, "ActivityScheduleFailed": Failed, "ActivityScheduled": ActivityScheduled, "ActivityStarted": ActivityStarted, "ActivitySucceeded": ObjectSucceeded, "ActivityTimedOut": Failed, "ChoiceStateEntered": StateEntered, "ChoiceStateExited": StateExited, "ExecutionFailed": Failed, "ExecutionStarted": ExecutionStarted, "ExecutionSucceeded": ObjectSucceeded, "ExecutionAborted": Failed, "ExecutionTimedOut": Failed, "FailStateEntered": StateEntered, "LambdaFunctionFailed": Failed, "LambdaFunctionScheduleFailed": Failed, "LambdaFunctionScheduled": LambdaFunctionScheduled, "LambdaFunctionStartFailed": Failed, "LambdaFunctionStarted": Event, "LambdaFunctionSucceeded": ObjectSucceeded, "LambdaFunctionTimedOut": Failed, "SucceedStateEntered": StateEntered, "SucceedStateExited": StateExited, "TaskStateAborted": Event, "TaskStateEntered": StateEntered, "TaskStateExited": StateExited, "PassStateEntered": StateEntered, "PassStateExited": StateExited, "ParallelStateAborted": Event, "ParallelStateEntered": StateEntered, "ParallelStateExited": StateExited, "ParallelStateFailed": Failed, "ParallelStateStarted": Event, "ParallelStateSucceeded": Event, "WaitStateAborted": Event, "WaitStateEntered": StateEntered, "WaitStateExited": StateExited}
[docs]def parse_history( history_events: T.List[T.Dict[str, _util.JSONable]] ) -> T.List[Event]: """List the execution history. Args: history_events: history events as provided by AWS API Returns: history of execution events """ events = [] for history_event in history_events: eclass = _type_classes[history_event["type"]] event = eclass.from_history_event(history_event) events.append(event) return events