Source code for sfini.state._state

# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2019/05/13

"""State definitions."""

import datetime
import typing as T
import logging as lg

from . import _base
from .. import _util

_logger = lg.getLogger(__name__)
_default = _util.DefaultParameter()


[docs]class Succeed(_base.State): """End execution successfully. Args: name: name of state comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output """ pass
[docs]class Fail(_base.State): """End execution unsuccessfully. Args: name: name of state comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output error: error type cause: failure description """ def __init__( self, name, comment=_default, input_path=_default, output_path=_default, cause: str = _default, error: str = _default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path) self.error = error self.cause = cause
[docs] def to_dict(self): defn = super().to_dict() if self.cause != _default: defn["Cause"] = self.cause if self.error != _default: defn["Error"] = self.error return defn
[docs]class Pass(_base.HasResultPath, _base.HasNext, _base.State): """No-op state, possibly introducing data. The name specifies the location of any introduced data. Args: name: name of state comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output result_path: task output location JSONPath, ``None`` for discarded output result: return value of state, stored in the variable ``name`` Attributes: next: next state to execute """ def __init__( self, name, comment=_default, input_path=_default, output_path=_default, result_path=_default, result: _util.JSONable = _default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path, result_path=result_path) self.result = result
[docs] def to_dict(self): defn = super().to_dict() if self.result != _default: defn["Result"] = self.result return defn
[docs]class Wait(_base.HasNext, _base.State): """Wait for a time before continuing. Args: name: name of state until: time to wait. If ``int``, then seconds to wait; if ``datetime.datetime``, then time to wait until; if ``str``, then name of state-variable containing time to wait until comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output Attributes: next: next state to execute """ def __init__( self, name, until: T.Union[int, datetime.datetime, str], comment=_default, input_path=_default, output_path=_default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path) self.until = until
[docs] def to_dict(self): defn = super().to_dict() if isinstance(self.until, int): defn["Seconds"] = self.until elif isinstance(self.until, datetime.datetime): tzinfo = self.until.tzinfo if tzinfo is None or tzinfo.utcoffset(self.until) is None: raise ValueError("Wait time must be aware") defn["Timestamp"] = self.until.isoformat("T") elif isinstance(self.until, str): defn["TimestampPath"] = self.until else: fmt = "Invalid type for wait time: %s" raise TypeError(fmt % type(self.until)) return defn
[docs]class Parallel( _base.HasResultPath, _base.HasNext, _base.CanRetry, _base.CanCatch, _base.State): """Run states-machines in parallel. Args: name: name of state comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output result_path: task output location JSONPath, ``None`` for discarded output Attributes: branches (list[sfini.StateMachine]): state-machines to run in parallel. These state-machines do not need to be registered with AWS Step Functions. next: next state to execute retriers: retry conditions catchers: handled state errors """ def __init__( self, name, comment=_default, input_path=_default, output_path=_default, result_path=_default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path, result_path=result_path) self.branches = []
[docs] def add(self, state_machine): """Add a state-machine to be executed. The input to the state-machine execution is the input into this parallel state. The output of the parallel state is a list of each state-machine's output (in order of adding). Args: state_machine (sfini.StateMachine): state-machine to add. It will be run when this task is executed. Added state-machines do not need to be registered with AWS Step Functions """ self.branches.append(state_machine)
[docs] def to_dict(self): defn = super().to_dict() defn["Branches"] = [sm.to_dict() for sm in self.branches] return defn
[docs]class Choice(_base.State): """Branch execution based on comparisons. Args: name: name of state comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output Attributes: choices (list[sfini.choice.ChoiceRule]): choice rules determining branch conditions default: fall-back state if all comparisons fail, or ``None`` for no fall-back (Step Functions will raise a 'States.NoChoiceMatched' error) """ def __init__( self, name, comment=_default, input_path=_default, output_path=_default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path) self.choices = [] self.default: T.Union[_base.State, None] = None
[docs] def add_to(self, states): super().add_to(states) for rule in self.choices: if rule.next_state.name not in states: rule.next_state.add_to(states) if self.default is not None and self.default.name not in states: self.default.add_to(states)
[docs] def add(self, rule): """Add a choice-rule. Args: rule (sfini.choice.ChoiceRule): branch execution condition and specification to add Raises: RuntimeError: rule doesn't specify next-state """ if rule.next_state is None: msg = "Top-level choice rule '%s' must specify next state" raise RuntimeError(msg % rule) self.choices.append(rule)
[docs] def remove(self, rule): """Remove a branch. Args: rule (sfini.choice.ChoiceRule): branch execution condition and specification to remove Raises: ValueError: if rule is not a registered branch """ if rule not in self.choices: fmt = "Rule '%s' is not registered with this state" raise ValueError(fmt % rule) self.choices.remove(rule)
[docs] def set_default(self, state: _base.State): """Set the default state to execute when no conditions were met. Args: state: default state to execute """ if self.default is not None: fmt = "Overwriting current default state '%s' with '%s'" _logger.warning(fmt % (self.default, state)) self.default = state
[docs] def to_dict(self): if not self.choices and self.default is None: raise RuntimeError("Choice '%s' has no next path") defn = super().to_dict() defn["Choices"] = [cr.to_dict() for cr in self.choices] if self.default is not None: defn["Default"] = self.default.name return defn
[docs]class Task( _base.HasResultPath, _base.HasNext, _base.CanRetry, _base.CanCatch, _base.State): """Activity execution. Args: name: name of state resource (sfini.task_resource.TaskResource): task executor, eg activity or Lambda function comment: state description input_path: state input filter JSONPath, ``None`` for empty input output_path: state output filter JSONPath, ``None`` for discarded output result_path: task output location JSONPath, ``None`` for discarded output timeout: seconds before task time-out Attributes: next: next state to execute retriers: retry conditions catchers: handled state errors """ _heartbeat_extra = 5 def __init__( self, name, resource, comment=_default, input_path=_default, output_path=_default, result_path=_default, timeout: int = _default): super().__init__( name, comment=comment, input_path=input_path, output_path=output_path, result_path=result_path) self.resource = resource self.timeout = timeout
[docs] def to_dict(self): defn = super().to_dict() defn["Resource"] = self.resource.arn if self.timeout != _default: defn["TimeoutSeconds"] = self.timeout if hasattr(self.resource, "heartbeat"): _h = self._heartbeat_extra defn["HeartbeatSeconds"] = self.resource.heartbeat + _h return defn