Source code for sfini.task_resource

# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2019/05/09

"""Task resource interfacing.

'Task' states require some executor to implement the task, which
different AWS services can provide, including Step Functions activities
and Lambda functions.
"""

import logging as lg

from . import _util

_logger = lg.getLogger(__name__)


[docs]class TaskResource: """Task execution. An instance of this represents a service which can run tasks defined in a state-machine. Args: name: name of resource session: session to use for AWS communication Attributes: service: resource type """ service: str = None def __init__(self, name: str, *, session: _util.AWSSession = None): self.name = name self.session = session def __str__(self): return "%s [%s]" % (self.name, self.service) __repr__ = _util.easy_repr @_util.cached_property def arn(self) -> str: """Task resource generated ARN.""" region = self.session.region account = self.session.account_id fmt = "arn:aws:states:%s:%s:%s:%s" return fmt % (region, account, self.service, self.name)
[docs]class Lambda(TaskResource): """AWS Lambda function executor for a task. Args: name: name of Lambda function session: session to use for AWS communication """ service = "function" @_util.cached_property def arn(self): arn_split = super().arn.split(":") arn_split[2] = "lambda" return ":".join(arn_split)