Source code for sfini._cli

# --- 80 characters -----------------------------------------------------------
# Created by: Laurie 2018/08/06

"""SFN service CLI helper.

Use in your ``__main__`` module to provide a CLI to your service.
"""

import sys
import json
import pathlib
import argparse
import logging as lg

from . import _util
from . import worker as sfini_worker


class CLI:
    """``sfini`` command-line interface.

    Provides ``register``, ``deregister``, ``start``, ``worker`` and
    ``executions`` sub-commands, depending on whether a state-machine
    and/or activities were provided.

    Args:
        state_machine (sfini.StateMachine): state-machine to interact with
        activities (sfini.ActivityRegistration): activities to poll for
        role_arn: AWS ARN for state-machine IAM role
        version: version to display, default: no version display
        prog: program name displayed in program help, default:
            ``sys.argv[0]``
    """

    _worker_class = sfini_worker.Worker
    _parser_class = argparse.ArgumentParser

    def __init__(
            self,
            state_machine=None,
            activities=None,
            role_arn: str = None,
            version: str = None,
            prog: str = None):
        self.state_machine = state_machine
        self.activities = activities
        self.role_arn = role_arn
        self.version = version
        self.prog = prog
        # At least one of the two must be provided, otherwise there is
        # nothing for the CLI to control
        assert state_machine or activities

    __repr__ = _util.easy_repr

    def _build_parser(self) -> argparse.ArgumentParser:
        """Build argument parser.

        Sub-commands and options are only added for what this CLI was
        configured with: ``start`` and ``executions`` need a state-machine,
        ``worker`` needs activities, and the ``--state-machine-only`` /
        ``--activities-only`` switches only exist when both are present.

        Returns:
            configured command-line argument parser
        """

        have_sm = bool(self.state_machine)
        have_activities = bool(self.activities)

        d = None
        if have_sm and have_activities:
            fmt = "Control %s and %s"
            d = fmt % (self.state_machine, self.activities)
        elif have_sm:
            d = "Control %s" % self.state_machine
        elif have_activities:
            d = "Control %s" % self.activities

        parser = self._parser_class(description=d, prog=self.prog)
        if self.version:
            parser.add_argument(
                "-V",
                "--version",
                action="version",
                version=self.version)
        parser.add_argument(
            "-v",
            "--verbose",
            default=0,
            action="count",
            help="increase verbosity")
        parser.add_argument(
            "-q",
            "--quiet",
            default=0,
            action="count",
            help="decrease verbosity")

        subparsers = parser.add_subparsers(
            metavar="COMMAND",
            dest="command")
        subparsers.required = True  # Python 3.6 compatibility

        sma_names = {
            (True, True): "state-machine and/or activities",
            (True, False): "state-machine",
            (False, True): "activities"}
        sma_str = sma_names[have_sm, have_activities]

        register_parser = subparsers.add_parser(
            "register",
            help="register %s with SFN" % sma_str,
            description="register %s with SFN" % sma_str)
        if have_sm:
            register_parser.add_argument(
                "-u",
                "--allow-update",
                action="store_true",
                help="allow updating of existing state-machine")
        if have_sm and have_activities:
            _g = register_parser.add_mutually_exclusive_group()
            _g.add_argument(
                "-s",
                "--state-machine-only",
                action="store_true",
                help="only register (or update) state-machine")
            _g.add_argument(
                "-a",
                "--activities-only",
                action="store_true",
                help="only register activities")

        deregister_parser = subparsers.add_parser(
            "deregister",
            help="deregister %s from SFN" % sma_str,
            description="deregister %s from SFN" % sma_str)
        if have_sm and have_activities:
            _g = deregister_parser.add_mutually_exclusive_group()
            _g.add_argument(
                "-s",
                "--state-machine-only",
                action="store_true",
                help="only deregister state-machine")
            _g.add_argument(
                "-a",
                "--activities-only",
                action="store_true",
                help="only deregister activities")

        if have_sm:
            start_parser = subparsers.add_parser(
                "start",
                help="start state-machine execution")
            start_parser.add_argument(
                "input_json",
                metavar="PATH",
                help="execution input JSON, use '-' for STDIN")
            start_parser.add_argument(
                "-w",
                "--wait",
                action="store_true",
                help="wait for execution to finish, and print output")

        if have_activities:
            worker_parser = subparsers.add_parser(
                "worker",
                help="run an activity worker",
                description="run an activity worker")
            worker_parser.add_argument(
                "activity_name",
                choices=self.activities.activities,
                metavar="NAME",
                help="name of activity to poll, choose from: %(choices)s")

        if have_sm:
            executions_parser = subparsers.add_parser(
                "executions",
                help="list executions",
                description="list executions")
            choices = "RUNNING", "SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"
            executions_parser.add_argument(
                "-s",
                "--status",
                default=None,
                metavar="STATUS",
                choices=choices,
                help="only list executions with this status")

        return parser

    def _selected_targets(self, args: argparse.Namespace):
        """Determine what a (de)registration command should act on.

        The ``--state-machine-only`` and ``--activities-only`` options are
        only defined by the parser when both a state-machine and activities
        are configured, so a missing attribute is treated as "not requested".

        Args:
            args: parsed command-line arguments

        Returns:
            tuple[bool, bool]: whether to act on the state-machine, and
                whether to act on the activities
        """

        sm_only = getattr(args, "state_machine_only", False)
        activities_only = getattr(args, "activities_only", False)
        do_sm = bool(self.state_machine) and (sm_only or not activities_only)
        do_activities = bool(self.activities) and not sm_only
        return do_sm, do_activities

    def _register(self, args: argparse.Namespace):
        """Register state-machine and/or activities.

        The state-machine (if selected) is registered before the
        activities (if selected).

        Args:
            args: parsed command-line arguments
        """

        do_sm, do_activities = self._selected_targets(args)
        if do_sm:
            self.state_machine.register(
                self.role_arn,
                allow_update=args.allow_update)
        if do_activities:
            self.activities.register()

    def _deregister(self, args: argparse.Namespace):
        """Deregister state-machine and/or activities.

        The state-machine (if selected) is deregistered before the
        activities (if selected).

        Args:
            args: parsed command-line arguments
        """

        do_sm, do_activities = self._selected_targets(args)
        if do_sm:
            self.state_machine.deregister()
        if do_activities:
            self.activities.deregister()

    def _start(self, args: argparse.Namespace):
        """Start a state-machine execution.

        The execution input is JSON read from STDIN when the path argument
        is ``-``, otherwise from the file at that path. With ``--wait``,
        blocks until the execution finishes and prints its output.

        Args:
            args: parsed command-line arguments
        """

        if args.input_json == "-":
            execution_input_str = sys.stdin.read()
        else:
            input_json = pathlib.Path(args.input_json)
            execution_input_str = input_json.read_text()
        execution_input = json.loads(execution_input_str)
        execution = self.state_machine.start_execution(execution_input)
        if args.wait:
            execution.wait()
            print(execution.output)

    def _worker(self, args: argparse.Namespace):
        """Run an activity worker.

        Looks up the named activity and runs a ``_worker_class`` instance
        for it.

        Args:
            args: parsed command-line arguments
        """

        activity = self.activities.activities[args.activity_name]
        workers = self._worker_class(activity)
        workers.run()

    def _executions(self, args: argparse.Namespace):
        """List state-machine executions.

        Prints each execution, optionally filtered by status, followed by
        its formatted history.

        Args:
            args: parsed command-line arguments
        """

        execs = self.state_machine.list_executions(status=args.status)
        for execution in execs:
            print("\nExecution '%s':" % execution)
            print(execution.format_history())

    def _delegate(self, args: argparse.Namespace):
        """Execute command.

        Configures logging from the verbosity flags, then dispatches to the
        handler for the chosen sub-command.

        Args:
            args: parsed command-line arguments
        """

        # Start at WARNING; each ``-v`` lowers the threshold by one level,
        # each ``-q`` raises it by one, never going below DEBUG
        _lvl = max(lg.WARNING - 10 * (args.verbose - args.quiet), lg.DEBUG)
        _util.setup_logging(level=_lvl)
        command = {
            "register": self._register,
            "deregister": self._deregister,
            "start": self._start,
            "worker": self._worker,
            "executions": self._executions}
        command[args.command](args)

    def parse_args(self):
        """Parse command-line arguments and run CLI."""
        # NOTE(review): logging is set up with defaults here and again with
        # the requested level in ``_delegate`` -- presumably so logging is
        # configured while arguments are parsed; confirm
        _util.setup_logging()
        parser = self._build_parser()
        args = parser.parse_args()
        self._delegate(args)