# Source code for sfini.state_machine

# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2018/07/11

"""State-machine interfacing.

A state-machine defines the logic for a workflow of an application. It
is composed of states (i.e. stages); executions of the state-machine
run the workflow over some given data.
"""

import json
import uuid
import datetime
import typing as T
import logging as lg

from . import _util
from . import execution as sfini_execution
from . import state as sfini_state

# Module-level logger, named after this module
_logger = lg.getLogger(__name__)
# Sentinel used as the default for optional arguments (``comment``,
# ``timeout``) so that "not supplied" can be detected
_default = _util.DefaultParameter()


class StateMachine:
    """State machine structure for AWS Step Functions.

    Args:
        name: name of state-machine
        states: state-machine states, keyed by state name
        start_state: name of start state
        comment: description of state-machine
        timeout: execution time-out (seconds)
        session: session to use for AWS communication; a new
            ``AWSSession`` is created when none is provided
    """

    # Class used to build executions of this state-machine
    _execution_class = sfini_execution.Execution

    def __init__(
            self,
            name: str,
            states: T.Dict[str, sfini_state.State],
            start_state: str,
            comment: str = _default,
            timeout: int = _default,
            *,
            session: _util.AWSSession = None):
        self.name = name
        self.states = states
        self.start_state = start_state
        self.comment = comment
        self.timeout = timeout
        self.session = session or _util.AWSSession()

    def __str__(self):
        return "'%s' (%d states)" % (self.name, len(self.states))

    __repr__ = _util.easy_repr

    @_util.cached_property
    def arn(self) -> str:
        """State-machine generated ARN.

        Built from the session's region and account ID, and this
        state-machine's name.
        """

        region = self.session.region
        account = self.session.account_id
        fmt = "arn:aws:states:%s:%s:stateMachine:%s"
        return fmt % (region, account, self.name)

    @_util.cached_property
    def default_role_arn(self) -> str:
        """``sfini``-generated state-machine IAM role ARN."""
        return "arn:aws:iam::%s:role/sfiniGenerated" % self.session.account_id

    def to_dict(self) -> T.Dict[str, _util.JSONable]:
        """Convert this state-machine to a definition dictionary.

        ``Comment`` and ``TimeoutSeconds`` are only included when they
        were provided.

        Returns:
            definition
        """

        _logger.debug("Converting '%s' to dictionary", self)
        state_defns = {n: s.to_dict() for n, s in self.states.items()}
        defn = {"StartAt": self.start_state, "States": state_defns}
        # NOTE(review): ``!=`` (not ``is not``) is kept on purpose, since
        # the sentinel's equality semantics are defined in ``_util``
        if self.comment != _default:
            defn["Comment"] = self.comment
        if self.timeout != _default:
            defn["TimeoutSeconds"] = self.timeout
        return defn

    def is_registered(self) -> bool:
        """See if this state-machine is registered with AWS SFN.

        Returns:
            if this state-machine is registered
        """

        _logger.debug("Testing for registration of '%s' on SFN", self)
        resp = _util.collect_paginated(self.session.sfn.list_state_machines)
        arn = self.arn
        return any(sm["stateMachineArn"] == arn for sm in resp["stateMachines"])

    def _sfn_create(self, role_arn: str):
        """Create this state-machine in AWS SFN.

        Args:
            role_arn: state-machine IAM role ARN
        """

        _logger.info("Creating '%s' on SFN", self)
        resp = self.session.sfn.create_state_machine(
            name=self.name,
            definition=json.dumps(self.to_dict(), indent=4),
            roleArn=role_arn)
        # Sanity check that the locally-built ARN agrees with the one SFN
        # reports (like any ``assert``, skipped under ``python -O``)
        assert resp["stateMachineArn"] == self.arn
        _logger.info(
            "State-machine '%s' registered with ARN '%s' at %s",
            self,
            self.arn,
            resp["creationDate"])

    def _sfn_update(self, role_arn: str = None):
        """Update this state-machine in AWS SFN.

        Args:
            role_arn: state-machine IAM role ARN to update to; left out
                of the update request when ``None``
        """

        _logger.info("Updating '%s' on SFN", self)
        kwargs = {} if role_arn is None else {"roleArn": role_arn}
        resp = self.session.sfn.update_state_machine(
            definition=json.dumps(self.to_dict(), indent=4),
            stateMachineArn=self.arn,
            **kwargs)
        _logger.info("'%s' updated at %s", self, resp["updateDate"])

    def register(self, role_arn: str = None, allow_update: bool = False):
        """Register state-machine with AWS SFN.

        The state-machine is updated if ``allow_update`` is set and it
        is already registered, otherwise it is created.

        Args:
            role_arn: state-machine IAM role ARN. When creating, the
                ``sfini``-generated role is used if not provided
            allow_update: allow overwriting of an existing state-machine
                with the same name
        """

        _util.assert_valid_name(self.name)
        if allow_update and self.is_registered():
            self._sfn_update(role_arn)
        else:
            self._sfn_create(role_arn or self.default_role_arn)

    def deregister(self):
        """Remove state-machine from AWS SFN."""
        _logger.info("Deleting state-machine '%s' from SFN", self)
        self.session.sfn.delete_state_machine(stateMachineArn=self.arn)

    def start_execution(
            self,
            execution_input: _util.JSONable
    ) -> _execution_class:
        """Start an execution.

        The execution is named after this state-machine, the current
        local time and a short random identifier.

        Args:
            execution_input: input to first state in state-machine

        Returns:
            started execution
        """

        _logger.info(
            "Starting execution of '%s' with: %s",
            self,
            execution_input)
        now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
        suffix = "_" + now + "_" + str(uuid.uuid4())[:8]
        # SFN limits execution names to 80 characters: trim the
        # state-machine name so the unique suffix always fits
        max_name_length = 80
        name = self.name[:max_name_length - len(suffix)] + suffix
        execution = self._execution_class(
            name,
            self.arn,
            execution_input,
            session=self.session)
        execution.start()
        return execution

    def _build_executions(
            self,
            items: T.List[T.Dict[str, _util.JSONable]]
    ) -> T.List[_execution_class]:
        """Build executions from response list-items.

        This state-machine is manually attached to the ``state_machine``
        attribute of the resultant executions here.

        Args:
            items: execution list-items

        Returns:
            constructed executions
        """

        executions = []
        for item in items:
            # Sanity check that the item belongs to this state-machine
            # (like any ``assert``, skipped under ``python -O``)
            assert item["stateMachineArn"] == self.arn
            execution = self._execution_class.from_execution_list_item(
                item,
                session=self.session)
            execution.state_machine = self
            executions.append(execution)
            _logger.debug(
                "Found execution '%s' with stop-date: %s",
                execution,
                item.get("stopDate"))
        return executions

    def list_executions(self, status: str = None) -> T.List[_execution_class]:
        """List all executions of this state-machine.

        This state-machine is manually attached to the ``state_machine``
        attribute of the resultant executions here.

        Args:
            status: only list executions with this status. Choose from
                'RUNNING', 'SUCCEEDED', 'FAILED', 'TIMED_OUT' or 'ABORTED'

        Returns:
            executions of this state-machine
        """

        _s = " with status '%s'" % status if status else ""
        # ``_s`` is passed as an argument (not appended to the format) so
        # a stray '%' in ``status`` cannot break message formatting
        _logger.info("Listing executions of '%s'%s", self, _s)
        kwargs = {"stateMachineArn": self.arn}
        if status is not None:
            kwargs["statusFilter"] = status
        fn = self.session.sfn.list_executions
        resp = _util.collect_paginated(fn, **kwargs)
        return self._build_executions(resp["executions"])
def construct_state_machine(
        name: str,
        start_state: sfini_state.State,
        comment: str = _default,
        timeout: int = _default,
        *,
        session: _util.AWSSession = None
) -> StateMachine:
    """Build a state-machine by walking from its starting state.

    Call this only once every state has been defined: states defined
    afterwards must be added to the state-machine by hand.

    The definition contains only the states reachable from the provided
    first state (it and its children). To include any other state,
    reference it through an impossible choice rule.

    Args:
        name: name of state-machine
        start_state: starting state of state-machine
        comment: description of state-machine
        timeout: execution time-out (seconds)
        session: session to use for AWS communication

    Returns:
        constructed state-machine
    """

    # ``add_to`` fills this mapping with the start state and its children
    collected_states = {}
    start_state.add_to(collected_states)
    machine = StateMachine(
        name,
        collected_states,
        start_state.name,
        comment=comment,
        timeout=timeout,
        session=session)
    return machine