# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2018/07/12
"""Activity interfacing.
Activities are separate from state-machines, and are used as
implementations of 'Task' states. Activities are registered separately.
"""
import inspect
import typing as T
import logging as lg
import functools as ft
from . import _util
from . import task_resource as sfini_task_resource
# Module-level logger, named after this module's import path.
_logger = lg.getLogger(__name__)
class Activity(sfini_task_resource.TaskResource):
    """Activity execution.

    Note that activity names must be unique (within a region). It's
    recommended to put your code's title and version in the activity name.
    ``ActivityRegistration`` makes this straight-forward.

    An activity is attached to state-machine tasks, and is called when that
    task is executed. A worker registers itself able to run some
    activities using their names.

    Args:
        name: name of activity
        session: session to use for AWS communication
    """

    service = "activity"

    def register(self):
        """Register activity with AWS SFN.

        Raises:
            AssertionError: if the ARN returned by SFN differs from
                ``self.arn``
        """

        _logger.debug("Registering activity '%s' on SFN", self)
        # Name validation is delegated to the project helper before any
        # request is sent to AWS.
        _util.assert_valid_name(self.name)
        resp = self.session.sfn.create_activity(name=self.name)
        # Sanity check only: the ARN reported by SFN is expected to equal the
        # one this object exposes. Note this is skipped under ``python -O``.
        assert resp["activityArn"] == self.arn
        _logger.info(
            "Activity '%s' registered with ARN '%s' at %s",
            self,
            self.arn,
            resp["creationDate"])

    def is_registered(self) -> bool:
        """See if this activity is registered with AWS SFN.

        Returns:
            if this activity is registered
        """

        _logger.debug("Testing for registration of '%s' on SFN", self)
        resp = _util.collect_paginated(self.session.sfn.list_activities)
        # Compare by ARN against every activity listed by SFN.
        arns = {act["activityArn"] for act in resp["activities"]}
        return self.arn in arns

    def deregister(self):
        """Remove activity from AWS SFN."""
        _logger.info("Deleting activity '%s' from SFN", self)
        self.session.sfn.delete_activity(activityArn=self.arn)
class CallableActivity(Activity):
    """Activity whose work is carried out by a wrapped callable.

    Activity names must be unique (within a region), so it is recommended
    to include your application's name and version in the activity name.
    ``ActivityRegistration`` makes this straight-forward.

    An activity is attached to state-machine tasks, and is called when that
    task is executed. A worker registers itself able to run an activity
    using the registered activity name.

    Args:
        name: name of activity
        fn: function to run activity
        heartbeat: seconds between heartbeat during activity running
        session: session to use for AWS communication
    """

    def __init__(self, name, fn: T.Callable, heartbeat=20, *, session=None):
        super().__init__(name, session=session)
        self.fn, self.heartbeat = fn, heartbeat

    def __call__(self, task_input: _util.JSONable, *args, **kwargs):
        # Direct invocation forwards every argument to the wrapped callable.
        target = self.fn
        return target(task_input, *args, **kwargs)

    @classmethod
    def decorate(
            cls,
            name: str,
            heartbeat: int = 20,
            *,
            session: _util.AWSSession = None
    ) -> T.Callable[[T.Callable], "CallableActivity"]:
        """Build a decorator turning a callable into an activity.

        Args:
            name: name of activity
            heartbeat: seconds between heartbeat during activity running
            session: session to use for AWS communication

        Returns:
            decorator which wraps its argument in an instance of this class
        """

        def decorator(fn: T.Callable):
            wrapped = cls(name, fn, heartbeat=heartbeat, session=session)
            # Carry the callable's metadata (name, docstring, ...) over to
            # the activity instance.
            ft.update_wrapper(wrapped, fn)
            return wrapped

        return decorator

    def call_with(self, task_input: _util.JSONable) -> _util.JSONable:
        """Run the wrapped callable on a task's input.

        Args:
            task_input: task input

        Returns:
            function return-value
        """

        result = self.fn(task_input)
        return result
class SmartCallableActivity(CallableActivity):
    """Callable-backed activity which unpacks the task input.

    The arguments to ``fn`` are extracted from the input provided by AWS
    Step Functions.

    Activity names must be unique (within a region), so it is recommended
    to include your application's name and version in the activity name.
    ``ActivityRegistration`` makes this straight-forward.

    An activity is attached to state-machine tasks, and is called when that
    task is executed. A worker registers itself able to run an activity
    using the registered activity name.

    Args:
        name: name of activity
        fn: function to run activity
        heartbeat: seconds between heartbeat during activity running
        session: session to use for AWS communication

    Attributes:
        sig: function signature
    """

    def __init__(self, name, fn: T.Callable, heartbeat=20, *, session=None):
        super().__init__(name, fn, heartbeat=heartbeat, session=session)
        self.sig: inspect.Signature = inspect.Signature.from_callable(fn)

    def __call__(self, *args, **kwargs):
        # Direct invocation bypasses input parsing entirely.
        return self.fn(*args, **kwargs)

    def _get_input_from(
            self,
            task_input: T.Dict[str, _util.JSONable]
    ) -> T.Dict[str, _util.JSONable]:
        """Select the entries of the task input to pass to ``fn``.

        No input validation happens here: calling ``fn(**kwargs)`` in
        ``call_with`` will do that anyway.

        Args:
            task_input: task input

        Returns:
            activity input
        """

        param_kinds = {}
        for param_name, param in self.sig.parameters.items():
            param_kinds[param_name] = param.kind

        # A ``**kwargs``-style parameter accepts any key: pass input as-is.
        for kind in param_kinds.values():
            if kind == inspect.Parameter.VAR_KEYWORD:
                return task_input

        # Otherwise keep only keys naming a parameter, except for the
        # ``*args``-style parameter which cannot be given by keyword.
        star_args = inspect.Parameter.VAR_POSITIONAL
        return {
            key: value
            for key, value in task_input.items()
            if key in param_kinds and param_kinds[key] != star_args}

    def call_with(self, task_input: T.Dict[str, _util.JSONable]):
        """Run the wrapped callable with keyword arguments from the input.

        Args:
            task_input: task input

        Returns:
            function return-value
        """

        return self.fn(**self._get_input_from(task_input))
