mirror of
https://github.com/release-engineering/dist-git.git
synced 2025-02-23 06:52:57 +00:00

Inherited from Copr basecode, namely starting from https://github.com/fedora-copr/copr/pull/3299
501 lines
16 KiB
Python
501 lines
16 KiB
Python
"""
|
|
dist-git-client code, moved to a python module to simplify unit-testing
|
|
"""
|
|
|
|
import argparse
|
|
import configparser
|
|
import errno
|
|
import glob
|
|
import logging
|
|
import shlex
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
from urllib.parse import urlparse
|
|
|
|
try:
|
|
from rpmautospec import (
|
|
specfile_uses_rpmautospec as rpmautospec_used,
|
|
process_distgit as rpmautospec_expand,
|
|
)
|
|
except ImportError:
|
|
rpmautospec_used = lambda _: False
|
|
|
|
|
|
def log_cmd(command, comment="Running command"):
|
|
""" Dump the command to stderr so it can be c&p to shell """
|
|
command = ' '.join([shlex.quote(x) for x in command])
|
|
logging.info("%s: %s", comment, command)
|
|
|
|
|
|
def check_output(cmd, comment="Reading stdout from command"):
|
|
""" el6 compatible subprocess.check_output() """
|
|
log_cmd(cmd, comment)
|
|
process = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
(stdout, _) = process.communicate()
|
|
if process.returncode:
|
|
raise RuntimeError("Exit non-zero: {0}".format(process.returncode))
|
|
return stdout
|
|
|
|
def call(cmd, comment="Calling"):
|
|
""" wrap sp.call() with logging info """
|
|
log_cmd(cmd, comment)
|
|
return subprocess.call(cmd)
|
|
|
|
def check_call(cmd, comment="Checked call"):
|
|
""" wrap sp.check_call() with logging info """
|
|
log_cmd(cmd, comment)
|
|
subprocess.check_call(cmd)
|
|
|
|
def _load_config(directory):
|
|
config = configparser.ConfigParser()
|
|
files = glob.glob(os.path.join(directory, "*.ini"))
|
|
logging.debug("Files %s in config directory %s", files, directory)
|
|
config.read(files)
|
|
|
|
config_dict = {
|
|
"instances": {},
|
|
"clone_host_map": {},
|
|
}
|
|
|
|
instances = config_dict["instances"]
|
|
for section_name in config.sections():
|
|
section = config[section_name]
|
|
instance = instances[section_name] = {}
|
|
for key in section.keys():
|
|
# array-like config options
|
|
if key in ["clone_hostnames", "path_prefixes"]:
|
|
hostnames = section[key].split()
|
|
instance[key] = [h.strip() for h in hostnames]
|
|
else:
|
|
instance[key] = section[key]
|
|
|
|
for key in ["sources", "specs"]:
|
|
if key in instance:
|
|
continue
|
|
instance[key] = "."
|
|
|
|
if "sources_file" not in instance:
|
|
instance["sources_file"] = "sources"
|
|
|
|
if "default_sum" not in instance:
|
|
instance["default_sum"] = "md5"
|
|
|
|
for host in instance["clone_hostnames"]:
|
|
if host not in config_dict["clone_host_map"]:
|
|
config_dict["clone_host_map"][host] = {}
|
|
host_dict = config_dict["clone_host_map"][host]
|
|
for prefix in instance.get("path_prefixes", ["DEFAULT"]):
|
|
if prefix in host_dict:
|
|
msg = "Duplicate prefix {0} for {1} hostname".format(
|
|
prefix, host,
|
|
)
|
|
raise RuntimeError(msg)
|
|
host_dict[prefix] = instance
|
|
|
|
return config_dict
|
|
|
|
|
|
def download(url, filename):
|
|
""" Download URL as FILENAME using curl command """
|
|
|
|
if not hasattr(download, "curl_has_retry_all_errors"):
|
|
# Drop this once EL8 is not a thing to support
|
|
output = check_output(["curl", "--help", "all"])
|
|
# method's static variable to avoid using 'global'
|
|
download.curl_has_retry_all_errors = b"--retry-all-errors" in output
|
|
|
|
command = [
|
|
"curl",
|
|
"-H", "Pragma:",
|
|
"-o", filename,
|
|
"--location",
|
|
"--connect-timeout", "60",
|
|
"--retry", "3", "--retry-delay", "10",
|
|
"--remote-time",
|
|
"--show-error",
|
|
"--fail",
|
|
]
|
|
|
|
if download.curl_has_retry_all_errors:
|
|
command += ["--retry-all-errors"]
|
|
|
|
if call(command + [url]):
|
|
raise RuntimeError("Can't download file {0}".format(filename))
|
|
|
|
|
|
def mkdir_p(path):
|
|
""" mimic 'mkdir -p <path>' command """
|
|
try:
|
|
os.makedirs(path)
|
|
except OSError as err:
|
|
if err.errno != errno.EEXIST:
|
|
raise
|
|
|
|
|
|
def download_file_and_check(url, params, distgit_config):
|
|
""" Download given URL (if not yet downloaded), and try the checksum """
|
|
filename = params["filename"]
|
|
sum_binary = params["hashtype"] + "sum"
|
|
|
|
mkdir_p(distgit_config["sources"])
|
|
|
|
if not os.path.exists(filename):
|
|
logging.info("Downloading %s", filename)
|
|
download(url, filename)
|
|
else:
|
|
logging.info("File %s already exists", filename)
|
|
|
|
sum_command = [sum_binary, filename]
|
|
output = check_output(sum_command).decode("utf-8")
|
|
checksum, _ = output.strip().split()
|
|
if checksum != params["hash"]:
|
|
raise RuntimeError("Check-sum {0} is wrong, expected: {1}".format(
|
|
checksum,
|
|
params["hash"],
|
|
))
|
|
|
|
|
|
def _detect_clone_url():
|
|
git_config = ".git/config"
|
|
if not os.path.exists(git_config):
|
|
msg = "{0} not found, $PWD is not a git repository".format(git_config)
|
|
raise RuntimeError(msg)
|
|
|
|
git_conf_reader = configparser.ConfigParser()
|
|
git_conf_reader.read(git_config)
|
|
return git_conf_reader['remote "origin"']["url"]
|
|
|
|
|
|
def get_distgit_config(config, forked_from=None):
|
|
"""
|
|
Given the '.git/config' file from current directory, return the
|
|
appropriate part of dist-git configuration.
|
|
Returns tuple: (urlparse(clone_url), distgit_config)
|
|
"""
|
|
url = forked_from
|
|
if not url:
|
|
url = _detect_clone_url()
|
|
parsed_url = urlparse(url)
|
|
if parsed_url.hostname is None:
|
|
hostname = "localhost"
|
|
else:
|
|
hostname = parsed_url.hostname
|
|
|
|
prefixes = config["clone_host_map"][hostname]
|
|
prefix_found = None
|
|
for prefix in prefixes.keys():
|
|
if not parsed_url.path.startswith(prefix):
|
|
continue
|
|
prefix_found = prefix
|
|
|
|
if not prefix_found:
|
|
if "DEFAULT" not in prefixes:
|
|
raise RuntimeError("Path {0} does not match any of 'path_prefixes' "
|
|
"for '{1}' hostname".format(parsed_url.path,
|
|
hostname))
|
|
prefix_found = "DEFAULT"
|
|
|
|
return parsed_url, prefixes[prefix_found]
|
|
|
|
|
|
def get_spec(distgit_config):
|
|
"""
|
|
Find the specfile name inside distgit_config["specs"] directory
|
|
"""
|
|
spec_dir = distgit_config["specs"]
|
|
specfiles = glob.glob(os.path.join(spec_dir, '*.spec'))
|
|
if len(specfiles) != 1:
|
|
abs_spec_dir = os.path.join(os.getcwd(), spec_dir)
|
|
message = "Exactly one spec file expected in {0} directory, {1} found".format(
|
|
abs_spec_dir, len(specfiles),
|
|
)
|
|
raise RuntimeError(message)
|
|
specfile = os.path.basename(specfiles[0])
|
|
return specfile
|
|
|
|
|
|
def sources(args, config):
|
|
"""
|
|
Locate the sources, and download them from the appropriate dist-git
|
|
lookaside cache.
|
|
"""
|
|
parsed_url, distgit_config = get_distgit_config(config, args.forked_from)
|
|
namespace = parsed_url.path.strip('/').split('/')
|
|
# drop the last {name}.git part
|
|
repo_name = namespace.pop()
|
|
if repo_name.endswith(".git"):
|
|
repo_name = repo_name[:-4]
|
|
namespace = list(reversed(namespace))
|
|
|
|
output = check_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
|
|
output = output.decode("utf-8").strip()
|
|
if output == "HEAD":
|
|
output = check_output(["git", "rev-parse", "HEAD"])
|
|
output = output.decode("utf-8").strip()
|
|
refspec = output
|
|
specfile = get_spec(distgit_config)
|
|
name = specfile[:-5]
|
|
sources_file = distgit_config["sources_file"].format(name=name)
|
|
if not os.path.exists(sources_file):
|
|
logging.info("'%s' file not found, download skipped", sources_file)
|
|
return
|
|
|
|
logging.info("Reading sources specification file: %s", sources_file)
|
|
with open(sources_file, 'r') as sfd:
|
|
while True:
|
|
line = sfd.readline()
|
|
if not line:
|
|
break
|
|
|
|
kwargs = {
|
|
"name": repo_name,
|
|
"refspec": refspec,
|
|
"namespace": namespace,
|
|
}
|
|
|
|
source_spec = line.split()
|
|
if not source_spec:
|
|
# line full of white-spaces, skip
|
|
continue
|
|
|
|
if len(source_spec) == 2:
|
|
# old md5/sha1 format: 0ced6f20b9fa1bea588005b5ad4b52c1 tar-1.26.tar.xz
|
|
kwargs["hashtype"] = distgit_config["default_sum"].lower()
|
|
kwargs["hash"] = source_spec[0]
|
|
kwargs["filename"] = source_spec[1]
|
|
elif len(source_spec) == 4:
|
|
# SHA512 (tar-1.30.tar.xz.sig) = <HASH>
|
|
kwargs["hashtype"] = source_spec[0].lower()
|
|
kwargs["hash"] = source_spec[3]
|
|
filename = os.path.basename(source_spec[1])
|
|
kwargs["filename"] = filename.strip('()')
|
|
else:
|
|
msg = "Weird sources line: {0}".format(line)
|
|
raise RuntimeError(msg)
|
|
|
|
url_file = '/'.join([
|
|
distgit_config["lookaside_location"],
|
|
distgit_config["lookaside_uri_pattern"].format(**kwargs)
|
|
])
|
|
|
|
download_file_and_check(url_file, kwargs, distgit_config)
|
|
|
|
|
|
def handle_autospec(spec_abspath, spec_basename, args):
|
|
"""
|
|
When %auto* macros are used in SPEC_ABSPATH, expand them into a separate
|
|
spec file within ARGS.OUTPUTDIR, and return the absolute filename of the
|
|
specfile. When %auto* macros are not used, return SPEC_ABSPATH unchanged.
|
|
"""
|
|
result = spec_abspath
|
|
if rpmautospec_used(spec_abspath):
|
|
git_dir = check_output(["git", "rev-parse", "--git-dir"])
|
|
git_dir = git_dir.decode("utf-8").strip()
|
|
if os.path.exists(os.path.join(git_dir, "shallow")):
|
|
# Hack. The rpmautospec doesn't support shallow clones:
|
|
# https://pagure.io/fedora-infra/rpmautospec/issue/227
|
|
logging.info("rpmautospec requires full clone => --unshallow")
|
|
check_call(["git", "fetch", "--unshallow"])
|
|
|
|
# Expand the %auto* macros, and create the separate spec file in the
|
|
# output directory.
|
|
output_spec = os.path.join(args.outputdir, spec_basename)
|
|
rpmautospec_expand(spec_abspath, output_spec)
|
|
result = output_spec
|
|
return result
|
|
|
|
|
|
def srpm(args, config):
|
|
"""
|
|
Using the appropriate dist-git configuration, generate source RPM
|
|
file. This requires running 'def sources()' first.
|
|
"""
|
|
_, distgit_config = get_distgit_config(config, args.forked_from)
|
|
|
|
cwd = os.getcwd()
|
|
sources_dir = os.path.join(cwd, distgit_config["sources"])
|
|
specs = os.path.join(cwd, distgit_config["specs"])
|
|
spec = get_spec(distgit_config)
|
|
|
|
mkdir_p(args.outputdir)
|
|
|
|
spec_abspath = os.path.join(specs, spec)
|
|
spec_abspath = handle_autospec(spec_abspath, spec, args)
|
|
|
|
if args.mock_chroot:
|
|
command = [
|
|
"mock", "--buildsrpm",
|
|
"-r", args.mock_chroot,
|
|
"--spec", spec_abspath,
|
|
"--sources", sources_dir,
|
|
"--resultdir", args.outputdir,
|
|
]
|
|
else:
|
|
command = [
|
|
"rpmbuild", "-bs", spec_abspath,
|
|
"--define", "dist %nil",
|
|
"--define", "_sourcedir {0}".format(sources_dir),
|
|
"--define", "_srcrpmdir {0}".format(args.outputdir),
|
|
"--define", "_disable_source_fetch 1",
|
|
]
|
|
|
|
if args.dry_run or 'COPR_DISTGIT_CLIENT_DRY_RUN' in os.environ:
|
|
log_cmd(command, comment="Dry run")
|
|
else:
|
|
check_call(command)
|
|
|
|
|
|
def clone(args, config):
|
|
"""
|
|
Automatically clone a package from a given DistGit instance
|
|
"""
|
|
distgit = config["instances"][args.dist_git]
|
|
parts = distgit.get("cloning_pattern_package_parts")
|
|
if parts:
|
|
expected = parts.split()
|
|
have = args.package.split("/")
|
|
if len(expected) != len(have):
|
|
raise RuntimeError(
|
|
"Package '{0}' has a wrong format, {1} "
|
|
"slash-separated parts are expected: {2}".format(
|
|
args.package, len(expected),
|
|
"/".join(expected),
|
|
))
|
|
|
|
clone_url = distgit["cloning_pattern"].format(package=args.package)
|
|
check_call([
|
|
"git", "clone", clone_url,
|
|
])
|
|
|
|
def _get_argparser():
|
|
parser = argparse.ArgumentParser(prog="dist-git-client",
|
|
description="""\
|
|
A simple, configurable python utility that is able to download sources from
|
|
various dist-git instances, and generate source RPMs.
|
|
The utility is able to automatically map the "origin" .git/config clone URL
|
|
(or --forked-from URL, if specified) to a corresponding dist-git instance
|
|
configured in /etc/dist-git-client directory.
|
|
""")
|
|
|
|
# main parser
|
|
default_confdir = os.environ.get("COPR_DISTGIT_CLIENT_CONFDIR",
|
|
"/etc/dist-git-client")
|
|
parser.add_argument(
|
|
"--configdir", default=default_confdir,
|
|
help="Where to load configuration files from")
|
|
parser.add_argument(
|
|
"--loglevel", default="info",
|
|
help="Python logging level, e.g. debug, info, error")
|
|
parser.add_argument(
|
|
"--forked-from",
|
|
metavar="CLONE_URL",
|
|
help=("Specify that this git clone directory is a dist-git repository "
|
|
"fork. If used, the default clone url detection from the "
|
|
".git/config file is disabled and CLONE_URL is used instead. "
|
|
"This specified CLONE_URL is used to detect the appropriate "
|
|
"lookaside cache pattern to download the sources."))
|
|
|
|
subparsers = parser.add_subparsers(
|
|
title="actions", dest="action")
|
|
|
|
# sources parser
|
|
subparsers.add_parser(
|
|
"sources",
|
|
description=(
|
|
"Using the 'url' .git/config, detect where the right DistGit "
|
|
"lookaside cache exists, and download the corresponding source "
|
|
"files."),
|
|
help="Download sources from the lookaside cache")
|
|
|
|
# srpm parser
|
|
srpm_parser = subparsers.add_parser(
|
|
"srpm",
|
|
help="Generate a source RPM",
|
|
description=(
|
|
"Generate a source RPM from the downloaded source files "
|
|
"by 'sources' command, please run 'sources' first."),
|
|
)
|
|
srpm_parser.add_argument(
|
|
"--outputdir",
|
|
default="/tmp",
|
|
help="Where to store the resulting source RPM")
|
|
srpm_parser.add_argument(
|
|
"--mock-chroot",
|
|
help=("Generate the SRPM in mock buildroot instead of on host. The "
|
|
"argument is passed down to mock as the 'mock -r|--root' "
|
|
"argument."),
|
|
)
|
|
srpm_parser.add_argument(
|
|
"--dry-run", action="store_true",
|
|
help=("Don't produce the SRPM, just print the command which would be "
|
|
"otherwise called"),
|
|
)
|
|
|
|
clone_parser = subparsers.add_parser(
|
|
"clone",
|
|
help="Clone package from a DistGit source",
|
|
)
|
|
|
|
clone_parser.add_argument(
|
|
"--dist-git",
|
|
default="fedora",
|
|
help=("The DistGit ID as configured in /etc/dist-git-client/"),
|
|
)
|
|
|
|
clone_parser.add_argument(
|
|
"package",
|
|
default="fedora",
|
|
help=("Package name specification. For some DistGit "
|
|
"instances this consists of multiple parts separated "
|
|
"by slash, e.g. for '--dist-git=fedora-copr' use "
|
|
"'@copr/copr-dev/copr-cli'."),
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
def unittests_init_git(files=None):
|
|
"""
|
|
Initialize .git/ directory. This method is only used for unit-testing.
|
|
"""
|
|
check_output(["git", "init", ".", "-b", "main"])
|
|
shutil.rmtree(".git/hooks")
|
|
check_output(["git", "config", "user.email", "you@example.com"])
|
|
check_output(["git", "config", "user.name", "Your Name"])
|
|
check_output(["git", "config", "advice.detachedHead", "false"])
|
|
|
|
for filename, content in files:
|
|
dirname = os.path.dirname(filename)
|
|
try:
|
|
os.makedirs(dirname)
|
|
except OSError:
|
|
pass
|
|
with open(filename, "w", encoding="utf-8") as filed:
|
|
filed.write(content)
|
|
check_output(["git", "add", filename])
|
|
|
|
check_output(["git", "commit", "-m", "initial"])
|
|
|
|
|
|
def main():
|
|
""" The entrypoint for the whole logic """
|
|
args = _get_argparser().parse_args()
|
|
logging.basicConfig(
|
|
level=getattr(logging, args.loglevel.upper()),
|
|
format="%(levelname)s: %(message)s",
|
|
)
|
|
config = _load_config(args.configdir)
|
|
|
|
try:
|
|
if args.action == "srpm":
|
|
srpm(args, config)
|
|
elif args.action == "clone":
|
|
clone(args, config)
|
|
else:
|
|
sources(args, config)
|
|
except RuntimeError as err:
|
|
logging.error("%s", err)
|
|
sys.exit(1)
|