"""
Automatically package and test a Python project against configurable
Python2 and Python3 based virtual environments. Environments are
setup by using virtualenv. Configuration is generally done through an
INI-style "tox.ini" file.
"""
from __future__ import print_function
import os
import re
import shutil
import subprocess
import sys
import time
import py
from packaging.version import InvalidVersion, Version
import tox
from tox.config import parseconfig
from tox.result import ResultLog
from tox.venv import VirtualEnv
def prepare(args):
config = parseconfig(args)
if config.option.help:
show_help(config)
raise SystemExit(0)
elif config.option.helpini:
show_help_ini(config)
raise SystemExit(0)
return config
def cmdline(args=None):
if args is None:
args = sys.argv[1:]
main(args)
def main(args):
try:
config = prepare(args)
retcode = Session(config).runcommand()
if retcode is None:
retcode = 0
raise SystemExit(retcode)
except KeyboardInterrupt:
raise SystemExit(2)
except tox.exception.MinVersionError as e:
r = Reporter(None)
r.error(str(e))
raise SystemExit(1)
def show_help(config):
tw = py.io.TerminalWriter()
tw.write(config._parser._format_help())
tw.line()
tw.line("Environment variables", bold=True)
tw.line("TOXENV: comma separated list of environments (overridable by '-e')")
tw.line(
"TOX_TESTENV_PASSENV: space-separated list of extra environment variables to be "
"passed into test command environments"
)
def show_help_ini(config):
tw = py.io.TerminalWriter()
tw.sep("-", "per-testenv attributes")
for env_attr in config._testenv_attr:
tw.line(
"{:<15} {:<8} default: {}".format(
env_attr.name, "<" + env_attr.type + ">", env_attr.default
),
bold=True,
)
tw.line(env_attr.help)
tw.line()
class Action(object):
def __init__(self, session, venv, msg, args):
self.venv = venv
self.msg = msg
self.activity = msg.split(" ", 1)[0]
self.session = session
self.report = session.report
self.args = args
self.id = venv and venv.envconfig.envname or "tox"
self._popenlist = []
if self.venv:
self.venvname = self.venv.name
else:
self.venvname = "GLOB"
if msg == "runtests":
cat = "test"
else:
cat = "setup"
envlog = session.resultlog.get_envlog(self.venvname)
self.commandlog = envlog.get_commandlog(cat)
def __enter__(self):
self.report.logaction_start(self)
return self
def __exit__(self, *args):
self.report.logaction_finish(self)
def setactivity(self, name, msg):
self.activity = name
if msg:
self.report.verbosity0("{} {}: {}".format(self.venvname, name, msg), bold=True)
else:
self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)
def info(self, name, msg):
self.report.verbosity1("{} {}: {}".format(self.venvname, name, msg), bold=True)
def _initlogpath(self, actionid):
if self.venv:
logdir = self.venv.envconfig.envlogdir
else:
logdir = self.session.config.logdir
try:
log_count = len(logdir.listdir("{}-*".format(actionid)))
except (py.error.ENOENT, py.error.ENOTDIR):
logdir.ensure(dir=1)
log_count = 0
path = logdir.join("{}-{}.log".format(actionid, log_count))
f = path.open("w")
f.flush()
return f
def popen(self, args, cwd=None, env=None, redirect=True, returnout=False, ignore_ret=False):
stdout = outpath = None
resultjson = self.session.config.option.resultjson
if resultjson or redirect:
fout = self._initlogpath(self.id)
fout.write("actionid: {}\nmsg: {}\ncmdargs: {!r}\n\n".format(self.id, self.msg, args))
fout.flush()
outpath = py.path.local(fout.name)
fin = outpath.open("rb")
fin.read() # read the header, so it won't be written to stdout
stdout = fout
elif returnout:
stdout = subprocess.PIPE
if cwd is None:
# FIXME XXX cwd = self.session.config.cwd
cwd = py.path.local()
try:
popen = self._popen(args, cwd, env=env, stdout=stdout, stderr=subprocess.STDOUT)
except OSError as e:
self.report.error(
"invocation failed (errno {:d}), args: {}, cwd: {}".format(e.errno, args, cwd)
)
raise
popen.outpath = outpath
popen.args = [str(x) for x in args]
popen.cwd = cwd
popen.action = self
self._popenlist.append(popen)
try:
self.report.logpopen(popen, env=env)
try:
if resultjson and not redirect:
if popen.stderr is not None:
# prevent deadlock
raise ValueError("stderr must not be piped here")
# we read binary from the process and must write using a
# binary stream
buf = getattr(sys.stdout, "buffer", sys.stdout)
out = None
last_time = time.time()
while 1:
# we have to read one byte at a time, otherwise there
# might be no output for a long time with slow tests
data = fin.read(1)
if data:
buf.write(data)
if b"\n" in data or (time.time() - last_time) > 1:
# we flush on newlines or after 1 second to
# provide quick enough feedback to the user
# when printing a dot per test
buf.flush()
last_time = time.time()
elif popen.poll() is not None:
if popen.stdout is not None:
popen.stdout.close()
break
else:
time.sleep(0.1)
# the seek updates internal read buffers
fin.seek(0, 1)
fin.close()
else:
out, err = popen.communicate()
except KeyboardInterrupt:
self.report.keyboard_interrupt()
popen.wait()
raise
ret = popen.wait()
finally:
self._popenlist.remove(popen)
if ret and not ignore_ret:
invoked = " ".join(map(str, popen.args))
if outpath:
self.report.error(
"invocation failed (exit code {:d}), logfile: {}".format(ret, outpath)
)
out = outpath.read()
self.report.error(out)
if hasattr(self, "commandlog"):
self.commandlog.add_command(popen.args, out, ret)
raise tox.exception.InvocationError("{} (see {})".format(invoked, outpath), ret)
else:
raise tox.exception.InvocationError("{!r}".format(invoked), ret)
if not out and outpath:
out = outpath.read()
if hasattr(self, "commandlog"):
self.commandlog.add_command(popen.args, out, ret)
return out
def _rewriteargs(self, cwd, args):
newargs = []
for arg in args:
if not tox.INFO.IS_WIN and isinstance(arg, py.path.local):
arg = cwd.bestrelpath(arg)
newargs.append(str(arg))
# subprocess does not always take kindly to .py scripts so adding the interpreter here
if tox.INFO.IS_WIN:
ext = os.path.splitext(str(newargs[0]))[1].lower()
if ext == ".py" and self.venv:
newargs = [str(self.venv.envconfig.envpython)] + newargs
return newargs
def _popen(self, args, cwd, stdout, stderr, env=None):
if env is None:
env = os.environ.copy()
return self.session.popen(
self._rewriteargs(cwd, args),
shell=False,
cwd=str(cwd),
universal_newlines=True,
stdout=stdout,
stderr=stderr,
env=env,
)
class Verbosity(object):
DEBUG = 2
INFO = 1
DEFAULT = 0
QUIET = -1
EXTRA_QUIET = -2
class Reporter(object):
actionchar = "-"
def __init__(self, session):
self.tw = py.io.TerminalWriter()
self.session = session
self._reportedlines = []
@property
def verbosity(self):
if self.session:
return (
self.session.config.option.verbose_level - self.session.config.option.quiet_level
)
else:
return Verbosity.DEBUG
def logpopen(self, popen, env):
""" log information about the action.popen() created process. """
cmd = " ".join(map(str, popen.args))
if popen.outpath:
self.verbosity1(" {}$ {} >{}".format(popen.cwd, cmd, popen.outpath))
else:
self.verbosity1(" {}$ {} ".format(popen.cwd, cmd))
def logaction_start(self, action):
msg = "{} {}".format(action.msg, " ".join(map(str, action.args)))
self.verbosity2("{} start: {}".format(action.venvname, msg), bold=True)
assert not hasattr(action, "_starttime")
action._starttime = time.time()
def logaction_finish(self, action):
duration = time.time() - action._starttime
self.verbosity2(
"{} finish: {} after {:.2f} seconds".format(action.venvname, action.msg, duration),
bold=True,
)
delattr(action, "_starttime")
def startsummary(self):
if self.verbosity >= Verbosity.QUIET:
self.tw.sep("_", "summary")
def info(self, msg):
if self.verbosity >= Verbosity.DEBUG:
self.logline(msg)
def using(self, msg):
if self.verbosity >= 1:
self.logline("using {}".format(msg), bold=True)
def keyboard_interrupt(self):
self.error("KEYBOARDINTERRUPT")
def keyvalue(self, name, value):
if name.endswith(":"):
name += " "
self.tw.write(name, bold=True)
self.tw.write(value)
self.tw.line()
def line(self, msg, **opts):
self.logline(msg, **opts)
def good(self, msg):
if self.verbosity >= Verbosity.QUIET:
self.logline(msg, green=True)
def warning(self, msg):
if self.verbosity >= Verbosity.QUIET:
self.logline("WARNING: {}".format(msg), red=True)
def error(self, msg):
if self.verbosity >= Verbosity.QUIET:
self.logline("ERROR: {}".format(msg), red=True)
def skip(self, msg):
if self.verbosity >= Verbosity.QUIET:
self.logline("SKIPPED: {}".format(msg), yellow=True)
def logline(self, msg, **opts):
self._reportedlines.append(msg)
self.tw.line("{}".format(msg), **opts)
def verbosity0(self, msg, **opts):
if self.verbosity >= Verbosity.DEFAULT:
self.logline("{}".format(msg), **opts)
def verbosity1(self, msg, **opts):
if self.verbosity >= Verbosity.INFO:
self.logline("{}".format(msg), **opts)
def verbosity2(self, msg, **opts):
if self.verbosity >= Verbosity.DEBUG:
self.logline("{}".format(msg), **opts)
# def log(self, msg):
# print(msg, file=sys.stderr)
[docs]class Session:
"""The session object that ties together configuration, reporting, venv creation, testing."""
def __init__(self, config, popen=subprocess.Popen, Report=Reporter):
self.config = config
self.popen = popen
self.resultlog = ResultLog()
self.report = Report(self)
self.make_emptydir(config.logdir)
config.logdir.ensure(dir=1)
self.report.using("tox.ini: {}".format(self.config.toxinipath))
self._spec2pkg = {}
self._name2venv = {}
try:
self.venvlist = [self.getvenv(x) for x in self.config.envlist]
except LookupError:
raise SystemExit(1)
except tox.exception.ConfigError as e:
self.report.error(str(e))
raise SystemExit(1)
self._actions = []
@property
def hook(self):
return self.config.pluginmanager.hook
def _makevenv(self, name):
envconfig = self.config.envconfigs.get(name, None)
if envconfig is None:
self.report.error("unknown environment {!r}".format(name))
raise LookupError(name)
elif envconfig.envdir == self.config.toxinidir:
self.report.error(
"venv {!r} in {} would delete project".format(name, envconfig.envdir)
)
raise tox.exception.ConfigError("envdir must not equal toxinidir")
venv = VirtualEnv(envconfig=envconfig, session=self)
self._name2venv[name] = venv
return venv
[docs] def getvenv(self, name):
""" return a VirtualEnv controler object for the 'name' env. """
try:
return self._name2venv[name]
except KeyError:
return self._makevenv(name)
def newaction(self, venv, msg, *args):
action = Action(self, venv, msg, args)
self._actions.append(action)
return action
def runcommand(self):
self.report.using("tox-{} from {}".format(tox.__version__, tox.__file__))
verbosity = self.report.verbosity > Verbosity.DEFAULT
if self.config.option.showconfig:
self.showconfig()
elif self.config.option.listenvs:
self.showenvs(all_envs=False, description=verbosity)
elif self.config.option.listenvs_all:
self.showenvs(all_envs=True, description=verbosity)
else:
return self.subcommand_test()
def _copyfiles(self, srcdir, pathlist, destdir):
for relpath in pathlist:
src = srcdir.join(relpath)
if not src.check():
self.report.error("missing source file: {}".format(src))
raise SystemExit(1)
target = destdir.join(relpath)
target.dirpath().ensure(dir=1)
src.copy(target)
def _makesdist(self):
setup = self.config.setupdir.join("setup.py")
if not setup.check():
self.report.error(
"No setup.py file found. The expected location is:\n"
" {}\n"
"You can\n"
" 1. Create one:\n"
" https://packaging.python.org/tutorials/distributing-packages/#setup-py\n"
" 2. Configure tox to avoid running sdist:\n"
" http://tox.readthedocs.io/en/latest/example/general.html"
"#avoiding-expensive-sdist".format(setup)
)
raise SystemExit(1)
with self.newaction(None, "packaging") as action:
action.setactivity("sdist-make", setup)
self.make_emptydir(self.config.distdir)
action.popen(
[
sys.executable,
setup,
"sdist",
"--formats=zip",
"--dist-dir",
self.config.distdir,
],
cwd=self.config.setupdir,
)
try:
return self.config.distdir.listdir()[0]
except py.error.ENOENT:
# check if empty or comment only
data = []
with open(str(setup)) as fp:
for line in fp:
if line and line[0] == "#":
continue
data.append(line)
if not "".join(data).strip():
self.report.error("setup.py is empty")
raise SystemExit(1)
self.report.error(
"No dist directory found. Please check setup.py, e.g with:\n"
" python setup.py sdist"
)
raise SystemExit(1)
def make_emptydir(self, path):
if path.check():
self.report.info(" removing {}".format(path))
shutil.rmtree(str(path), ignore_errors=True)
path.ensure(dir=1)
def setupenv(self, venv):
if venv.envconfig.missing_subs:
venv.status = (
"unresolvable substitution(s): {}. "
"Environment variables are missing or defined recursively.".format(
",".join(["'{}'".format(m) for m in venv.envconfig.missing_subs])
)
)
return
if not venv.matching_platform():
venv.status = "platform mismatch"
return # we simply omit non-matching platforms
with self.newaction(venv, "getenv", venv.envconfig.envdir) as action:
venv.status = 0
default_ret_code = 1
envlog = self.resultlog.get_envlog(venv.name)
try:
status = venv.update(action=action)
except IOError as e:
if e.args[0] != 2:
raise
status = (
"Error creating virtualenv. Note that spaces in paths are "
"not supported by virtualenv. Error details: {!r}".format(e)
)
except tox.exception.InvocationError as e:
status = (
"Error creating virtualenv. Note that some special characters (e.g. ':' and "
"unicode symbols) in paths are not supported by virtualenv. Error details: "
"{!r}".format(e)
)
except tox.exception.InterpreterNotFound as e:
status = e
if self.config.option.skip_missing_interpreters:
default_ret_code = 0
if status:
str_status = str(status)
commandlog = envlog.get_commandlog("setup")
commandlog.add_command(["setup virtualenv"], str_status, default_ret_code)
venv.status = status
if default_ret_code == 0:
self.report.skip(str_status)
else:
self.report.error(str_status)
return False
commandpath = venv.getcommandpath("python")
envlog.set_python_info(commandpath)
return True
def finishvenv(self, venv):
with self.newaction(venv, "finishvenv"):
venv.finish()
return True
def developpkg(self, venv, setupdir):
with self.newaction(venv, "developpkg", setupdir) as action:
try:
venv.developpkg(setupdir, action)
return True
except tox.exception.InvocationError:
venv.status = sys.exc_info()[1]
return False
[docs] def installpkg(self, venv, path):
"""Install package in the specified virtual environment.
:param VenvConfig venv: Destination environment
:param str path: Path to the distribution package.
:return: True if package installed otherwise False.
:rtype: bool
"""
self.resultlog.set_header(installpkg=py.path.local(path))
with self.newaction(venv, "installpkg", path) as action:
try:
venv.installpkg(path, action)
return True
except tox.exception.InvocationError:
venv.status = sys.exc_info()[1]
return False
[docs] def get_installpkg_path(self):
"""
:return: Path to the distribution
:rtype: py.path.local
"""
if not self.config.option.sdistonly and (
self.config.sdistsrc or self.config.option.installpkg
):
path = self.config.option.installpkg
if not path:
path = self.config.sdistsrc
path = self._resolve_package(path)
self.report.info("using package {!r}, skipping 'sdist' activity ".format(str(path)))
else:
try:
path = self._makesdist()
except tox.exception.InvocationError:
v = sys.exc_info()[1]
self.report.error("FAIL could not package project - v = {!r}".format(v))
return
sdistfile = self.config.distshare.join(path.basename)
if sdistfile != path:
self.report.info("copying new sdistfile to {!r}".format(str(sdistfile)))
try:
sdistfile.dirpath().ensure(dir=1)
except py.error.Error:
self.report.warning(
"could not copy distfile to {}".format(sdistfile.dirpath())
)
else:
path.copy(sdistfile)
return path
def subcommand_test(self):
if self.config.skipsdist:
self.report.info("skipping sdist step")
path = None
else:
path = self.get_installpkg_path()
if not path:
return 2
if self.config.option.sdistonly:
return
for venv in self.venvlist:
if self.setupenv(venv):
if venv.envconfig.skip_install:
self.finishvenv(venv)
else:
if venv.envconfig.usedevelop:
self.developpkg(venv, self.config.setupdir)
elif self.config.skipsdist:
self.finishvenv(venv)
else:
self.installpkg(venv, path)
self.runenvreport(venv)
self.runtestenv(venv)
retcode = self._summary()
return retcode
[docs] def runenvreport(self, venv):
"""
Run an environment report to show which package
versions are installed in the venv
"""
with self.newaction(venv, "envreport") as action:
packages = self.hook.tox_runenvreport(venv=venv, action=action)
action.setactivity("installed", ",".join(packages))
envlog = self.resultlog.get_envlog(venv.name)
envlog.set_installed(packages)
def runtestenv(self, venv, redirect=False):
if not self.config.option.notest:
if venv.status:
return
self.hook.tox_runtest_pre(venv=venv)
self.hook.tox_runtest(venv=venv, redirect=redirect)
self.hook.tox_runtest_post(venv=venv)
else:
venv.status = "skipped tests"
def _summary(self):
self.report.startsummary()
retcode = 0
for venv in self.venvlist:
status = venv.status
if isinstance(status, tox.exception.InterpreterNotFound):
msg = " {}: {}".format(venv.envconfig.envname, str(status))
if self.config.option.skip_missing_interpreters:
self.report.skip(msg)
else:
retcode = 1
self.report.error(msg)
elif status == "platform mismatch":
msg = " {}: {}".format(venv.envconfig.envname, str(status))
self.report.skip(msg)
elif status and status == "ignored failed command":
msg = " {}: {}".format(venv.envconfig.envname, str(status))
self.report.good(msg)
elif status and status != "skipped tests":
msg = " {}: {}".format(venv.envconfig.envname, str(status))
self.report.error(msg)
retcode = 1
else:
if not status:
status = "commands succeeded"
self.report.good(" {}: {}".format(venv.envconfig.envname, status))
if not retcode:
self.report.good(" congratulations :)")
path = self.config.option.resultjson
if path:
path = py.path.local(path)
path.write(self.resultlog.dumps_json())
self.report.line("wrote json report at: {}".format(path))
return retcode
def showconfig(self):
self.info_versions()
self.report.keyvalue("config-file:", self.config.option.configfile)
self.report.keyvalue("toxinipath: ", self.config.toxinipath)
self.report.keyvalue("toxinidir: ", self.config.toxinidir)
self.report.keyvalue("toxworkdir: ", self.config.toxworkdir)
self.report.keyvalue("setupdir: ", self.config.setupdir)
self.report.keyvalue("distshare: ", self.config.distshare)
self.report.keyvalue("skipsdist: ", self.config.skipsdist)
self.report.tw.line()
for envconfig in self.config.envconfigs.values():
self.report.line("[testenv:{}]".format(envconfig.envname), bold=True)
for attr in self.config._parser._testenv_attr:
self.report.line(" {:<15} = {}".format(attr.name, getattr(envconfig, attr.name)))
def showenvs(self, all_envs=False, description=False):
env_conf = self.config.envconfigs # this contains all environments
default = self.config.envlist # this only the defaults
extra = sorted(e for e in env_conf if e not in default) if all_envs else []
if description:
self.report.line("default environments:")
max_length = max(len(env) for env in (default + extra))
def report_env(e):
if description:
text = env_conf[e].description or "[no description]"
msg = "{} -> {}".format(e.ljust(max_length), text).strip()
else:
msg = e
self.report.line(msg)
for e in default:
report_env(e)
if all_envs and extra:
if description:
self.report.line("")
self.report.line("additional environments:")
for e in extra:
report_env(e)
def info_versions(self):
versions = ["tox-{}".format(tox.__version__)]
proc = subprocess.Popen(
(sys.executable, "-m", "virtualenv", "--version"), stdout=subprocess.PIPE
)
out, _ = proc.communicate()
versions.append("virtualenv-{}".format(out.decode("UTF-8").strip()))
self.report.keyvalue("tool-versions:", " ".join(versions))
def _resolve_package(self, package_spec):
try:
return self._spec2pkg[package_spec]
except KeyError:
self._spec2pkg[package_spec] = x = self._get_latest_version_of_package(package_spec)
return x
def _get_latest_version_of_package(self, package_spec):
if not os.path.isabs(str(package_spec)):
return package_spec
p = py.path.local(package_spec)
if p.check():
return p
if not p.dirpath().check(dir=1):
raise tox.exception.MissingDirectory(p.dirpath())
self.report.info("determining {}".format(p))
candidates = p.dirpath().listdir(p.basename)
if len(candidates) == 0:
raise tox.exception.MissingDependency(package_spec)
if len(candidates) > 1:
version_package = []
for filename in candidates:
version = get_version_from_filename(filename.basename)
if version is not None:
version_package.append((version, filename))
else:
self.report.warning("could not determine version of: {}".format(str(filename)))
if not version_package:
raise tox.exception.MissingDependency(package_spec)
version_package.sort()
_, package_with_largest_version = version_package[-1]
return package_with_largest_version
else:
return candidates[0]
_REGEX_FILE_NAME_WITH_VERSION = re.compile(r"[\w_\-\+\.]+-(.*)\.(zip|tar\.gz)")
def get_version_from_filename(basename):
m = _REGEX_FILE_NAME_WITH_VERSION.match(basename)
if m is None:
return None
version = m.group(1)
try:
return Version(version)
except InvalidVersion:
return None