urpm-tools/urpm-reposync.py
2012-09-05 15:45:44 +04:00

1332 lines
49 KiB
Python
Executable file

#!/usr/bin/python2.7
'''
Created on Jan 11, 2012
@author: flid
'''
import rpm
import argparse
import sys
import subprocess
import re
import os
from urllib2 import urlopen, HTTPError, URLError
import zlib
import glob
import shutil
import platform
import copy
import unittest
import gettext
gettext.install('urpm-tools')
# Machine type of the running system, as reported by platform.machine().
ARCH = platform.machine()
# Directory that download_packages() fills and install_packages() reads from.
downloaded_rpms_dir = '/tmp/urpm-reposync.rpms'
# Program name and version string.
VERSION = "urpm-reposync 2.1"
def vprint(text):
    '''Write text to stdout, but only when --verbose was given.'''
    if not command_line.verbose:
        return
    print(text)
def qprint(text):
    '''Write text to stdout unless --printonly or --quiet is in effect.'''
    # printonly is checked first, exactly as before
    if command_line.printonly or command_line.quiet:
        return
    print(text)
def eprint(text, fatal=False, code=1):
    '''Print the message to stderr and optionally terminate the program.

    text  -- message to print; a newline is appended
    fatal -- when true, exit after printing
    code  -- process exit status used when fatal is true

    Raises SystemExit when fatal is true.
    '''
    sys.stderr.write("%s\n" % (text,))
    if fatal:
        # sys.exit is always available; the bare exit() helper is only
        # injected by the site module for interactive use.
        sys.exit(code)
def oprint(text):
    '''Write text to stdout unless --quiet is in effect.'''
    if command_line.quiet:
        return
    print(text)
def get_command_output(command, fatal_fails=True):
    '''Run command (an argument list) and collect what it printed.

    Returns [stdout, stderr, returncode]. When sys.stdout has an encoding,
    both streams are decoded with it and re-encoded as UTF-8.

    If the command exits with a non-zero status and fatal_fails is true,
    the captured output is reported on stderr and the program exits with
    status 1 (SystemExit).
    '''
    vprint("Executing command: " + str(command))
    res = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output = list(res.communicate())
    if sys.stdout.encoding:
        output[0] = output[0].decode(sys.stdout.encoding).encode("UTF-8")
        output[1] = output[1].decode(sys.stdout.encoding).encode("UTF-8")
    if res.returncode != 0 and fatal_fails:
        eprint(_("Error while calling command") + " '" + " ".join(command) + "'")
        if output[1] is not None or output[0] is not None:
            out_text = (output[0].strip() + "\n") if output[0] is not None else ""
            err_text = output[1].strip() if output[1] is not None else ""
            eprint(_("Error message: \n") + out_text + err_text)
        # sys.exit instead of the site-provided exit() helper
        sys.exit(1)
    return [output[0], output[1], res.returncode]
def parse_command_line():
    '''Parse sys.argv into the global 'command_line' namespace.

    Exits with status 2 when --quiet is used without --auto.
    --verbose implies --detailed.
    '''
    global command_line
    arg_parser = argparse.ArgumentParser(description=_('reposync is used to synchronize a set of packages on the local computer with the remote repository.'))

    # Options that collect one or more media names per occurrence.
    media_options = (
        (('--include-media', '--media'), _("Use only selected URPM media")),
        (('--exclude-media',), _("Do not use selected URPM media")),
    )
    for names, help_text in media_options:
        arg_parser.add_argument(*names, action='append', nargs='+', help=help_text)

    # Plain on/off switches, in the order they appear in --help.
    # (The -x/--exclude-packages and -n/--noremove options are disabled.)
    switches = (
        (('-v', '--verbose'), _("Verbose (print additional info)")),
        (('-q', '--quiet'), _("Quiet operation. Senseless without --auto.")),
        (('-a', '--auto'), _("Do not ask questions, just do it!")),
        (('-p', '--printonly'), _("Only print the list of actions to be done and do nothing more!")),
        (('-d', '--download'), _("Only download the rpm files, but install or remove nothing.")),
        (('-r', '--remove'), _("Remove all the packages which do not present in repository. By default, only some of them would be removed.")),
        (('-c', '--check'), _("Download packages and check wether they can be installed to your system, but do not install them.")),
        (('-k', '--nokernel'), _("Do nothing with kernels.")),
        (('--runselftests',), _("Run self-tests end exit.")),
        (('--detailed',), _("Show detailed information about packages are going to be removed or installed (why does it have to be done)")),
    )
    for names, help_text in switches:
        arg_parser.add_argument(*names, action='store_true', help=help_text)

    command_line = arg_parser.parse_args(sys.argv[1:])
    if command_line.quiet and not command_line.auto:
        eprint(_("It's senseless to use --quiet without --auto!"), fatal=True, code=2)
    if command_line.verbose:
        command_line.detailed = True
# Base urpmq command line; Main() extends it with the media selection options.
cmd = ['urpmq']
class MediaSet(object):
    '''The active urpmi media and their URLs, as listed by urpmq.

    Attributes:
      urls   -- list of media URLs, without a trailing slash
      media  -- medium name -> URL (without a trailing slash)
      by_url -- URL -> medium name; every URL is indexed both as printed by
                urpmq and without its trailing slash, so lookups with the
                values stored in 'urls' always succeed
    '''

    def __init__(self):
        self.urls = []
        self.media = {}
        self.by_url = {}
        vprint("Loading media urls...")
        lines = get_command_output(cmd + ["--list-url", "--list-media", 'active'])[0].strip().split("\n")
        for line in lines:
            self._register(line)
        vprint("Media urls: " + str(self.urls))

    def _register(self, line):
        '''Record one "<medium name> <url>" line; lines without a URL are ignored.'''
        parts = line.split(" ")
        medium = ' '.join(parts[:-1])
        raw_url = parts[-1]
        url = raw_url[:-1] if raw_url.endswith("/") else raw_url
        if url.find('/') == -1:
            # the last word is not a URL or a path
            return
        self.media[medium] = url
        # Index the normalized form (the one kept in self.urls) as well as
        # the raw one; previously only the raw form was a key, so a medium
        # listed with a trailing slash could not be found by its URL.
        self.by_url[url] = medium
        self.by_url[raw_url] = medium
        self.urls.append(url)