class ActivityRegistration:
    """Activities registration.

    Provides convenience for grouping activities, generating activity
    names, bulk-registering activities, and activity function decoration.

    An activity is attached to state-machine tasks, and is called when that
    task is executed. A worker registers itself able to run an activity
    using the registered activity name.

    Args:
        prefix: prefix for activity names
        session: session to use for AWS communication

    Attributes:
        activities: registered activities

    Example:
        >>> activities = ActivityRegistration(prefix="foo")
        >>> @activities.activity(name="MyActivity")
        ... def fn(data):
        ...     print("hi")
        >>> print(fn.name)
        fooMyActivity
    """

    # Classes instantiated by the ``activity`` and ``smart_activity``
    # decorators respectively.
    _activity_class = CallableActivity
    _smart_activity_class = SmartCallableActivity

    def __init__(
            self,
            prefix: str = "",
            *,
            session: T.Optional[_util.AWSSession] = None):
        self.prefix = prefix
        # Fall back to a freshly constructed session when none is given.
        self.session = session or _util.AWSSession()
        # Activities in this group, keyed by their (full) activity name.
        self.activities: T.Dict[str, Activity] = {}

    def __str__(self):
        return "'%s' activities" % self.prefix

    __repr__ = _util.easy_repr

    def add_activity(self, activity: Activity):
        """Add an activity to the group.

        Args:
            activity: activity to add

        Raises:
            ValueError: if activity name already in-use in group
        """

        if activity.name in self.activities:
            raise ValueError("Activity '%s' already in group" % activity.name)
        self.activities[activity.name] = activity

    def _activity(
            self,
            activity_cls: T.Type[CallableActivity],
            name: T.Optional[str] = None,
            heartbeat: int = 20
    ) -> T.Callable[[T.Callable], CallableActivity]:
        """Activity function decorator.

        Args:
            activity_cls: activity class
            name: name of activity, default: function name
            heartbeat: seconds between heartbeat during activity running

        Returns:
            decorator which wraps a function in ``activity_cls``, adds the
            resulting activity to this group and returns it
        """

        def wrap(fn):
            suffix = fn.__name__ if name is None else name
            decorator = activity_cls.decorate(
                self.prefix + suffix,
                heartbeat=heartbeat,
                session=self.session)
            activity = decorator(fn)
            self.add_activity(activity)
            return activity

        return wrap

    def activity(
            self,
            name: T.Optional[str] = None,
            heartbeat: int = 20
    ) -> T.Callable[[T.Callable], CallableActivity]:
        """Activity function decorator.

        The decorated function will be passed one argument: the input to
        the task state that executes the activity.

        Args:
            name: name of activity, default: function name
            heartbeat: seconds between heartbeat during activity running
        """

        return self._activity(
            self._activity_class,
            name=name,
            heartbeat=heartbeat)

    def smart_activity(
            self,
            name: T.Optional[str] = None,
            heartbeat: int = 20
    ) -> T.Callable[[T.Callable], SmartCallableActivity]:
        """Smart activity function decorator.

        The decorated function will be passed values to its parameters
        from the input to the task state that executes the activity.

        Args:
            name: name of activity, default: function name
            heartbeat: seconds between heartbeat during activity running
        """

        return self._activity(
            self._smart_activity_class,
            name=name,
            heartbeat=heartbeat)

    def register(self):
        """Add registered activities to AWS SFN."""
        for activity in self.activities.values():
            activity.register()

    def _list_activities(self) -> T.List[T.Tuple[str, str, str]]:
        """List activities in SFN which belong to this group.

        An activity is selected when its name starts with ``prefix`` or is
        the name of an activity added to this group.

        Returns:
            name, ARN and creation date of each selected activity
        """

        resp = _util.collect_paginated(self.session.sfn.list_activities)
        return [
            (act["name"], act["activityArn"], act["creationDate"])
            for act in resp["activities"]
            if act["name"].startswith(self.prefix)
            or act["name"] in self.activities]

    def _deregister_activities(
            self,
            activity_items: T.Sequence[T.Tuple[str, str, str]]):
        """Deregister activities.

        Args:
            activity_items: activities to delete, as provided by
                ``_list_activities``: name first, ARN second
        """

        _logger.info("Deregistering %d activities", len(activity_items))
        for item in activity_items:
            name, arn = item[0], item[1]
            _logger.debug("Deregistering '%s'", name)
            self.session.sfn.delete_activity(activityArn=arn)

    def deregister(self):
        """Remove activities in AWS SFN.

        Deletes every activity selected by ``_list_activities``, not only
        those added to this group. In particular, with an empty ``prefix``
        every activity listed by SFN is selected.
        """

        acts = self._list_activities()
        self._deregister_activities(acts)