sfini package

Module contents

AWS Step Functions service.

class sfini.AWSSession(session: boto3.session.Session = None)[source]

Bases: object

AWS session, for preconfigure communication with AWS.

Parameters:session – session to use
account_id

Session’s account’s account ID.

credentials

AWS session credentials.

region

Session AWS region.

sfn

Step Functions client.

class sfini.Activity(name: str, *, session: sfini._util.AWSSession = None)[source]

Bases: sfini.task_resource.TaskResource

Activity execution.

Note that activity names must be unique (within a region). It’s recommended to put your code’s title and version in the activity name. Activities makes this straight-forward.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself able to run some activities using their names.

Parameters:
  • name – name of activity
  • session – session to use for AWS communication
deregister()[source]

Remove activity from AWS SFN.

is_registered() → bool[source]

See if this activity is registered with AWS SFN.

Returns:if this activity is registered
register()[source]

Register activity with AWS SFN.

service = 'activity'
class sfini.ActivityRegistration(prefix: str = '', *, session: sfini._util.AWSSession = None)[source]

Bases: object

Activities registration.

Provides convenience for grouping activities, generating activity names, bulk-registering activities, and activity function decoration.

An activity is attached to state-machine tasks, and is called when that task is executed. A worker registers itself able to run an activity using the registered activity name.

Parameters:
  • prefix – prefix for activity names
  • session – session to use for AWS communication
activities

registered activities

Example

>>> activities = ActivityRegistration(prefix="foo")
>>> @activities.activity(name="MyActivity")
>>> def fn(data):
...     print("hi")
>>> print(fn.name)
fooMyActivity
activity(name: str = None, heartbeat: int = 20) → Callable[[Callable], sfini.activity.CallableActivity][source]

Activity function decorator.

The decorated function will be passed one argument: the input to the task state that executes the activity.

Parameters:
  • name – name of activity, default: function name
  • heartbeat – seconds between heartbeat during activity running
add_activity(activity: sfini.activity.Activity)[source]

Add an activity to the group.

Parameters:activity – activity to add
Raises:ValueError – if activity name already in-use in group
deregister()[source]

Remove activities in AWS SFN.

register()[source]

Add registered activities to AWS SFN.

smart_activity(name: str = None, heartbeat: int = 20) → Callable[[Callable], sfini.activity.SmartCallableActivity][source]

Smart activity function decorator.

The decorated function will be passed values to its parameters from the input to the task state that executes the activity.

Parameters:
  • name – name of activity, default: function name
  • heartbeat – seconds between heartbeat during activity running
class sfini.CLI(state_machine=None, activities=None, role_arn: str = None, version: str = None, prog: str = None)[source]

Bases: object

sfini command-line interface.

Parameters:
  • state_machine (sfini.StateMachine) – state-machine interact with
  • activities (sfini.ActivityRegistration) – activities to poll for
  • role_arn – AWS ARN for state-machine IAM role
  • version – version to display, default: no version display
  • prog – program name displayed in program help, default: sys.argv[0]
parse_args()[source]

Parse command-line arguments and run CLI.

class sfini.Lambda(name: str, *, session: sfini._util.AWSSession = None)[source]

Bases: sfini.task_resource.TaskResource

AWS Lambda function executor for a task.

Parameters:
  • name – name of Lambda function
  • session – session to use for AWS communication
arn

Task resource generated ARN.

service = 'function'
sfini.construct_state_machine(name: str, start_state: sfini.state._base.State, comment: str = DefaultParameter(), timeout: int = DefaultParameter(), *, session: sfini._util.AWSSession = None) → sfini.state_machine.StateMachine[source]

Construct a state-machine from the starting state.

Make sure to construct the state-machine after all states have been defined: subsequent states will need to be added to the state-machine manually.

Only states referenced by the provided first state (and their children) will be in the state-machine definition. Add states via an impossible choice rule to include them in the definition.

Parameters:
  • name – name of state-machine
  • start_state – starting state of state-machine
  • comment – description of state-maching
  • timeout – execution time-out (seconds)
  • session – session to use for AWS communication
Returns:

constructed state-machine

class sfini.Worker(activity, name: str = None, *, session: sfini._util.AWSSession = None)[source]

Bases: object

Worker to poll for activity task executions.

Parameters:
  • activity (sfini.activity.CallableActivity) – activity to poll and run executions of
  • name – name of worker, used for identification, default: a combination of UUID and host’s FQDN
  • session – session to use for AWS communication
end()[source]

End polling.

join()[source]

Block until polling exit.

run()[source]

Run worker to poll for and execute specified tasks.

start()[source]

Start polling.

exception sfini.WorkerCancel(*args, **kwargs)[source]

Bases: KeyboardInterrupt

Workflow execution interrupted by user.