class NEVR:
    '''Name plus epoch-version-release of a package or of a dependency.

    Attributes:
      N   -- name
      EVR -- version string; an epoch prefix ("E:") is added when missing
      VR  -- EVR as passed to the constructor, minus a leading "E:" when the
             epoch E was passed as well
      E   -- epoch (taken from the argument, parsed from EVR, or '0')
      FL  -- comparison flags: a combination of EQUAL, GREATER, LESS, or None
      DE  -- distepoch (the string 'None' is converted to None)
      DT  -- disttag
    '''
    EQUAL = rpm.RPMSENSE_EQUAL #8
    GREATER = rpm.RPMSENSE_GREATER #4
    LESS = rpm.RPMSENSE_LESS #2

    # "name[op version]" and bare "name" forms of a dependency string
    re_dep_ver = re.compile(r'^([^ \[\]]+)\[([\>\<\=\!]*) ([^ ]+)\]$')
    re_dep = re.compile(r'^([^ \[\]]+)$')

    # textual comparison operator -> flag combination
    types = {None: 0,
             '==' : EQUAL,
             '' : EQUAL,
             '=' : EQUAL,
             '>=' : EQUAL|GREATER,
             '<=' : EQUAL|LESS,
             '>' : GREATER,
             '<' : LESS,
             '!=' : LESS|GREATER,
             '<>' : LESS|GREATER}

    def __init__(self, N, EVR, DE=None, DT=None, FL=None, E=None):
        self.N = N
        self.EVR = EVR
        self.DE = DE
        self.DT = DT
        self.FL = FL
        self.E = E
        self.VR = EVR

        if E:
            if EVR.startswith(E + ':'):
                self.VR = EVR[len(E)+1:]
            else:
                self.EVR = E + ':' + self.EVR

        # try to get E from the version string: the part before the first
        # colon is an epoch only if it does not look like a version
        if not self.E and self.EVR and self.EVR.find(':') != -1:
            items = self.EVR.split(':')
            if items[0].find('.') == -1 and items[0].find('-') == -1:
                self.E = items[0]
        if not self.E and self.EVR:
            self.E = '0'
            self.EVR = '0:' + self.EVR

        if self.DE == 'None':
            self.DE = None

    def __str__(self):
        if self.FL:
            for t in NEVR.types:
                if not t:
                    continue
                if NEVR.types[t] == self.FL:
                    return "%s %s %s" % (self.N, t, self.EVR)
        if self.EVR:
            return "%s == %s" % (self.N, self.EVR)
        return "%s" % (self.N)

    def __repr__(self):
        return self.__str__()

    def __eq__(self, val):
        '''Two NEVRs are equal when their string forms are equal.

        Raises Exception when compared with anything that is not a NEVR.
        '''
        if not isinstance(val, NEVR):
            raise Exception("Internal error: comparing between NEVR and " + str(type(val)))
        return str(self) == str(val)

    def __ne__(self, val):
        return not (self == val)

    @staticmethod
    def from_depstring(s, DE_toremove=None):
        '''Build a NEVR from a dependency string: "name" or "name[op version]".

        A "[*]" marker is dropped. When DE_toremove is given and the version
        ends with ":<DE_toremove>", that distepoch suffix is cut off.

        Raises Exception if the string has neither of the two forms.
        '''
        s = s.replace('[*]', '')

        if DE_toremove:
            res = NEVR.re_dep_ver.match(s)
            if res:
                (name, t, val) = res.groups()
                if val.endswith(':' + DE_toremove):
                    val = val[:-(len(DE_toremove) + 1)]
                # Feed the stripped form to the parsing below. The result
                # used to be stored in an unused variable, so the distepoch
                # was never actually removed.
                s = '%s[%s %s]' % (name, t, val)

        res = NEVR.re_dep.match(s)
        if res:
            return NEVR(res.group(1), None)

        res = NEVR.re_dep_ver.match(s)
        if not res:
            raise Exception('Incorrect requirement string: ' + s)
        (name, t, val) = res.groups()
        return NEVR(name, val, FL=NEVR.types[t])

    # a release field carrying a distribution suffix, e.g. "1mdv"
    re_version = re.compile(r"(\.)?((alpha)|(cvs)|(svn)|(r))?\d+((mdv)|(mdk)|(mnb))")

    @staticmethod
    def from_filename(rpmname, E=None):
        '''Return a NEVR (with FL=EQUAL) for the given rpm file or package name.

        A trailing architecture suffix is removed first. If the last
        dash-separated section does not match re_version, that section is
        dropped and the two sections before it form the version; otherwise
        the last two sections form the version.
        '''
        suffix = ['.x86_64', '.noarch'] + ['.i%s86' % i for i in range(3,6)]
        for s in suffix:
            if rpmname.endswith(s):
                rpmname = rpmname[:-len(s)]

        sections = rpmname.split("-")
        if NEVR.re_version.search(sections[-1]) is None:
            name = sections[:-3]
            version = sections[-3:-1]
        else:
            name = sections[:-2]
            version = sections[-2:]
        return NEVR("-".join(name), "-".join(version), FL=NEVR.EQUAL, E=E)

    def satisfies(self, val):
        '''Tell whether this NEVR (with its flags) matches val (with its flags).

        Names must be equal; when either side has no version the answer is
        True. Otherwise the two versions are compared with rpm.evrCompare
        and the result is interpreted according to both sides' flags.
        '''
        if self.N != val.N:
            return False
        if self.EVR is None or val.EVR is None:
            return True

        (pname, pt, pval) = (self.N, self.FL, self.EVR)
        (rname, rt, rval) = (val.N, val.FL, val.EVR)

        def cut_part(separator, val1, val2):
            # When the two strings have a different number of separators,
            # keep at most n leading parts of each, n being the larger count.
            if val1 and val2 and val1.count(separator) != val2.count(separator):
                n = max(val1.count(separator), val2.count(separator))
                val1 = separator.join(val1.split(separator)[:n])
                val2 = separator.join(val2.split(separator)[:n])
            return (val1, val2)

        (rval, pval) = cut_part(':', rval, pval)
        (rval, pval) = cut_part('-', rval, pval)

        res = rpm.evrCompare(rval, pval)

        if res == 1: # rval > pval
            if pt & NEVR.GREATER:
                return True
            return bool(rt & NEVR.LESS)
        elif res == 0:
            if rt & NEVR.EQUAL and pt & NEVR.EQUAL:
                return True
            if rt & NEVR.LESS and pt & NEVR.LESS:
                return True
            if rt & NEVR.GREATER and pt & NEVR.GREATER:
                return True
            return False
        else: # rval < pval
            if rt & NEVR.GREATER:
                return True
            return bool(pt & NEVR.LESS)
