# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2020 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Here is a minimal example of usage:
.. code-block:: python
>>> from openquake.baselib import sap
>>> def fun(input, inplace, output=None, out='/tmp'):
... 'Example'
... for item in sorted(locals().items()):
... print('%s = %s' % item)
>>> p = sap.script(fun)
>>> p.arg('input', 'input file or archive')
>>> p.flg('inplace', 'convert inplace')
>>> p.arg('output', 'output archive')
>>> p.opt('out', 'optional output file')
>>> p.callfunc(['a'])
inplace = False
input = a
out = /tmp
output = None
>>> p.callfunc(['a', 'b', '-i', '-o', 'OUT'])
inplace = True
input = a
out = OUT
output = b
Parsers can be composed too.
"""
import sys
import inspect
import argparse
NODEFAULT = object()
registry = {} # dotname -> function
[docs]def get_parentparser(parser, description=None, help=True):
"""
:param parser: :class:`argparse.ArgumentParser` instance or None
:param description: string used to build a new parser if parser is None
:param help: flag used to build a new parser if parser is None
:returns: if parser is None the new parser; otherwise the `.parentparser`
attribute (if set) or the parser itself (if not set)
"""
if parser is None:
return argparse.ArgumentParser(
description=description, add_help=help)
elif hasattr(parser, 'parentparser'):
return parser.parentparser
else:
return parser
[docs]def str_choices(choices):
"""Returns {choice1, ..., choiceN} or the empty string"""
if choices:
return '{%s}' % ', '.join(choices)
return ''
[docs]class Script(object):
"""
A simple way to define command processors based on argparse.
Each parser is associated to a function and parsers can be
composed together, by dispatching on a given name (if not given,
the function name is used).
"""
# for instance {'openquake.commands.run': run, ...}
def __init__(self, func, name=None, parentparser=None,
help=True,):
self.func = func
self.name = name or func.__name__
args, self.varargs, varkw, defaults = inspect.getfullargspec(func)[:4]
assert self.varargs is None, self.varargs
defaults = defaults or ()
nodefaults = len(args) - len(defaults)
alldefaults = (NODEFAULT,) * nodefaults + defaults
self.argdict = dict(zip(args, alldefaults))
self.description = descr = func.__doc__ if func.__doc__ else None
self.parentparser = get_parentparser(parentparser, descr, help)
self.names = []
self.all_arguments = []
self._group = self.parentparser
self._argno = 0 # used in the NameError check in the _add method
self.checked = False # used in the check_arguments method
registry['%s.%s' % (func.__module__, func.__name__)] = self
[docs] def group(self, descr):
"""Added a new group of arguments with the given description"""
self._group = self.parentparser.add_argument_group(descr)
def _add(self, name, *args, **kw):
"""
Add an argument to the underlying parser and grow the list
.all_arguments and the set .names
"""
argname = list(self.argdict)[self._argno]
if argname != name:
raise NameError(
'Setting argument %s, but it should be %s' % (name, argname))
self._group.add_argument(*args, **kw)
self.all_arguments.append((args, kw))
self.names.append(name)
self._argno += 1
[docs] def arg(self, name, help, type=None, choices=None, metavar=None,
nargs=None):
"""Describe a positional argument"""
kw = dict(help=help, type=type, choices=choices, metavar=metavar,
nargs=nargs)
default = self.argdict[name]
if default is not NODEFAULT:
kw['nargs'] = nargs or '?'
kw['default'] = default
kw['help'] = kw['help'] + ' [default: %s]' % repr(default)
self._add(name, name, **kw)
[docs] def opt(self, name, help, abbrev=None,
type=None, choices=None, metavar=None, nargs=None):
"""Describe an option"""
kw = dict(help=help, type=type, choices=choices, metavar=metavar,
nargs=nargs)
default = self.argdict[name]
if default is not NODEFAULT:
kw['default'] = default
kw['metavar'] = metavar or str_choices(choices) or str(default)
abbrev = abbrev or '-' + name[0]
abbrevs = set(args[0] for args, kw in self.all_arguments)
longname = '--' + name.replace('_', '-')
if abbrev == '-h' or abbrev in abbrevs:
# avoid conflicts with predefined abbreviations
self._add(name, longname, **kw)
else:
self._add(name, abbrev, longname, **kw)
[docs] def flg(self, name, help, abbrev=None):
"""Describe a flag"""
abbrev = abbrev or '-' + name[0]
longname = '--' + name.replace('_', '-')
self._add(name, abbrev, longname, action='store_true', help=help)
[docs] def check_arguments(self):
"""Make sure all arguments have a specification"""
for name, default in self.argdict.items():
if name not in self.names and default is NODEFAULT:
raise NameError('Missing argparse specification for %r' % name)
[docs] def callfunc(self, argv=None):
"""
Parse the argv list and extract a dictionary of arguments which
is then passed to the function underlying the script.
"""
if not self.checked:
self.check_arguments()
self.checked = True
namespace = self.parentparser.parse_args(argv or sys.argv[1:])
return self.func(**vars(namespace))
[docs] def help(self):
"""
Return the help message as a string
"""
return self.parentparser.format_help()
def __repr__(self):
args = ', '.join(self.names)
return '<%s %s(%s)>' % (self.__class__.__name__, self.name, args)
[docs]def script(func):
s = Script(func)
func.arg = s.arg
func.opt = s.opt
func.flg = s.flg
func.group = s.group
func._add = s._add
func.callfunc = s.callfunc
return func
[docs]def compose(scripts, name='main', description=None, prog=None,
version=None):
"""
Collects together different scripts and builds a single
script dispatching to the subparsers depending on
the first argument, i.e. the name of the subparser to invoke.
:param scripts: a list of script instances
:param name: the name of the composed parser
:param description: description of the composed parser
:param prog: name of the script printed in the usage message
:param version: version of the script printed with --version
"""
assert len(scripts) >= 1, scripts
parentparser = argparse.ArgumentParser(
description=description, add_help=False)
parentparser.add_argument(
'--version', '-v', action='version', version=version)
subparsers = parentparser.add_subparsers(
help='available subcommands; use %s help <subcmd>' % prog,
prog=prog)
def gethelp(cmd=None):
if cmd is None:
print(parentparser.format_help())
return
subp = subparsers._name_parser_map.get(cmd)
if subp is None:
print('No help for unknown command %r' % cmd)
else:
print(subp.format_help())
help_script = Script(gethelp, 'help', help=False)
progname = '%s ' % prog if prog else ''
help_script.arg('cmd', progname + 'subcommand')
for s in list(scripts) + [help_script]:
subp = subparsers.add_parser(s.name, description=s.description)
for args, kw in s.all_arguments:
subp.add_argument(*args, **kw)
subp.set_defaults(_func=s.func)
def main(**kw):
try:
func = kw.pop('_func')
except KeyError:
parentparser.print_usage()
else:
return func(**kw)
main.__name__ = name
return Script(main, name, parentparser)