# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2018/07/11
"""State-machine execution."""
import time
import json
import datetime
import typing as T
import logging as lg
from .. import _util
from . import history
_logger = lg.getLogger(__name__)
_default = _util.DefaultParameter()
[docs]class Execution:
"""A state-machine execution.
Args:
name: name of execution
state_machine_arn: executed state-machine ARN
execution_input: execution input (must be JSON-serialisable)
arn: execution ARN (if known: provided when execution is posted to
AWS SFN)
session: session to use for AWS communication
"""
_wait_sleep_time = 3.0
_not_provided = object()
def __init__(
self,
name: str,
state_machine_arn: str,
execution_input: _util.JSONable = _default,
arn: str = None,
*,
session: _util.AWSSession = None):
self.name = name
self.state_machine_arn = state_machine_arn
self.execution_input = execution_input
self.arn = arn
self.session = session or _util.AWSSession()
self._status = None
self._start_date = None
self._stop_date = None
self._output = _default
def __str__(self):
status_str = (" [%s]" % self._status) if self._status else ""
return "%s%s" % (self.name, status_str)
__repr__ = _util.easy_repr
[docs] @classmethod
def from_arn(
cls,
arn: str,
*,
session: _util.AWSSession = None
) -> "Execution":
"""Construct an ``Execution`` from an existing execution.
Queries AWS Step Functions for the execution with the given ARN
Args:
arn: existing execution ARN
session: session to use for AWS communication
Returns:
described execution
"""
session = session or _util.AWSSession()
resp = session.sfn.describe_execution(executionArn=arn)
assert resp["executionArn"] == arn
execution_input = _default
if "input" in resp:
execution_input = json.loads(resp["input"])
self = cls(
resp["name"],
resp["stateMachineArn"],
execution_input=execution_input,
arn=arn,
session=session)
self._status = resp["status"]
self._start_date = resp["startDate"]
self._stop_date = resp.get("stopDate")
if "output" in resp:
self._output = json.loads(resp["output"])
return self
[docs] @classmethod
def from_execution_list_item(
cls,
item: T.Dict[str, _util.JSONable],
*,
session: _util.AWSSession = None
) -> "Execution":
"""Construct an ``Execution`` from a response list-item.
Args:
item: execution list item
session: session to use for AWS communication
Returns:
described execution
"""
self = cls(
item["name"],
item["stateMachineArn"],
cls._not_provided,
arn=item["executionArn"],
session=session)
self._status = item["status"]
self._start_date = item["startDate"]
self._stop_date = item.get("stopDate")
return self
@property
def status(self) -> str:
"""Execution status."""
if self._status in (None, "RUNNING"):
self._update()
return self._status
@property
def start_date(self) -> datetime.datetime:
"""Execution start time."""
if self._start_date is None:
self._update()
return self._start_date
@property
def stop_date(self) -> datetime.datetime:
"""Execution stop time.
Raises:
RuntimeError: if execution is not yet finished
"""
if self._stop_date is None:
self._update()
self._raise_unfinished()
return self._stop_date
@property
def output(self) -> _util.JSONable:
"""Output of execution.
Raises:
RuntimeError: if execution is not yet finished, or execution
failed
"""
if self._output == _default:
self._update()
self._raise_unfinished()
self._raise_on_failure()
return self._output
def _update(self):
"""Update execution information from AWS.
Raises:
RuntimeError: if execution ARN is not known (must be provided)
"""
status_known = self._status not in (None, "RUNNING")
input_known = self.execution_input != self._not_provided
if status_known and input_known:
_logger.debug("Execution finished: update is unnecessary")
return
self._raise_no_arn()
resp = self.session.sfn.describe_execution(executionArn=self.arn)
assert resp["executionArn"] == self.arn
self._status = resp["status"]
self._start_date = resp["startDate"]
self._stop_date = resp.get("stopDate")
if "input" in resp:
input_ = json.loads(resp["input"])
if self.execution_input == self._not_provided:
self.execution_input = input_
else:
assert self.execution_input == input_
if "output" in resp:
self._output = json.loads(resp["output"])
def _raise_on_failure(self):
"""Raise ``RuntimeError`` on execution failure."""
failed = self._status not in ("RUNNING", "SUCCEEDED")
if failed:
raise RuntimeError("Execution '%s' %s" % (self, self._status))
def _raise_unfinished(self):
"""Raise ``RuntimeError`` when requiring execution to be finished."""
if self._status == "RUNNING":
raise RuntimeError("Execution '%s' not yet finished" % self)
def _raise_no_arn(self):
"""Raise ``RuntimeError`` when ARN is not provided."""
if self.arn is None:
raise RuntimeError("Execution '%s' ARN is unknown" % self)
[docs] def start(self):
"""Start this state-machine execution.
Sets the ``arn`` attribute.
"""
_util.assert_valid_name(self.name)
input_ = self.execution_input
if input_ == _default:
input_ = {}
resp = self.session.sfn.start_execution(
stateMachineArn=self.state_machine_arn,
name=self.name,
input=json.dumps(input_))
self.arn = resp["executionArn"]
self._status = "RUNNING"
self._start_date = resp["startDate"]
[docs] def wait(self, raise_on_failure: bool = True, timeout: float = None):
"""Wait for execution to finish.
Args:
raise_on_failure: raise error when execution fails
timeout: time to wait for execution to finish (seconds),
default: no time-out
Raises:
RuntimeError: if execution fails, or if time-out is reached
before execution finishes
"""
t = time.time()
while True:
self._update()
if self._status != "RUNNING":
break
if timeout is not None and time.time() - t > timeout:
raise RuntimeError("Time-out waiting on execution '%s'" % self)
time.sleep(self._wait_sleep_time)
if raise_on_failure:
self._raise_on_failure()
[docs] def stop(self, error_code: str = _default, details: str = _default):
"""Stop an existing execution.
Args:
error_code: stop reason identification
details: stop reason
Raises:
RuntimeError: if execution is already finished
"""
kw = {}
if error_code != _default:
kw["error"] = error_code
if details != _default:
kw["cause"] = details
self._raise_no_arn()
resp = self.session.sfn.stop_execution(executionArn=self.arn, **kw)
self._stop_date = resp["stopDate"]
_logger.info("Execution stopped on %s" % resp["stopDate"])
[docs] def get_history(self) -> T.List[history.Event]:
"""List the execution history.
Returns:
history of execution events
"""
self._raise_no_arn()
resp = _util.collect_paginated(
self.session.sfn.get_execution_history,
executionArn=self.arn)
return history.parse_history(resp["events"])
[docs] def format_history(self) -> str:
"""Format the execution history for printing.
Returns:
history formatted
"""
events = self.get_history()
lines = []
for event in events:
ds = event.details_str
line = ("%s:\n %s" % (event, ds)) if ds else str(event)
lines.append(line)
self._update()
if self._output != _default:
line = "Output: %s" % json.dumps(self._output)
lines.append(line)
return "\n".join(lines)