class PackageSet:
    '''A set of packages with a reverse index of their dependencies.

    Attributes:
      packages -- package name -> dict with the keys listed in 'alltags'
                  (repository packages also get 'synthesis_list', 'filename',
                  'size' and 'medium')
      what     -- tag -> dependency name -> list of (package name, NEVR)
    '''
    tags = ['provides','requires','obsoletes','suggests', 'conflicts']
    alltags = tags + ['nevr', 'arch']

    # 32-bit x86 architecture names that are treated as interchangeable.
    # NOTE(review): 'i686' is not listed, matching the range the code was
    # written with -- confirm whether it should be.
    arches32 = ('i386', 'i486', 'i586')

    def __init__(self):
        self.what = {}
        self.packages = {}

    @staticmethod
    def _native_arch(pkg_arch, machine):
        '''Return machine when both names are 32-bit x86 variants, else pkg_arch.'''
        if pkg_arch in PackageSet.arches32 and machine in PackageSet.arches32:
            return machine
        return pkg_arch

    @staticmethod
    def _empty_fields():
        '''Return a fresh accumulator for the synthesis lines of one package.'''
        return {"provides":[], "requires":[], "obsoletes":[], "suggests":[],
                "conflicts":[], "info":[], "summary":[]}

    def load_from_system(self):
        '''Fill the set from the local rpm database (gpg-pubkey entries are skipped).'''
        qprint(_("Loading the list of installed packages..."))
        ts = rpm.TransactionSet()
        mi = ts.dbMatch()

        for tag in PackageSet.tags:
            self.what[tag] = {}

        for h in mi:
            name = h['name']
            if name == 'gpg-pubkey':
                continue
            if name not in self.packages:
                self.packages[name] = {}
            else:
                qprint(_("Duplicating ") + name + '-' + h['version'] + '-' + h['release'])
                qprint(_("Already found: ") + name + '-' + self.packages[name]["nevr"].EVR)

            E = str(h['epoch'])
            V = h['version']
            R = h['release']
            DE = h['distepoch']
            DT = h['disttag']
            if E == 'None':
                E = '0'
            EVR = "%s:%s-%s" % (E, V, R)

            package = self.packages[name]
            package['nevr'] = NEVR(name, EVR, FL=NEVR.EQUAL, DE=DE, DT=DT, E=E)
            package['arch'] = h['arch']

            for tag in PackageSet.tags:
                if tag not in package:
                    package[tag] = []
                # 'provides' -> 'providename', 'requires' -> 'requirename', ...
                for s in h.dsFromHeader(tag[:-1] + 'name'):
                    fl = s.Flags()
                    # undocumented flag for special dependencies
                    if fl & 16777216:
                        continue
                    fl = fl % 16  # keep only the comparison bits
                    evr_text = s.EVR()
                    dep = NEVR(s.N(), None if evr_text == '' else evr_text, FL=fl)
                    package[tag].append(dep)
                    if dep.N not in self.what[tag]:
                        self.what[tag][dep.N] = []
                    self.what[tag][dep.N].append((name, dep))

    @staticmethod
    def _fetch_synthesis(location):
        '''Return the compressed synthesis data found at location.

        http://, https://, ftp://, rsync:// URLs and absolute local paths
        are supported. Returns None (after printing an error) when the data
        can not be obtained. HTTPError / URLError raised by urlopen are
        passed on to the caller.
        '''
        if location.startswith(("http://", "https://", "ftp://")):
            r = urlopen(location)
            try:
                return r.read()
            finally:
                r.close()

        if location.startswith("rsync://"):
            import tempfile
            # private directory instead of a fixed, predictable /tmp path
            tmppath = tempfile.mkdtemp(prefix='urpm-reposync.synthesis_lists.')
            try:
                filename = os.path.join(tmppath, os.path.basename(location))
                devnull = open(os.devnull, 'w')
                try:
                    # argument list: the URL is never interpreted by a shell
                    subprocess.call(['rsync', '--copy-links', location, filename],
                                    stdout=devnull, stderr=devnull)
                finally:
                    devnull.close()
                with open(filename, 'rb') as r:
                    return r.read()
            finally:
                shutil.rmtree(tmppath)

        if location.startswith("/"): # local file
            if not os.path.exists(location):
                eprint(_('Could not read synthesis file. (File %s not found)') % location)
                return None
            with open(location, 'rb') as r:
                return r.read()

        # Unknown scheme: report it instead of parsing stale or missing data.
        eprint(_("File can not be processed! Url: ") + location)
        return None

    def _add_repo_package(self, items, deps, synthesis_list, medium):
        '''Process one '@info@' synthesis line.

        items -- the '@'-separated parts of the info line
        deps  -- dependency strings collected for this package, by tag

        The package replaces an already known one with the same name if it
        has the right architecture while the known one has not, or if it is
        newer and not worse architecture-wise.
        '''
        rpmname = items[2]
        size = int(items[4])
        nevr = NEVR.from_filename(items[2], E=items[3])
        nevr.E = items[3]
        if len(items) > 6:
            nevr.DT = items[6]
        if len(items) > 7:
            nevr.DE = items[7]

        arch = self._native_arch(items[2].split('.')[-1], ARCH)

        in_repo = nevr.N in self.packages
        new_arch_correct = arch == ARCH
        if in_repo:
            known = self.packages[nevr.N]
            if nevr.DE == known['nevr'].DE:
                ver_newer = rpm.evrCompare(nevr.EVR, known['nevr'].EVR) == 1
            else:
                ver_newer = (nevr.DE > known['nevr'].DE)
            old_arch_correct = known['arch'] == ARCH
        else:
            ver_newer = None
            old_arch_correct = None

        toinst = not in_repo or (not old_arch_correct and new_arch_correct) or \
                 (ver_newer and old_arch_correct == new_arch_correct)
        if not toinst:
            return

        if in_repo:
            # remove old data from the reverse index
            for tag in PackageSet.tags:
                for dep in self.packages[nevr.N][tag]:
                    self.what[tag][dep.N].remove((nevr.N, dep))
        else:
            self.packages[nevr.N] = {}

        package = self.packages[nevr.N]
        package['nevr'] = nevr
        package["arch"] = arch
        package["synthesis_list"] = synthesis_list
        package["filename"] = rpmname
        package["size"] = size
        for tag in PackageSet.tags:
            package[tag] = []
            for item in deps[tag]:
                if item == '':
                    continue
                dep = NEVR.from_depstring(item, DE_toremove=nevr.DE)
                package[tag].append(dep)
                if dep.N not in self.what[tag]:
                    self.what[tag][dep.N] = []
                self.what[tag][dep.N].append((nevr.N, dep))
        package['medium'] = medium

    def load_from_repository(self):
        '''Fill the set from the synthesis lists of all media in the global 'ms'.'''
        global fields

        def medium_of(url):
            # by_url may be keyed by the URL with or without its trailing slash
            for key in (url, url + '/'):
                if key in ms.by_url:
                    return ms.by_url[key]
            raise KeyError(url)

        def get_synthesis_by_url(url):
            # Local media (plain paths and file:// URLs) are read from the
            # copy urpmi keeps under /var/lib/urpmi. The medium is looked up
            # by the unmodified URL, which is what by_url is keyed by.
            if url.startswith('file://') or url.startswith('/'):
                return '/var/lib/urpmi/%s/synthesis.hdlist.cz' % medium_of(url)
            return url + "/media_info/synthesis.hdlist.cz"

        medium_by_synth = {}
        synthesis_lists = []
        for url in ms.urls:
            synth = get_synthesis_by_url(url)
            synthesis_lists.append(synth)
            medium_by_synth[synth] = medium_of(url)

        for tag in PackageSet.tags:
            self.what[tag] = {}

        for synthesis_list in synthesis_lists:
            medium = medium_by_synth[synthesis_list]
            qprint(_("Processing medium ") + medium + "...")
            vprint(synthesis_list)
            try:
                compressed = self._fetch_synthesis(synthesis_list)
            except (HTTPError, URLError):
                eprint(_("File can not be processed! Url: ") + synthesis_list)
                continue
            if compressed is None:
                continue

            res = subprocess.Popen(['gzip', '-d'], stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            output = res.communicate(compressed)

            # The lines describing a package precede its '@info@' line, so
            # they are accumulated until that line is seen.
            fields = self._empty_fields()
            for line in output[0].split('\n'):
                if line == '': # there can be empty lines
                    continue
                items = line.split("@")
                fields[items[1]] = [x.strip() for x in items[2:]]
                if items[1] == "info":
                    self._add_repo_package(items, fields, synthesis_list, medium)
                    fields = self._empty_fields()

    def whattag(self, tag, val):
        '''Return the packages whose 'tag' entry named val.N satisfies val.'''
        if val.N not in self.what[tag]:
            return []
        return [pkg for (pkg, dep) in self.what[tag][val.N] if dep.satisfies(val)]

    def whattag_revert(self, tag, val):
        '''Return the packages whose 'tag' entry named val.N is satisfied by val.'''
        if val.N not in self.what[tag]:
            return []
        return [pkg for (pkg, dep) in self.what[tag][val.N] if val.satisfies(dep)]

    def whatprovides(self, val):
        return self.whattag('provides', val)

    def whatobsoletes(self, val):
        return self.whattag_revert('obsoletes', val)

    def whatrequires(self, val):
        return self.whattag_revert('requires', val)

    def whatconflicts(self, val):
        return self.whattag_revert('conflicts', val)

    def whatrequires_pkg(self, pkg):
        '''Return (requiring package, provide) pairs for every provide of pkg.'''
        found = []
        for req in self.packages[pkg]['provides']:
            found += [(d, req) for d in self.whatrequires(req)]
        return found
# Work lists and tables filled in while the list of actions is computed.
to_update = []
to_downgrade = []
to_remove = []
to_remove_pre = []
to_append = []
unresolved = {}
to_append_bysource = {}
to_remove_problems = {}
to_remove_saved = []
files_to_download = []

# An installed package that is missing from the repository is removed
# without further checks when one of its provides satisfies an entry of
# this list (see have_to_be_forced).
to_remove_force_list = [
    NEVR.from_depstring("plymouth(system-theme)"),
    NEVR.from_depstring("mandriva-theme-screensaver"),
]

# rpm callback reason code -> its symbolic name, used for verbose logging.
flags = dict((getattr(rpm, flag_name), flag_name) for flag_name in (
    'RPMCALLBACK_UNKNOWN',
    'RPMCALLBACK_INST_PROGRESS',
    'RPMCALLBACK_INST_START',
    'RPMCALLBACK_INST_OPEN_FILE',
    'RPMCALLBACK_INST_CLOSE_FILE',
    'RPMCALLBACK_TRANS_PROGRESS',
    'RPMCALLBACK_TRANS_START',
    'RPMCALLBACK_TRANS_STOP',
    'RPMCALLBACK_UNINST_PROGRESS',
    'RPMCALLBACK_UNINST_START',
    'RPMCALLBACK_UNINST_STOP',
    'RPMCALLBACK_REPACKAGE_PROGRESS',
    'RPMCALLBACK_REPACKAGE_START',
    'RPMCALLBACK_REPACKAGE_STOP',
    'RPMCALLBACK_UNPACK_ERROR',
    'RPMCALLBACK_CPIO_ERROR',
    'RPMCALLBACK_SCRIPT_ERROR',
))

# State shared between successive runCallback invocations.
rpmtsCallback_fd = None
file_id = 0
current_file = "NotSet"
def runCallback(reason, amount, total, key, client_data):
    '''Callback passed to the rpm transaction run.

    For RPMCALLBACK_INST_OPEN_FILE the package file named by key is opened
    and its file descriptor is returned; the descriptor is closed again
    when RPMCALLBACK_INST_START is reported. Package removals and
    unpack/cpio/script errors are reported to the user.
    '''
    global file_id, rpmtsCallback_fd, current_file

    if reason in flags:
        fl = flags[reason]
        vprint ("rpm_callback was called: %s, %s, %s, %s, %s" %(fl, str(amount), str(total),
                                                              str(key), str(client_data)))

    if reason == rpm.RPMCALLBACK_INST_OPEN_FILE:
        vprint ("Opening file: " + key)
        current_file = key
        file_id += 1
        qprint("[%d/%d] %s" % (file_id, len(files_to_download), os.path.basename(key)))
        rpmtsCallback_fd = os.open(key, os.O_RDONLY)
        return rpmtsCallback_fd

    if reason == rpm.RPMCALLBACK_UNINST_START:
        qprint(_("Removing %s") % os.path.basename(key))
    elif reason == rpm.RPMCALLBACK_INST_START:
        vprint ("Closing file")
        os.close(rpmtsCallback_fd)
    elif reason in (rpm.RPMCALLBACK_UNPACK_ERROR,
                    rpm.RPMCALLBACK_CPIO_ERROR,
                    rpm.RPMCALLBACK_SCRIPT_ERROR):
        # Both placeholders are named: a bare %s combined with a mapping
        # prints the whole dictionary instead of the file name.
        details = "%s; %s, %s, %s, %s" % (flags[reason], str(amount), str(total),
                                          str(key), str(client_data))
        eprint(_('urpm-reposync: error in package %(cur_file)s. Data: %(data)s') %
               {'cur_file': current_file, 'data': details})
def get_problem_dependencies(pkg):
    '''Return the requirements of repository package pkg that are provided
    neither by an installed package nor by a package scheduled in 'actions'.'''
    global actions
    missing = []
    for requirement in repository.packages[pkg]['requires']:
        if installed.whatprovides(requirement):
            # satisfied by something that is already installed
            continue
        # repository providers count only if they are among the actions
        scheduled = [provider for provider in repository.whatprovides(requirement)
                     if provider in actions]
        if not scheduled:
            missing.append(requirement)
    vprint("Problem deps for %s: %s" %(pkg, str(missing)))
    return missing
def resolve_dependency(dep, pkg):
    '''Pick a repository package that provides dep, needed by pkg.

    Returns the alphabetically first provider and records it in
    to_append_bysource[pkg]. With --nokernel, packages whose name starts
    with 'kernel' are never chosen. When nothing provides dep, the
    dependency is recorded in unresolved[pkg] and None is returned.
    '''
    res = repository.whatprovides(dep)
    if command_line.nokernel:
        # Drop every kernel package that matched. Removing the literal
        # string 'kernel' raised ValueError unless a package of exactly
        # that name was among the providers.
        res = [p for p in res if not p.startswith('kernel')]

    if not res:
        if pkg not in unresolved:
            unresolved[pkg] = []
        if str(dep) not in unresolved[pkg]:
            unresolved[pkg].append(str(dep))
        return None

    res = sorted(res)
    vprint("Resolved dependencies: " + str(res))

    if pkg not in to_append_bysource:
        to_append_bysource[pkg] = []
    to_append_bysource[pkg].append(res[0])
    return res[0]
def resolve_dep_while_emulation(requirement, package):
    '''Schedule a repository provider of requirement unless one is already
    among the actions; the chosen provider is recorded as pulled in by package.'''
    # try to resolve the dep in repository
    providers = repository.whatprovides(requirement)
    if not providers:
        return
    if any(candidate in actions for candidate in providers):
        return
    chosen = providers[0]
    vprint('NEW ACTION: ' + chosen)
    actions.append(chosen)
    to_append_bysource.setdefault(package, []).append(chosen)
def emulate_install(pkg):
    '''Simulate the installation of repository package pkg.

    Updates the shared bookkeeping: requirements recorded in
    not_provided_packages that pkg provides are dropped, conflicts with the
    installed set are recorded in conflicting_packages, and, if there are
    none, pkg is copied into 'installed', queued for download and removed
    from 'actions'. Requirements of pkg that nothing installed provides are
    recorded and passed to resolve_dep_while_emulation.
    '''
    global actions, not_provided_packages, conflicting_packages
    vprint('Emulating package installation: ' + pkg)

    # 1. pkg may satisfy requirements that were recorded as missing earlier
    emptied = []
    for p in not_provided_packages:
        for req in not_provided_packages[p][:]:
            for prov in repository.packages[pkg]['provides']:
                if prov.satisfies(req):
                    vprint("Missing dep satisfied: %s -- %s" % (p, req))
                    not_provided_packages[p].remove(req)
                    if not not_provided_packages[p]:
                        emptied.append(p)
                    break
    # entries are popped after the iteration, not while the dict is walked
    for p in emptied:
        not_provided_packages.pop(p)

    # 2. conflicts in both directions between pkg and the installed set
    conflicts = False
    for confl in repository.packages[pkg]['conflicts']:
        res = installed.whatprovides(confl)
        if res:
            conflicts = True
            conflicting_packages.append( (pkg, res) )
            vprint("New conflict: %s, %s" % (str(pkg), str(res)))

    for prov in repository.packages[pkg]['provides']:
        res = installed.whatconflicts(prov)
        if res:
            conflicts = True
            conflicting_packages.append( (res, pkg) )
            vprint("New conflict: %s, %s" % (str(res), str(pkg)))

    # with a conflict nothing below happens; pkg also stays in 'actions'
    if conflicts:
        return

    # 3. queue the rpm file for download
    url = ms.media[repository.packages[pkg]['medium']]
    url += '/' + repository.packages[pkg]['filename'] + '.rpm'
    files_to_download.append(url)

    if pkg not in to_update and pkg not in to_downgrade and pkg not in to_append:
        to_append.append(pkg)

    # 4. copy the package data and its dependency index entries to 'installed'
    if pkg not in installed.packages:
        installed.packages[pkg] = {}

    for tag in PackageSet.alltags:
        installed.packages[pkg][tag] = repository.packages[pkg][tag]

    for tag in PackageSet.tags:
        deps = installed.packages[pkg][tag]
        for dep in deps:
            if dep.N not in installed.what[tag]:
                installed.what[tag][dep.N] = []
            installed.what[tag][dep.N].append((pkg,dep))

    actions.remove(pkg)

    # 5. requirements of pkg that are still not provided by the installed set
    for req in repository.packages[pkg]['requires']:
        provs = installed.whatprovides(req)
        if not provs: # nothing provides it
            if pkg not in not_provided_packages:
                not_provided_packages[pkg] = []
            vprint("New missing dep: %s -- %s" % (pkg, req))
            not_provided_packages[pkg].append(req)
            resolve_dep_while_emulation(req, pkg)
def emulate_remove(pkg, updating=False):
    '''Simulate the removal of installed package pkg.

    The dependency index entries of pkg are removed from 'installed' and its
    data is replaced by {'old_package': <deep copy of the former data>}.
    When 'actions' is non-empty, requirements of other installed packages
    that are no longer provided are recorded in not_provided_packages and
    passed to resolve_dep_while_emulation.

    The 'updating' parameter is not used in the body.
    '''
    global not_provided_packages
    vprint("Emulating package removing: " + pkg)
    if pkg not in installed.packages:
        vprint("Nothing to remove")
        return

    # a removed package has no requirements left to satisfy
    if pkg in not_provided_packages:
        not_provided_packages.pop(pkg)

    for tag in PackageSet.tags:
        deps = installed.packages[pkg][tag]
        for dep in deps:
            installed.what[tag][dep.N].remove((pkg,dep))

    # keep the former data reachable under 'old_package'
    P = copy.deepcopy(installed.packages[pkg])
    installed.packages[pkg] = {}
    installed.packages[pkg]['old_package'] = P

    if not actions: #do nothing while initial packages removing
        return

    # look for requirements that were satisfied only by the removed package
    for dep in installed.packages[pkg]['old_package']['provides']:
        if dep.N not in installed.what['requires']:
            continue
        for (package, requirement) in installed.what['requires'][dep.N]:
            if dep.satisfies(requirement) and not installed.whatprovides(requirement):
                if package not in not_provided_packages:
                    not_provided_packages[package] = []
                vprint("New missing dep: %s -- %s" % (package, requirement))
                not_provided_packages[package].append(requirement)
                resolve_dep_while_emulation(requirement, package)
def have_to_be_removed(pkg):
    '''Collect the reasons why installed package pkg can not be kept.

    A reason is recorded for every requirement of pkg that nothing installed
    provides, for every installed package that conflicts with one of its
    provides, and for every installed package it conflicts with.

    Returns the list of reasons, which is also stored in
    to_remove_problems[pkg]; an empty list means the package can stay.
    '''
    problems = []
    to_remove_problems[pkg] = problems

    for dep in installed.packages[pkg]['requires']:
        if not installed.whatprovides(dep):
            problems.append(_("\tRequires %s, which will not be installed.") % (str(dep) ))

    # The messages below are formatted after the translation lookup; they
    # used to be formatted inside _(), so no catalog entry could match.
    for dep in installed.packages[pkg]['provides']:
        res = installed.whatconflicts(dep)
        if res:
            problems.append(_("\t%s conflicts with it") % (', '.join(res)))

    for dep in installed.packages[pkg]['conflicts']:
        res = installed.whatprovides(dep)
        if res:
            problems.append(_("\tIt conflicts with %s") % (', '.join(res)))

    return problems
def process_packages():
    '''Compute the complete list of actions and check that it is consistent.

    Starting from to_update + to_downgrade, installations are emulated for
    every action without unsatisfied dependencies; packages resolved for
    unsatisfied dependencies are added as new actions. Without --remove,
    packages from to_remove_pre are moved to to_remove when
    have_to_be_removed() finds a reason, or remembered in to_remove_saved
    otherwise. The loop repeats until a pass changes nothing.

    Unsatisfied dependencies, unresolved requirements of remaining actions
    and conflicts are reported on stderr and end the program with exit
    status 4.

    NOTE(review): the nesting of the loops below was reconstructed from
    source without indentation -- verify against the upstream file.
    '''
    global actions, to_remove, not_provided_packages, conflicting_packages

    qprint("Computing actions list...")
    # with --remove every package missing from the repository goes away
    if command_line.remove:
        for pkg in to_remove_pre:
            emulate_remove(pkg)
            to_remove.append(pkg)

    actions = to_update + to_downgrade
    actions_backup = actions[:]
    conflicting_packages = []
    problems = {}
    changed = True
    while changed:
        i = 0
        l = len(actions)
        changed = False
        # iterate over a copy: emulate_install() removes entries from 'actions'
        for act in actions[:]:
            i = i + 1
            vprint('[%d/%d] %s' % (i, l, act))
            prob = get_problem_dependencies(act)
            problems[act] = []
            for p in prob:
                problems[act].append((p, resolve_dependency(p, act)))

            if problems[act]:
                vprint ("\nPROBLEM: %s: %s" % (act, problems[act]))
            if not problems[act]:
                emulate_remove(act, updating=True)
                emulate_install(act)
                changed = True

        # schedule the packages that were found for unsatisfied dependencies
        for pr in problems:
            if len(problems[pr])>0:
                for prob, resolved in problems[pr]:
                    if resolved:
                        vprint ("Package '%s' requires '%s' via dependency '%s'" % (pr, resolved, prob))
                        changed = True
                        if resolved not in actions:
                            actions.append(resolved)

        if not command_line.remove:
            # iterate over a copy: entries are removed from to_remove_pre
            for pkg in to_remove_pre[:]:
                vprint("Checking wether to remove " + pkg)
                res = have_to_be_removed(pkg)
                if res:
                    vprint("%s have to be removed because:" % (pkg))
                    for item in res:
                        vprint(str(item))
                    emulate_remove(pkg)
                    if not pkg in to_remove:
                        to_remove.append(pkg)
                    if pkg in to_remove_saved:
                        to_remove_saved.remove(pkg)
                    changed = True
                    to_remove_pre.remove(pkg)
                else:
                    if pkg not in to_remove_saved:
                        to_remove_saved.append(pkg)

    # report everything that could not be resolved
    have_to_exit = False
    if not_provided_packages:
        for p_name in not_provided_packages:
            eprint('>>>ERROR: Package %s has unsatisfied dependencies: %s' %
                   (p_name, str(not_provided_packages[p_name])))
        have_to_exit = True

    vprint ('Actions left: ' + str(actions))
    if actions:
        for pkg in unresolved:
            eprint(">>>ERROR: %s requires %s" %(pkg, ', '.join(unresolved[pkg])))
        have_to_exit = True

    if conflicting_packages:
        # either side of a conflict is a package name or a list of names
        def format_conflicts(a):
            if type(a) is list:
                return '[%s]' % ', '.join(a)
            else:
                return str(a)
        for (a, b) in conflicting_packages:
            a_text = format_conflicts(a)
            b_text = format_conflicts(b)
            eprint(">>>ERROR: %s conflicts with %s" %(a_text, b_text))
        have_to_exit = True

    if have_to_exit:
        eprint(_(">>> Contact repository maintaiers and send them this information, please."), fatal=True, code=4)
def download_packages():
    '''Fetch every file of files_to_download into downloaded_rpms_dir.

    Files that are already present are skipped. Entries starting with '/'
    are copied as local files, everything else is fetched with urlopen.
    An I/O error ends the program with exit status 5.
    '''
    if not files_to_download:
        return
    qprint(_('Downloading files...'))
    total = len(files_to_download)
    for index, url in enumerate(files_to_download, 1):
        qprint("[%d/%d] %s " %(index, total, os.path.basename(url)))
        path = os.path.join(downloaded_rpms_dir, os.path.basename(url))
        if os.path.isfile(path):
            continue
        # Write to a temporary name and rename when complete, so that an
        # interrupted transfer is never taken for a finished file by the
        # isfile() check above on the next run.
        partial = path + '.part'
        try:
            if url.startswith('/'): # local file
                shutil.copyfile(url, partial)
            else:
                remote = urlopen(url)
                try:
                    # rpm files are binary; copy in chunks instead of
                    # reading the whole package into memory
                    with open(partial, 'wb') as target:
                        shutil.copyfileobj(remote, target)
                finally:
                    remote.close()
            os.rename(partial, path)
        except IOError as e:
            if os.path.exists(partial):
                os.remove(partial)
            eprint("Can not download file %s: %s" % (url, str(e)), fatal=True, code=5)
def install_packages():
    '''Build an rpm transaction from the computed actions and run it.

    Every downloaded file is added as an upgrade and every package of
    to_remove as an erasure. Unresolved dependencies reported by the
    transaction check end the program with exit status 3. With --check the
    function returns after the check without running the transaction.
    '''
    def readRpmHeader(ts, filename):
        '''Return the rpm header of the package file.'''
        vprint("Reading header of " + filename)
        fd = os.open(filename, os.O_RDONLY)
        try:
            return ts.hdrFromFdno(fd)
        finally:
            os.close(fd)

    qprint(_("Generating transaction..."))
    ts = rpm.TransactionSet()

    # turn all the checks off. They can cause segfault in RPM for now.
    ts.setVSFlags(rpm.RPMVSF_NOHDRCHK|rpm.RPMVSF_NOSHA1HEADER|rpm.RPMVSF_NODSAHEADER|rpm.RPMVSF_NORSAHEADER|rpm.RPMVSF_NOMD5|rpm.RPMVSF_NODSA|rpm.RPMVSF_NORSA|rpm._RPMVSF_NODIGESTS|rpm._RPMVSF_NOSIGNATURES)
    ts.setProbFilter(rpm.RPMPROB_FILTER_OLDPACKAGE)

    #flags for ts.run execution. We need it to speed the process up
    ts.setFlags(rpm.RPMTRANS_FLAG_NOFDIGESTS)

    for url in files_to_download:
        f = os.path.join(downloaded_rpms_dir, os.path.basename(url))
        h = readRpmHeader(ts, f)
        ts.addInstall(h, f, 'u')

    for pkg in to_remove:
        ts.addErase(pkg)

    qprint(_("Checking dependencies..."))

    def format_dep(dep):
        '''Describe one problem tuple returned by ts.check().'''
        ((name, ver, rel), (namereq, verreq), needsFlags, suggestedPackage, sense) = dep

        vprint (dep)

        t = _('requires')
        if sense & 1:
            t = _('conflicts with')

        # Compose the operator from the individual bits. Successive
        # assignments used to overwrite each other, so e.g. '>=' was shown
        # as '>' or '!='.
        less = needsFlags & rpm.RPMSENSE_LESS #2
        greater = needsFlags & rpm.RPMSENSE_GREATER #4
        if less and greater:
            s = '!='
        else:
            s = ''
            if less:
                s = '<'
            elif greater:
                s = '>'
            if needsFlags & rpm.RPMSENSE_EQUAL: #8
                s += '='

        if verreq:
            verreq = '[%s %s]' % (s, verreq)
        else:
            verreq = ''
        return _("Package %(name)s-%(ver)s-%(rel)s %(t)s %(namereq)s%(verreq)s") % \
               {'name': name,'ver': ver,'rel': rel,'namereq': namereq,'verreq': verreq, 't': t}

    unresolved_dependencies = ts.check()
    if unresolved_dependencies:
        eprint(_("There are some unresolved dependencies: ") )
        for dep in unresolved_dependencies:
            eprint("\t" + format_dep(dep))
        eprint(_("Packages can not be installed. Please, contact urpm-tools developers and provide this output."), fatal=True, code=3)
    else:
        qprint(_("No errors found in transaction"))
    ts.order()

    if command_line.check:
        return
    qprint(_("Running transaction..."))
    ts.run(runCallback, 1)
def check_media_set():
    '''Warn when libraries are about to be removed only because the
    repository carries them for the other architecture (libA vs. lib64A).'''

    def try_solve_lib_arch(pkgname):
        '''Return the name of the other-architecture twin of library
        pkgname if only that twin is in the repository, otherwise None.'''
        if not pkgname.startswith('lib') or pkgname in repository.packages:
            return None
        if pkgname[3:5] == '64':
            # 64bit library installed, look for the 32bit one
            twin = 'lib' + pkgname[5:]
            if twin in repository.packages:
                return twin
            return None
        # 32bit library installed, look for the 64bit one
        twin = 'lib64' + pkgname[3:]
        if twin in repository.packages and ARCH == "x86_64":
            return twin
        # a 64bit library can not work in a 32bit system
        return None

    found = []
    for candidate in to_remove:
        twin = try_solve_lib_arch(candidate)
        if twin:
            found.append((candidate, twin))

    vprint("The list of libs with incorrect arch in repository: " + str(found))
    if found:
        qprint(_("WARNING: Some libraries are going to be removed because there are only the packages with the other architecture in the repository. Maybe you missed media with the correct architecture?"))
def print_actions():
    '''Print the packages that will be upgraded, downgraded, installed,
    removed or kept, followed by a summary. Prints nothing with --quiet.

    With --printonly only the package lines are printed, each one starting
    with a letter telling its section (U, D, A, R or S).
    '''
    if command_line.quiet:
        return

    def marker(letter):
        # the section letter is shown only in --printonly mode
        return letter if command_line.printonly else ''

    def count_total_size():
        return sum(repository.packages[name]['size']
                   for name in to_append + to_update + to_downgrade)

    def bytes_to_human_readable(amount):
        amount = float(amount)
        for limit, unit in ((1099511627776, 'T'), (1073741824, 'G'),
                            (1048576, 'M'), (1024, 'K')):
            if amount >= limit:
                return '%.2f%s' % (amount / limit, unit)
        return '%.2fb' % amount

    media = ms.media.keys()

    def print_pkg_list(pkglist, tag):
        # group the packages by the medium they come from
        media_contents = {}
        for medium in media:
            for name in pkglist:
                if repository.packages[name]['medium'] == medium:
                    media_contents.setdefault(medium, []).append(name)

        qprint(" %-30s %-15s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('New Version'), _('Arch')))
        for medium in media_contents:
            qprint("(%s %s)" %( _("medium"), medium))
            for name in sorted(media_contents[medium]):
                old = installed.packages[name]['nevr']
                new = repository.packages[name]['nevr']
                # the epoch is shown only when it differs
                if old.E == new.E:
                    old_text, new_text = old.VR, new.VR
                else:
                    old_text, new_text = old.EVR, new.EVR
                if old.DE and new.DE and old.DE != new.DE:
                    old_text += '(%s%s) ' % ( old.DT, old.DE)
                    new_text += '(%s%s) ' % ( new.DT, new.DE)
                oprint("%s %-30s %-15s %-15s %-10s" %(marker(tag), name, old_text, new_text, installed.packages[name]['arch']))
        qprint('')

    if to_update:
        qprint(_("The following packages are going to be upgraded:"))
        print_pkg_list(to_update, 'U')

    if to_downgrade:
        qprint(_("The following packages are going to be downgraded:"))
        print_pkg_list(to_downgrade, 'D')

    if to_append:
        qprint(_("Additional packages are going to be installed:"))
        qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Version'), _('Arch')))

        def get_append_sources(name):
            # the packages that pulled 'name' in
            return [source for source in to_append_bysource
                    if name in to_append_bysource[source]]

        for name in to_append:
            record = repository.packages[name]
            oprint("%s %-30s %-15s %-10s" %(marker('A'), name, record['nevr'].VR, record['arch']))
            if command_line.detailed:
                qprint(_("\tRequired by %s") % (", ".join(get_append_sources(name))))
        qprint('')

    if to_remove:
        qprint(_("The following packages are going to be removed:"))
        qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch')))
        show_reasons = command_line.detailed and not command_line.remove
        for name in sorted(to_remove):
            record = installed.packages[name]
            oprint("%s %-30s %-15s %-10s" %(marker('R'), name, record['nevr'].VR, record['arch']))
            if show_reasons:
                for problem in sorted(to_remove_problems[name]):
                    qprint(problem)
        qprint('')

    if to_remove_saved and command_line.detailed:
        qprint(_("Packages which do not present in repositories, but do not have to be removed (will be saved):"))
        qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch')))
        for name in sorted(to_remove_saved):
            record = installed.packages[name]
            oprint("%s %-30s %-15s %-10s" %(marker('S'), name, record['nevr'].VR, record['arch']))

    qprint(_("%d packages are going to be downloaded and installed.") % len(files_to_download))
    qprint(_("%d packages are going to be removed.") % len(to_remove))
    qprint(_("%s will be downloaded.") % bytes_to_human_readable(count_total_size()))
def have_to_be_forced(pkg):
    '''Return the first entry of to_remove_force_list that is satisfied by
    a provide of installed package pkg, or None if there is none.'''
    for provided in installed.packages[pkg]['provides']:
        for forced in to_remove_force_list:
            if not provided.satisfies(forced):
                continue
            vprint("Package %s have been forced to removal." % pkg)
            return forced
    return None
def _confirm_proceed():
    '''Ask the user on stdout/stdin whether to proceed.

    Returns True for a "yes" answer and False for a "no" answer or when
    stdin reaches end of file. Any other input is ignored and the next line
    is read.
    '''
    sys.stdout.write(_("Do you want to proceed? (y/n): "))
    sys.stdout.flush()
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns an empty string only at EOF (even a blank
            # answer contains the newline). Without this check a closed or
            # exhausted stdin would make the loop spin forever.
            return False
        answer = line.strip()
        if answer in [_('y'), _('yes'), 'y', 'yes']:
            return True
        if answer in [_('n'), _('no'), 'n', 'no']:
            return False


def Main():
    '''Entry point of the synchronization run.

    Builds the base urpmq command line from the media options, loads the
    installed and repository package sets, sorts every installed package
    into the module-level to_update / to_downgrade / to_remove /
    to_remove_pre lists, lets process_packages() work on them, prints the
    planned actions and, unless 'printonly' is set, asks for confirmation
    (unless 'auto' is set), downloads the packages and (unless 'download'
    is set) installs them.

    Exits with status 0 if the user declines or stdin is at end of file.
    '''
    global cmd, resolve_source, installed, repository, include_media, exclude_media
    global not_provided_packages, installed_backup, ms, actions

    resolve_source = False  # variable that makes download_rpm to download resolved build-deps
    cmd = ['urpmq']
    actions = []

    # Each option occurrence is a list of media names; flatten them and pass
    # them to urpmq as one comma-separated value.
    include_media = []
    if command_line.include_media is not None:
        for group in command_line.include_media:
            include_media.extend(group)
        cmd = cmd + ['--media', ",".join(include_media)]

    exclude_media = []
    if command_line.exclude_media is not None:
        for group in command_line.exclude_media:
            exclude_media.extend(group)
        cmd = cmd + ['--excludemedia', ",".join(exclude_media)]

    ms = MediaSet()
    installed = PackageSet()
    installed.load_from_system()

    repository = PackageSet()
    repository.load_from_repository()

    # Kept aside so that the original set can be restored after
    # process_packages() (see the assignment below).
    installed_backup = copy.deepcopy(installed)
    not_provided_packages = {}

    for inst in installed.packages:
        if command_line.nokernel and inst.startswith('kernel'):
            continue

        if inst not in repository.packages:
            # Installed package that no repository offers.
            forced_by = None
            if not command_line.remove:
                forced_by = have_to_be_forced(inst)
            if forced_by:
                emulate_remove(inst)
                to_remove.append(inst)
                # NOTE(review): "dew to" looks like a typo for "due to"; the
                # string is left untouched because it is a gettext msgid --
                # confirm against the translation catalogs before changing.
                to_remove_problems[inst] = [_('\tForced to be removed dew to "%s" policy.') % str(forced_by)]
            else:
                to_remove_pre.append(inst)
            continue

        inst_nevr = installed.packages[inst]["nevr"]
        repo_nevr = repository.packages[inst]["nevr"]

        # Compare distepochs first; they are only comparable when both
        # sides have one.
        if inst_nevr.DE is None or repo_nevr.DE is None:
            order = 0
        else:
            order = rpm.evrCompare(inst_nevr.DE, repo_nevr.DE)

        if order not in (-1, 1):
            # Distepochs are the same (or unknown): now versions can be compared.
            order = rpm.evrCompare(inst_nevr.EVR, repo_nevr.EVR)

        if order == -1:
            to_update.append(inst)
        elif order == 1:
            to_downgrade.append(inst)
        # order == 0: versions are equal, nothing to do

    process_packages()

    # NOTE(review): to_append is not part of this check although it is
    # counted in the verbose summary below -- confirm that is intended.
    if not (to_update or to_downgrade or to_remove):
        qprint(_("Nothing to do"))
        return

    installed = installed_backup
    print_actions()
    if command_line.printonly:
        return

    vprint("Installed packages: " + str(len(installed.packages)))
    vprint("Repository packages: " + str(len(repository.packages)))
    vprint("Packages that need some actions: " + str(len(to_update) + len(to_downgrade) + len(to_remove) + len(to_append)))

    check_media_set()

    if not command_line.auto and not _confirm_proceed():
        sys.exit(0)

    download_packages()
    if command_line.download:
        return
    install_packages()
# Make sure the directory for downloaded packages exists.
# Creating it first and inspecting the failure afterwards avoids the race
# between an existence check and makedirs() when another process creates
# the directory in between.
try:
    os.makedirs(downloaded_rpms_dir)
except OSError:
    # makedirs() also fails when the path already exists; that is harmless
    # only if the existing path really is a directory.
    if not os.path.isdir(downloaded_rpms_dir):
        raise
class Tests(unittest.TestCase):
    '''Self-tests for NEVR parsing, NEVR.satisfies() and NEVR comparison.'''

    def setUp(self):
        '''Create the NEVR fixtures: p1..p5 are the objects satisfies() is
        called on, r1..r8 are the objects passed to it, pkg1 is parsed from
        a package file name.'''
        depstrings = (
            ('p1', 'a[== 1.0]'),
            ('p2', 'a[> 1.0]'),
            ('p3', 'a[< 1.0]'),
            ('p4', 'a[>= 1.0]'),
            ('p5', 'b[== 1.0]'),
            ('r1', 'a[== 1.0]'),
            ('r2', 'a[== 1.1]'),
            ('r3', 'a[<= 1.1]'),
            ('r4', 'a[>= 1.1]'),
            ('r5', 'a[< 0.9]'),
            ('r6', 'a[> 0.9]'),
            ('r7', 'a[< 1.0]'),
            ('r8', 'b[== 1.0]'),
        )
        for attr, depstring in depstrings:
            setattr(self, attr, NEVR.from_depstring(depstring))
        self.pkg1 = NEVR.from_filename("s-c-t-0.0.1-0.20091218.2-rosa.lts2012.0.x86_64")

    def test_nevr_parse(self):
        '''Check the fields of NEVR objects built from dependency strings
        and from a file name.'''
        expected_fields = (
            (self.p1, 'N', 'a'),
            (self.p1, 'VR', '1.0'),
            (self.p1, 'EVR', '1.0'),
            (self.p1, 'FL', NEVR.EQUAL),
            (self.p2, 'FL', NEVR.GREATER),
            (self.p3, 'FL', NEVR.LESS),
            (self.p4, 'FL', NEVR.EQUAL | NEVR.GREATER),
            (self.pkg1, 'N', 's-c-t'),
            (self.pkg1, 'EVR', '0.0.1-0.20091218.2'),
            (self.pkg1, 'FL', NEVR.EQUAL),
        )
        for nevr, field, value in expected_fields:
            self.assertEqual(getattr(nevr, field), value)

    def test_version_compare(self):
        '''Check satisfies() for every fixture pair listed below, then
        equality and the rejection of malformed input.'''
        # (object, arguments it must satisfy, arguments it must not satisfy)
        # NOTE(review): r2 is checked twice for p2; one of the two entries
        # was possibly meant to be a different fixture -- confirm.
        expectations = (
            ('p1', ('r1', 'r3', 'r6'), ('r4', 'r5', 'r7', 'r8')),
            ('p2', ('r2', 'r2', 'r4', 'r6'), ('r1', 'r5', 'r7')),
            ('p3', ('r3', 'r5', 'r6', 'r7'), ('r1', 'r2', 'r4')),
            ('p4', ('r1', 'r6'), ('r5', 'r7')),
            ('p5', ('r8',), ()),
        )
        for subject_name, accepted, rejected in expectations:
            subject = getattr(self, subject_name)
            for name in accepted:
                self.assertTrue(subject.satisfies(getattr(self, name)))
            for name in rejected:
                self.assertFalse(subject.satisfies(getattr(self, name)))

        self.assertEqual(self.p1, self.r1)
        self.assertNotEqual(self.p1, self.r2)

        # Dependency strings that from_depstring() has to refuse.
        for malformed in ("a [== 1.0]", "a [== 1.0 ]", "a[! 1.0]", "a == 1.0"):
            self.assertRaises(Exception, NEVR.from_depstring, malformed)
        # Comparing a NEVR with a plain string has to raise as well.
        self.assertRaises(Exception, self.p1.__eq__, "a [== 1.0]")
# Script entry point: parse the options, then either run the built-in
# self-tests or perform the synchronization.
if __name__ == '__main__':
    parse_command_line()
    if command_line.runselftests:
        suite = unittest.TestLoader().loadTestsFromTestCase(Tests)
        result = unittest.TextTestRunner(verbosity=2).run(suite)
        # Report a failed self-test run through the exit status instead of
        # always finishing with 0.
        sys.exit(0 if result.wasSuccessful() else 1)
    else:
        Main()