sfini.activity module

Activity interfacing.

Activities are separate from state-machines, and are used as implementations of ‘Task’ states. Activities are registered with AWS SFN independently of the state-machines that use them.

class sfini.activity.Activity(name: str, *, session: sfini._util.AWSSession = None)[source]

Bases: sfini.task_resource.TaskResource

Activity execution.

Note that activity names must be unique (within a region). It’s recommended to put your application’s name and version in the activity name. ActivityRegistration makes this straightforward.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself as able to run activities using their names.

Parameters:
  • name – name of activity
  • session – session to use for AWS communication
deregister()[source]

Remove activity from AWS SFN.

is_registered() → bool[source]

See if this activity is registered with AWS SFN.

Returns: whether this activity is registered
register()[source]

Register activity with AWS SFN.

service = 'activity'
class sfini.activity.ActivityRegistration(prefix: str = '', *, session: sfini._util.AWSSession = None)[source]

Bases: object

Activity registration.

Provides convenience for grouping activities, generating activity names, bulk-registering activities, and activity function decoration.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself as able to run an activity using the registered activity name.

Parameters:
  • prefix – prefix for activity names
  • session – session to use for AWS communication
activities

registered activities

Example

>>> from sfini.activity import ActivityRegistration
>>> activities = ActivityRegistration(prefix="foo")
>>> @activities.activity(name="MyActivity")
... def fn(data):
...     print("hi")
>>> print(fn.name)
fooMyActivity
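
The recommendation to include an application name and version in activity names can be met by building the prefix from those values. The following is a minimal sketch and not part of sfini: versioned_prefix, the application name and the separator are hypothetical, and only the concatenation of prefix and name is taken from the example above.

```python
def versioned_prefix(app_name: str, version: str, sep: str = "-") -> str:
    """Build an activity-name prefix from an application name and version.

    Hypothetical helper for illustration; the separator is an assumption.
    """
    return f"{app_name}{sep}{version}{sep}"


prefix = versioned_prefix("thumbnailer", "1.2.0")
# As in the example above, the full name is the prefix followed by the name.
full_name = prefix + "MyActivity"
print(full_name)  # thumbnailer-1.2.0-MyActivity
```

Such a prefix would then be passed as ActivityRegistration(prefix=prefix), so that a new release registers activities under new names rather than colliding with those of an older release.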
activity(name: str = None, heartbeat: int = 20) → Callable[[Callable], sfini.activity.CallableActivity][source]

Activity function decorator.

The decorated function will be passed one argument: the input to the task state that executes the activity.

Parameters:
  • name – name of activity, default: function name
  • heartbeat – seconds between heartbeats while the activity is running
add_activity(activity: sfini.activity.Activity)[source]

Add an activity to the group.

Parameters: activity – activity to add
Raises: ValueError – if the activity name is already in use in the group
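
The duplicate-name check described above can be pictured with a small stand-in. This is a sketch under assumptions, not sfini's implementation: ActivityGroup and its attributes are hypothetical names, and the only behaviour taken from the documentation is that adding a name already in the group raises ValueError.

```python
class ActivityGroup:
    """Hypothetical stand-in for a group of named activities."""

    def __init__(self):
        # Maps activity name to the activity object
        self.activities = {}

    def add_activity(self, name, activity):
        """Add an activity, rejecting names already present in the group."""
        if name in self.activities:
            raise ValueError(f"Activity name '{name}' already in use")
        self.activities[name] = activity


group = ActivityGroup()
group.add_activity("resize", object())

try:
    group.add_activity("resize", object())
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

Catching ValueError at the call site, as in the last lines, lets a caller decide whether a repeated name is a programming error or can be skipped.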
deregister()[source]

Remove the group’s activities from AWS SFN.

register()[source]

Register the group’s activities with AWS SFN.

smart_activity(name: str = None, heartbeat: int = 20) → Callable[[Callable], sfini.activity.SmartCallableActivity][source]

Smart activity function decorator.

The decorated function will be passed values to its parameters from the input to the task state that executes the activity.

Parameters:
  • name – name of activity, default: function name
  • heartbeat – seconds between heartbeats while the activity is running
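
The difference between the two decorators is how the task input reaches the function: activity passes the whole input as one argument, while smart_activity passes values to individual parameters. The sketch below illustrates that difference with the standard library only. It is an assumption about the mechanism, not sfini's code: call_plain and call_smart are hypothetical helpers, and ignoring input keys that match no parameter is a choice made here for illustration.

```python
import inspect


def call_plain(fn, task_input):
    """Pass the whole task input as the single argument."""
    return fn(task_input)


def call_smart(fn, task_input):
    """Pass input values to the parameters of fn that share their names."""
    sig = inspect.signature(fn)
    # Assumption: keys without a matching parameter are ignored
    kwargs = {k: task_input[k] for k in sig.parameters if k in task_input}
    return fn(**kwargs)


def describe(data):
    # Receives the full input dictionary
    return sorted(data)


def area(width, height):
    # Receives only the values named by its parameters
    return width * height


task_input = {"width": 3, "height": 4, "unit": "cm"}
plain_result = call_plain(describe, task_input)
smart_result = call_smart(area, task_input)
```

With this input, describe sees all three keys, while area is called as area(width=3, height=4).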
class sfini.activity.CallableActivity(name, fn: Callable, heartbeat=20, *, session=None)[source]

Bases: sfini.activity.Activity

Activity execution defined by a callable.

Note that activity names must be unique (within a region). It’s recommended to put your application’s name and version in the activity name. ActivityRegistration makes this straightforward.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself as able to run an activity using the registered activity name.

Parameters:
  • name – name of activity
  • fn – function that implements the activity
  • heartbeat – seconds between heartbeats while the activity is running
  • session – session to use for AWS communication
call_with(task_input: Union[None, bool, str, int, float, List[JSONable], Dict[str, JSONable]]) → Union[None, bool, str, int, float, List[JSONable], Dict[str, JSONable]][source]

Call with task-input context.

Parameters: task_input – task input
Returns: function return-value
classmethod decorate(name: str, heartbeat: int = 20, *, session: sfini._util.AWSSession = None) → Callable[[Callable], sfini.activity.CallableActivity][source]

Decorate a callable as an activity implementation.

Parameters:
  • name – name of activity
  • heartbeat – seconds between heartbeats while the activity is running
  • session – session to use for AWS communication
class sfini.activity.SmartCallableActivity(name, fn: Callable, heartbeat=20, *, session=None)[source]

Bases: sfini.activity.CallableActivity

Activity execution defined by a callable, processing input.

The arguments to fn are extracted from the input provided by AWS Step Functions.

Note that activity names must be unique (within a region). It’s recommended to put your application’s name and version in the activity name. ActivityRegistration makes this straightforward.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself as able to run an activity using the registered activity name.

Parameters:
  • name – name of activity
  • fn – function that implements the activity
  • heartbeat – seconds between heartbeats while the activity is running
  • session – session to use for AWS communication
sig

function signature

call_with(task_input: Dict[str, Union[None, bool, str, int, float, List[JSONable], Dict[str, JSONable]]])[source]

Call with task-input context.

Parameters: task_input – task input
Returns: function return-value