dist-git/dist-git-client/dist_git_client.py

502 lines
16 KiB
Python
Raw Permalink Normal View History

"""
dist-git-client code, moved to a python module to simplify unit-testing
"""
import argparse
import configparser
import errno
import glob
import logging
import shlex
import os
import shutil
import subprocess
import sys
from urllib.parse import urlparse
try:
from rpmautospec import (
specfile_uses_rpmautospec as rpmautospec_used,
process_distgit as rpmautospec_expand,
)
except ImportError:
rpmautospec_used = lambda _: False
def log_cmd(command, comment="Running command"):
""" Dump the command to stderr so it can be c&p to shell """
command = ' '.join([shlex.quote(x) for x in command])
logging.info("%s: %s", comment, command)
def check_output(cmd, comment="Reading stdout from command"):
""" el6 compatible subprocess.check_output() """
log_cmd(cmd, comment)
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, _) = process.communicate()
if process.returncode:
raise RuntimeError("Exit non-zero: {0}".format(process.returncode))
return stdout
def call(cmd, comment="Calling"):
""" wrap sp.call() with logging info """
log_cmd(cmd, comment)
return subprocess.call(cmd)
def check_call(cmd, comment="Checked call"):
""" wrap sp.check_call() with logging info """
log_cmd(cmd, comment)
subprocess.check_call(cmd)
def _load_config(directory):
config = configparser.ConfigParser()
files = glob.glob(os.path.join(directory, "*.ini"))
logging.debug("Files %s in config directory %s", files, directory)
config.read(files)
config_dict = {
"instances": {},
"clone_host_map": {},
}
instances = config_dict["instances"]
for section_name in config.sections():
section = config[section_name]
instance = instances[section_name] = {}
for key in section.keys():
# array-like config options
if key in ["clone_hostnames", "path_prefixes"]:
hostnames = section[key].split()
instance[key] = [h.strip() for h in hostnames]
else:
instance[key] = section[key]
for key in ["sources", "specs"]:
if key in instance:
continue
instance[key] = "."
if "sources_file" not in instance:
instance["sources_file"] = "sources"
if "default_sum" not in instance:
instance["default_sum"] = "md5"
for host in instance["clone_hostnames"]:
if host not in config_dict["clone_host_map"]:
config_dict["clone_host_map"][host] = {}
host_dict = config_dict["clone_host_map"][host]
for prefix in instance.get("path_prefixes", ["DEFAULT"]):
if prefix in host_dict:
msg = "Duplicate prefix {0} for {1} hostname".format(
prefix, host,
)
raise RuntimeError(msg)
host_dict[prefix] = instance
return config_dict
def download(url, filename):
""" Download URL as FILENAME using curl command """
if not hasattr(download, "curl_has_retry_all_errors"):
# Drop this once EL8 is not a thing to support
output = check_output(["curl", "--help", "all"])
# method's static variable to avoid using 'global'
download.curl_has_retry_all_errors = b"--retry-all-errors" in output
command = [
"curl",
"-H", "Pragma:",
"-o", filename,
"--location",
"--connect-timeout", "60",
"--retry", "3", "--retry-delay", "10",
"--remote-time",
"--show-error",
"--fail",
]
if download.curl_has_retry_all_errors:
command += ["--retry-all-errors"]
if call(command + [url]):
raise RuntimeError("Can't download file {0}".format(filename))
def mkdir_p(path):
""" mimic 'mkdir -p <path>' command """
try:
os.makedirs(path)
except OSError as err:
if err.errno != errno.EEXIST:
raise
def download_file_and_check(url, params, distgit_config):
""" Download given URL (if not yet downloaded), and try the checksum """
filename = params["filename"]
sum_binary = params["hashtype"] + "sum"
mkdir_p(distgit_config["sources"])
if not os.path.exists(filename):
logging.info("Downloading %s", filename)
download(url, filename)
else:
logging.info("File %s already exists", filename)
sum_command = [sum_binary, filename]
output = check_output(sum_command).decode("utf-8")
checksum, _ = output.strip().split()
if checksum != params["hash"]:
raise RuntimeError("Check-sum {0} is wrong, expected: {1}".format(
checksum,
params["hash"],
))
def _detect_clone_url():
git_config = ".git/config"
if not os.path.exists(git_config):
msg = "{0} not found, $PWD is not a git repository".format(git_config)
raise RuntimeError(msg)
git_conf_reader = configparser.ConfigParser()
git_conf_reader.read(git_config)
return git_conf_reader['remote "origin"']["url"]
def get_distgit_config(config, forked_from=None):
"""
Given the '.git/config' file from current directory, return the
appropriate part of dist-git configuration.
Returns tuple: (urlparse(clone_url), distgit_config)
"""
url = forked_from
if not url:
url = _detect_clone_url()
parsed_url = urlparse(url)
if parsed_url.hostname is None:
hostname = "localhost"
else:
hostname = parsed_url.hostname
prefixes = config["clone_host_map"][hostname]
prefix_found = None
for prefix in prefixes.keys():
if not parsed_url.path.startswith(prefix):
continue
prefix_found = prefix
if not prefix_found:
if "DEFAULT" not in prefixes:
raise RuntimeError("Path {0} does not match any of 'path_prefixes' "
"for '{1}' hostname".format(parsed_url.path,
hostname))
prefix_found = "DEFAULT"
return parsed_url, prefixes[prefix_found]
def get_spec(distgit_config):
"""
Find the specfile name inside distgit_config["specs"] directory
"""
spec_dir = distgit_config["specs"]
specfiles = glob.glob(os.path.join(spec_dir, '*.spec'))
if len(specfiles) != 1:
abs_spec_dir = os.path.join(os.getcwd(), spec_dir)
message = "Exactly one spec file expected in {0} directory, {1} found".format(
abs_spec_dir, len(specfiles),
)
raise RuntimeError(message)
specfile = os.path.basename(specfiles[0])
return specfile
def sources(args, config):
"""
Locate the sources, and download them from the appropriate dist-git
lookaside cache.
"""
parsed_url, distgit_config = get_distgit_config(config, args.forked_from)
namespace = parsed_url.path.strip('/').split('/')
# drop the last {name}.git part
repo_name = namespace.pop()
if repo_name.endswith(".git"):
repo_name = repo_name[:-4]
namespace = list(reversed(namespace))
output = check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
output = output.decode("utf-8").strip()
if output == "HEAD":
output = check_output(["git", "rev-parse", "HEAD"])
output = output.decode("utf-8").strip()
refspec = output
specfile = get_spec(distgit_config)
name = specfile[:-5]
sources_file = distgit_config["sources_file"].format(name=name)
if not os.path.exists(sources_file):
logging.info("'%s' file not found, download skipped", sources_file)
return
logging.info("Reading sources specification file: %s", sources_file)
with open(sources_file, 'r') as sfd:
while True:
line = sfd.readline()
if not line:
break
kwargs = {
"name": repo_name,
"refspec": refspec,
"namespace": namespace,
}
source_spec = line.split()
if not source_spec:
# line full of white-spaces, skip
continue
if len(source_spec) == 2:
# old md5/sha1 format: 0ced6f20b9fa1bea588005b5ad4b52c1 tar-1.26.tar.xz
kwargs["hashtype"] = distgit_config["default_sum"].lower()
kwargs["hash"] = source_spec[0]
kwargs["filename"] = source_spec[1]
elif len(source_spec) == 4:
# SHA512 (tar-1.30.tar.xz.sig) = <HASH>
kwargs["hashtype"] = source_spec[0].lower()
kwargs["hash"] = source_spec[3]
filename = os.path.basename(source_spec[1])
kwargs["filename"] = filename.strip('()')
else:
msg = "Weird sources line: {0}".format(line)
raise RuntimeError(msg)
url_file = '/'.join([
distgit_config["lookaside_location"],
distgit_config["lookaside_uri_pattern"].format(**kwargs)
])
download_file_and_check(url_file, kwargs, distgit_config)
def handle_autospec(spec_abspath, spec_basename, args):
"""
When %auto* macros are used in SPEC_ABSPATH, expand them into a separate
spec file within ARGS.OUTPUTDIR, and return the absolute filename of the
specfile. When %auto* macros are not used, return SPEC_ABSPATH unchanged.
"""
result = spec_abspath
if rpmautospec_used(spec_abspath):
git_dir = check_output(["git", "rev-parse", "--git-dir"])
git_dir = git_dir.decode("utf-8").strip()
if os.path.exists(os.path.join(git_dir, "shallow")):
# Hack. The rpmautospec doesn't support shallow clones:
# https://pagure.io/fedora-infra/rpmautospec/issue/227
logging.info("rpmautospec requires full clone => --unshallow")
check_call(["git", "fetch", "--unshallow"])
# Expand the %auto* macros, and create the separate spec file in the
# output directory.
output_spec = os.path.join(args.outputdir, spec_basename)
rpmautospec_expand(spec_abspath, output_spec)
result = output_spec
return result
def srpm(args, config):
"""
Using the appropriate dist-git configuration, generate source RPM
file. This requires running 'def sources()' first.
"""
_, distgit_config = get_distgit_config(config, args.forked_from)
cwd = os.getcwd()
sources_dir = os.path.join(cwd, distgit_config["sources"])
specs = os.path.join(cwd, distgit_config["specs"])
spec = get_spec(distgit_config)
mkdir_p(args.outputdir)
spec_abspath = os.path.join(specs, spec)
spec_abspath = handle_autospec(spec_abspath, spec, args)
if args.mock_chroot:
command = [
"mock", "--buildsrpm",
"-r", args.mock_chroot,
"--spec", spec_abspath,
"--sources", sources_dir,
"--resultdir", args.outputdir,
]
else:
command = [
"rpmbuild", "-bs", spec_abspath,
"--define", "dist %nil",
"--define", "_sourcedir {0}".format(sources_dir),
"--define", "_srcrpmdir {0}".format(args.outputdir),
"--define", "_disable_source_fetch 1",
]
if args.dry_run or 'COPR_DISTGIT_CLIENT_DRY_RUN' in os.environ:
log_cmd(command, comment="Dry run")
else:
check_call(command)
def clone(args, config):
"""
Automatically clone a package from a given DistGit instance
"""
distgit = config["instances"][args.dist_git]
parts = distgit.get("cloning_pattern_package_parts")
if parts:
expected = parts.split()
have = args.package.split("/")
if len(expected) != len(have):
raise RuntimeError(
"Package '{0}' has a wrong format, {1} "
"slash-separated parts are expected: {2}".format(
args.package, len(expected),
"/".join(expected),
))
clone_url = distgit["cloning_pattern"].format(package=args.package)
check_call([
"git", "clone", clone_url,
])
def _get_argparser():
parser = argparse.ArgumentParser(prog="dist-git-client",
description="""\
A simple, configurable python utility that is able to download sources from
various dist-git instances, and generate source RPMs.
The utility is able to automatically map the "origin" .git/config clone URL
(or --forked-from URL, if specified) to a corresponding dist-git instance
configured in /etc/dist-git-client directory.
""")
# main parser
default_confdir = os.environ.get("COPR_DISTGIT_CLIENT_CONFDIR",
"/etc/dist-git-client")
parser.add_argument(
"--configdir", default=default_confdir,
help="Where to load configuration files from")
parser.add_argument(
"--loglevel", default="info",
help="Python logging level, e.g. debug, info, error")
parser.add_argument(
"--forked-from",
metavar="CLONE_URL",
help=("Specify that this git clone directory is a dist-git repository "
"fork. If used, the default clone url detection from the "
".git/config file is disabled and CLONE_URL is used instead. "
"This specified CLONE_URL is used to detect the appropriate "
"lookaside cache pattern to download the sources."))
subparsers = parser.add_subparsers(
title="actions", dest="action")
# sources parser
subparsers.add_parser(
"sources",
description=(
"Using the 'url' .git/config, detect where the right DistGit "
"lookaside cache exists, and download the corresponding source "
"files."),
help="Download sources from the lookaside cache")
# srpm parser
srpm_parser = subparsers.add_parser(
"srpm",
help="Generate a source RPM",
description=(
"Generate a source RPM from the downloaded source files "
"by 'sources' command, please run 'sources' first."),
)
srpm_parser.add_argument(
"--outputdir",
default="/tmp",
help="Where to store the resulting source RPM")
srpm_parser.add_argument(
"--mock-chroot",
help=("Generate the SRPM in mock buildroot instead of on host. The "
"argument is passed down to mock as the 'mock -r|--root' "
"argument."),
)
srpm_parser.add_argument(
"--dry-run", action="store_true",
help=("Don't produce the SRPM, just print the command which would be "
"otherwise called"),
)
clone_parser = subparsers.add_parser(
"clone",
help="Clone package from a DistGit source",
)
clone_parser.add_argument(
"--dist-git",
default="fedora",
help=("The DistGit ID as configured in /etc/dist-git-client/"),
)
clone_parser.add_argument(
"package",
default="fedora",
help=("Package name specification. For some DistGit "
"instances this consists of multiple parts separated "
"by slash, e.g. for '--dist-git=fedora-copr' use "
"'@copr/copr-dev/copr-cli'."),
)
return parser
def unittests_init_git(files=None):
"""
Initialize .git/ directory. This method is only used for unit-testing.
"""
check_output(["git", "init", ".", "-b", "main"])
shutil.rmtree(".git/hooks")
check_output(["git", "config", "user.email", "you@example.com"])
check_output(["git", "config", "user.name", "Your Name"])
check_output(["git", "config", "advice.detachedHead", "false"])
for filename, content in files:
dirname = os.path.dirname(filename)
try:
os.makedirs(dirname)
except OSError:
pass
with open(filename, "w", encoding="utf-8") as filed:
filed.write(content)
check_output(["git", "add", filename])
check_output(["git", "commit", "-m", "initial"])
def main():
""" The entrypoint for the whole logic """
args = _get_argparser().parse_args()
logging.basicConfig(
level=getattr(logging, args.loglevel.upper()),
format="%(levelname)s: %(message)s",
)
config = _load_config(args.configdir)
try:
if args.action == "srpm":
srpm(args, config)
elif args.action == "clone":
clone(args, config)
else:
sources(args, config)
except RuntimeError as err:
logging.error("%s", err)
sys.exit(1)