urpm-tools/urpm-reposync.py

890 lines
32 KiB
Python
Raw Normal View History

#!/usr/bin/python2.7
'''
Created on Jan 11, 2012
@author: flid
'''
import rpm
import argparse
import sys
import subprocess
import re
import os
import zlib
import glob
import shutil
import platform
import copy
import unittest
import gettext
gettext.install('urpm-tools')
from rpm5utils.synthesis import *
ARCH = platform.machine()
downloaded_rpms_dir = '/tmp/urpm-reposync.rpms'
VERSION = "urpm-reposync 2.1"
def vprint(text):
'''Print the message only if verbose mode is on'''
if(command_line.verbose):
print(text)
def qprint(text):
'''Print the message only if quiet mode is off and 'printonly' is off'''
if command_line.printonly:
return
if(not command_line.quiet):
print(text)
def eprint(text, fatal=False, code=1):
'''Print the message to stderr. Exit if fatal'''
print >> sys.stderr, text
if (fatal):
exit(code)
def oprint(text):
'''Print the message only if quiet mode is off'''
if(not command_line.quiet):
print(text)
def parse_command_line():
global command_line
arg_parser = argparse.ArgumentParser(description=_('reposync is used to synchronize a set of packages on the local computer with the remote repository.'))
arg_parser.add_argument('--include-media', '--media', action='append',nargs = '+', help=_("Use only selected URPM media"))
arg_parser.add_argument('--exclude-media', action='append',nargs = '+', help=_("Do not use selected URPM media"))
#arg_parser.add_argument('-x', '--exclude-packages', action='store',nargs = '+', help="Exclude package(s) by regex")
arg_parser.add_argument('-v', '--verbose', action='store_true', help=_("Verbose (print additional info)"))
arg_parser.add_argument('-q', '--quiet', action='store_true', help=_("Quiet operation. Senseless without --auto."))
arg_parser.add_argument('-a', '--auto', action='store_true', help=_("Do not ask questions, just do it!"))
arg_parser.add_argument('-p', '--printonly', action='store_true', help=_("Only print the list of actions to be done and do nothing more!"))
arg_parser.add_argument('-d', '--download', action='store_true', help=_("Only download the rpm files, but install or remove nothing."))
#arg_parser.add_argument('-n', '--noremove', action='store_true', help=_("Do not remove packages at all. If some installed package prevent another package from beeing updated - do not update it."))
arg_parser.add_argument('-r', '--remove', action='store_true', help=_("Remove all the packages which do not present in repository. By default, only some of them would be removed."))
arg_parser.add_argument('-c', '--check', action='store_true', help=_("Download packages and check wether they can be installed to your system, but do not install them."))
arg_parser.add_argument('-k', '--nokernel', action='store_true', help=_("Do nothing with kernels."))
arg_parser.add_argument('--runselftests', action='store_true', help=_("Run self-tests end exit."))
arg_parser.add_argument('--detailed', action='store_true', help=_("Show detailed information about packages are going to be removed or installed (why does it have to be done)"))
command_line = arg_parser.parse_args(sys.argv[1:])
if(command_line.quiet and not command_line.auto):
eprint(_("It's senseless to use --quiet without --auto!"), fatal=True, code=2)
if command_line.verbose:
command_line.detailed = True
cmd = ['urpmq']
to_update = []
to_downgrade = []
to_remove = []
to_remove_pre = []
to_append = []
unresolved = {}
to_append_bysource = {}
to_remove_problems = {}
to_remove_saved = []
files_to_download = []
#If one of package deps matches this regexp and this package is
#not in the repository - don't try to save this package.
to_remove_force_list = [
NEVR.from_depstring("plymouth(system-theme)"),
NEVR.from_depstring("mandriva-theme-screensaver"),
]
flags = {rpm.RPMCALLBACK_UNKNOWN:'RPMCALLBACK_UNKNOWN',
rpm.RPMCALLBACK_INST_PROGRESS:'RPMCALLBACK_INST_PROGRESS',
rpm.RPMCALLBACK_INST_START:'RPMCALLBACK_INST_START',
rpm.RPMCALLBACK_INST_OPEN_FILE:'RPMCALLBACK_INST_OPEN_FILE',
rpm.RPMCALLBACK_INST_CLOSE_FILE:'RPMCALLBACK_INST_CLOSE_FILE',
rpm.RPMCALLBACK_TRANS_PROGRESS:'RPMCALLBACK_TRANS_PROGRESS',
rpm.RPMCALLBACK_TRANS_START:'RPMCALLBACK_TRANS_START',
rpm.RPMCALLBACK_TRANS_STOP:'RPMCALLBACK_TRANS_STOP',
rpm.RPMCALLBACK_UNINST_PROGRESS:'RPMCALLBACK_UNINST_PROGRESS',
rpm.RPMCALLBACK_UNINST_START:'RPMCALLBACK_UNINST_START',
rpm.RPMCALLBACK_UNINST_STOP:'RPMCALLBACK_UNINST_STOP',
rpm.RPMCALLBACK_REPACKAGE_PROGRESS:'RPMCALLBACK_REPACKAGE_PROGRESS',
rpm.RPMCALLBACK_REPACKAGE_START:'RPMCALLBACK_REPACKAGE_START',
rpm.RPMCALLBACK_REPACKAGE_STOP:'RPMCALLBACK_REPACKAGE_STOP',
rpm.RPMCALLBACK_UNPACK_ERROR:'RPMCALLBACK_UNPACK_ERROR',
rpm.RPMCALLBACK_CPIO_ERROR:'RPMCALLBACK_CPIO_ERROR',
rpm.RPMCALLBACK_SCRIPT_ERROR:'RPMCALLBACK_SCRIPT_ERROR'}
rpmtsCallback_fd = None
file_id = 0
current_file = "NotSet"
def runCallback(reason, amount, total, key, client_data):
global i, file_id, rpmtsCallback_fd, current_file
if reason in flags:
fl = flags[reason]
#if not fl.endswith('PROGRESS'):
vprint ("rpm_callback was called: %s, %s, %s, %s, %s" %(fl, str(amount), str(total),
str(key), str(client_data)))
if reason == rpm.RPMCALLBACK_INST_OPEN_FILE:
vprint ("Opening file: " + key)
current_file = key
file_id += 1
qprint("[%d/%d] %s" % (file_id, len(files_to_download), os.path.basename(key)))
rpmtsCallback_fd = os.open(key, os.O_RDONLY)
return rpmtsCallback_fd
if reason == rpm.RPMCALLBACK_UNINST_START:
qprint(_("Removing %s") % os.path.basename(key))
elif reason == rpm.RPMCALLBACK_INST_START:
vprint ("Closing file")
os.close(rpmtsCallback_fd)
elif reason == rpm.RPMCALLBACK_UNPACK_ERROR or \
reason == rpm.RPMCALLBACK_CPIO_ERROR or \
reason == rpm.RPMCALLBACK_SCRIPT_ERROR:
eprint(_('urpm-reposync: error in package %s. Data: %(data)s') %{ 'cur_file': current_file, 'data': "%s; %s, %s, %s, %s" % (flags[reason], str(amount),
str(total), str(key), str(client_data))})
def get_problem_dependencies(pkg):
''' Get all the packages to satisfy dependencies not provided by some installed package or by some action '''
global actions
output = []
for req in repository.packages[pkg]['requires']: # for every package requirement
pkgs_inst = installed.whatprovides(req)
if pkgs_inst:
continue #dependency is satisfied by one of installed packages
#look for dependency in 'actions'
pkgs_rep = repository.whatprovides(req)
for p in pkgs_rep[:]:
if p not in actions:
pkgs_rep.remove(p)
if not pkgs_rep:
output.append(req)
vprint("Problem deps for %s: %s" %(pkg, str(output)))
return output
def resolve_dependency(dep, pkg):
res = repository.whatprovides(dep)
if command_line.nokernel:
for p in res[:]:
if p.startswith('kernel'):
res.remove('kernel')
if not res:
if pkg not in unresolved:
unresolved[pkg] = []
if str(dep) not in unresolved[pkg]:
unresolved[pkg].append(str(dep))
return None
res = sorted(res)
vprint("Resolved dependencies: " + str(res))
if not pkg in to_append_bysource:
to_append_bysource[pkg] = []
to_append_bysource[pkg].append(res[0])
return res[0]
def resolve_dep_while_emulation(requirement, package):
#try to resolve the dep in repository
pkgs = repository.whatprovides(requirement)
found = False
for p in pkgs:
if p in actions:
found = True
break
if not found and pkgs:
vprint('NEW ACTION: ' + pkgs[0])
actions.append(pkgs[0])
if not package in to_append_bysource:
to_append_bysource[package] = []
to_append_bysource[package].append(pkgs[0])
def emulate_install(pkg):
global actions, not_provided_packages, conflicting_packages
vprint('Emulating package installation: ' + pkg)
emptied = []
for p in not_provided_packages:
for req in not_provided_packages[p][:]:
for prov in repository.packages[pkg]['provides']:
if prov.satisfies(req):
vprint("Missing dep satisfied: %s -- %s" % (p, req))
not_provided_packages[p].remove(req)
if not not_provided_packages[p]:
emptied.append(p)
break
for p in emptied:
not_provided_packages.pop(p)
conflicts = False
for confl in repository.packages[pkg]['conflicts']:
res = installed.whatprovides(confl)
if res:
conflicts = True
conflicting_packages.append( (pkg, res) )
vprint("New conflict: %s, %s" % (str(pkg), str(res)))
for prov in repository.packages[pkg]['provides']:
res = installed.whatconflicts(prov)
if res:
conflicts = True
conflicting_packages.append( (res, pkg) )
vprint("New conflict: %s, %s" % (str(res), str(pkg)))
if conflicts:
return
url = ms.media[repository.packages[pkg]['medium']]
url += '/' + repository.packages[pkg]['filename'] + '.rpm'
files_to_download.append(url)
if pkg not in to_update and pkg not in to_downgrade and pkg not in to_append:
to_append.append(pkg)
if pkg not in installed.packages:
installed.packages[pkg] = {}
for tag in PackageSet.alltags:
installed.packages[pkg][tag] = repository.packages[pkg][tag]
for tag in PackageSet.tags:
deps = installed.packages[pkg][tag]
for dep in deps:
if dep.N not in installed.what[tag]:
installed.what[tag][dep.N] = []
installed.what[tag][dep.N].append((pkg,dep))
actions.remove(pkg)
for req in repository.packages[pkg]['requires']:
provs = installed.whatprovides(req)
if not provs: # nothing provides it
if pkg not in not_provided_packages:
not_provided_packages[pkg] = []
vprint("New missing dep: %s -- %s" % (pkg, req))
not_provided_packages[pkg].append(req)
resolve_dep_while_emulation(req, pkg)
def emulate_remove(pkg, updating=False):
global not_provided_packages
vprint("Emulating package removing: " + pkg)
if pkg not in installed.packages:
vprint("Nothing to remove")
return
if pkg in not_provided_packages:
not_provided_packages.pop(pkg)
for tag in PackageSet.tags:
deps = installed.packages[pkg][tag]
for dep in deps:
installed.what[tag][dep.N].remove((pkg,dep))
P = copy.deepcopy(installed.packages[pkg])
installed.packages[pkg] = {}
installed.packages[pkg]['old_package'] = P
if not actions: #do nothing while initial packages removing
return
for dep in installed.packages[pkg]['old_package']['provides']:
if dep.N not in installed.what['requires']:
continue
for (package, requirement) in installed.what['requires'][dep.N]:
if dep.satisfies(requirement) and not installed.whatprovides(requirement):
if package not in not_provided_packages:
not_provided_packages[package] = []
vprint("New missing dep: %s -- %s" % (package, requirement))
not_provided_packages[package].append(requirement)
resolve_dep_while_emulation(requirement, package)
def have_to_be_removed(pkg):
to_remove_problems[pkg] = []
for dep in installed.packages[pkg]['requires']:
res = installed.whatprovides(dep)
if not res:
to_remove_problems[pkg].append(_("\tRequires %s, which will not be installed.") % (str(dep) ))
continue
for dep in installed.packages[pkg]['provides']:
res = installed.whatconflicts(dep)
if res:
to_remove_problems[pkg].append(_("\t%s conflicts with it" %(', '.join(res))))
for dep in installed.packages[pkg]['conflicts']:
res = installed.whatprovides(dep)
if res:
to_remove_problems[pkg].append(_("\tIt conflicts with %s" %(', '.join(res))))
return to_remove_problems[pkg]
def process_packages():
global actions, to_remove, not_provided_packages, conflicting_packages
qprint("Computing actions list...")
if command_line.remove:
for pkg in to_remove_pre:
emulate_remove(pkg)
to_remove.append(pkg)
actions = to_update + to_downgrade
actions_backup = actions[:]
conflicting_packages = []
problems = {}
changed = True
while changed:
i = 0
l = len(actions)
changed = False
for act in actions[:]:
i = i + 1
vprint('[%d/%d] %s' % (i, l, act))
prob = get_problem_dependencies(act)
problems[act] = []
for p in prob:
problems[act].append((p, resolve_dependency(p, act)))
if problems[act]:
vprint ("\nPROBLEM: %s: %s" % (act, problems[act]))
if not problems[act]:
emulate_remove(act, updating=True)
emulate_install(act)
changed = True
for pr in problems:
if len(problems[pr])>0:
for prob, resolved in problems[pr]:
if resolved:
vprint ("Package '%s' requires '%s' via dependency '%s'" % (pr, resolved, prob))
changed = True
if resolved not in actions:
actions.append(resolved)
if not command_line.remove:
for pkg in to_remove_pre[:]:
vprint("Checking wether to remove " + pkg)
res = have_to_be_removed(pkg)
if res:
vprint("%s have to be removed because:" % (pkg))
for item in res:
vprint(str(item))
emulate_remove(pkg)
if not pkg in to_remove:
to_remove.append(pkg)
if pkg in to_remove_saved:
to_remove_saved.remove(pkg)
changed = True
to_remove_pre.remove(pkg)
else:
if pkg not in to_remove_saved:
to_remove_saved.append(pkg)
have_to_exit = False
if not_provided_packages:
for p_name in not_provided_packages:
eprint('>>>ERROR: Package %s has unsatisfied dependencies: %s' %
(p_name, str(not_provided_packages[p_name])))
have_to_exit = True
vprint ('Actions left: ' + str(actions))
if actions:
for pkg in unresolved:
eprint(">>>ERROR: %s requires %s" %(pkg, ', '.join(unresolved[pkg])))
have_to_exit = True
if conflicting_packages:
def format_conflicts(a):
if type(a) is list:
return '[%s]' % ', '.join(a)
else:
return str(a)
for (a, b) in conflicting_packages:
a_text = format_conflicts(a)
b_text = format_conflicts(b)
eprint(">>>ERROR: %s conflicts with %s" %(a_text, b_text))
have_to_exit = True
if have_to_exit:
eprint(_(">>> Contact repository maintaiers and send them this information, please."), fatal=True, code=4)
def download_packages():
if not files_to_download:
return
qprint(_('Downloading files...'))
l = len(files_to_download)
i = 0
for url in files_to_download:
i += 1
qprint("[%d/%d] %s " %(i, l, os.path.basename(url)))
path = os.path.join(downloaded_rpms_dir, os.path.basename(url))
if os.path.isfile(path):
continue
try:
if(url.startswith('/')): # local file
shutil.copyfile(url, path)
else:
fd = urlopen(url)
file = open(path, 'w')
file.write(fd.read())
file.close()
fd.close()
except IOError, e:
eprint("Can not download file %s: %s" % (url, str(e)), fatal=True, code=5)
def install_packages():
def readRpmHeader(ts, filename):
vprint("Reading header of " + filename)
fd = os.open(filename, os.O_RDONLY)
h = ts.hdrFromFdno(fd)
os.close(fd)
return h
qprint(_("Generating transaction..."))
ts = rpm.TransactionSet()
# turn all the checks off. They can cause segfault in RPM for now.
ts.setVSFlags(rpm.RPMVSF_NOHDRCHK|rpm.RPMVSF_NOSHA1HEADER|rpm.RPMVSF_NODSAHEADER|rpm.RPMVSF_NORSAHEADER|rpm.RPMVSF_NOMD5|rpm.RPMVSF_NODSA|rpm.RPMVSF_NORSA|rpm._RPMVSF_NODIGESTS|rpm._RPMVSF_NOSIGNATURES)
ts.setProbFilter(rpm.RPMPROB_FILTER_OLDPACKAGE)
#flags for ts.run execution. We need it to speed the process up
ts.setFlags(rpm.RPMTRANS_FLAG_NOFDIGESTS)
for file in files_to_download:
f = os.path.join(downloaded_rpms_dir, os.path.basename(file))
h = readRpmHeader(ts, f)
ts.addInstall(h, f, 'u')
for pkg in to_remove:
ts.addErase(pkg)
qprint(_("Checking dependencies..."))
def format_dep(dep):
((name, ver, rel), (namereq, verreq), needsFlags, suggestedPackage, sense) = dep
vprint (dep)
t = _('requires')
if sense & 1:
t = _('conflicts with')
s = ''
if needsFlags & rpm.RPMSENSE_LESS: #2
s = '<'
if needsFlags & rpm.RPMSENSE_EQUAL: #8
s = '='
if needsFlags & rpm.RPMSENSE_GREATER: #4
s = '>'
if needsFlags & rpm.RPMSENSE_NOTEQUAL: #6
s = '!='
if(verreq):
verreq = '[%s %s]' % (s, verreq)
else:
verreq = ''
return _("Package %(name)s-%(ver)s-%(rel)s %(t)s %(namereq)s%(verreq)s") % \
{'name': name,'ver': ver,'rel': rel,'namereq': namereq,'verreq': verreq, 't': t}
unresolved_dependencies = ts.check()
if(unresolved_dependencies):
eprint(_("There are some unresolved dependencies: ") )
for dep in unresolved_dependencies:
eprint("\t" + format_dep(dep))
eprint(_("Packages can not be installed. Please, contact urpm-tools developers and provide this output."), fatal=True, code=3)
else:
qprint(_("No errors found in transaction"))
ts.order()
if command_line.check:
return
qprint(_("Running transaction..."))
ts.run(runCallback, 1)
def check_media_set():
def try_solve_lib_arch(pkgname):
'''if you have lib64A installed, but there is only libA in repository, it have not to be removed. And vice versa'''
if not pkgname.startswith('lib'):
return None
if pkgname in repository.packages:
return None
is64 = (pkgname[3:5] == '64')
is32 = not is64
if is32:
l32 = pkgname
l64 = 'lib64' + pkgname[3:]
else:
l32 = 'lib' + pkgname[5:]
l64 = pkgname
e32 = (l32 in repository.packages)
e64 = (l64 in repository.packages)
if(is32 and e64): # you have 32bit version installed, but there is only 64 bit version in repository
if(ARCH=="x86_64"):
return l64
else:
return # 64bit library can not work in 32bit system
if(is64 and e32):
return l32
found = []
for pkg in to_remove:
res = try_solve_lib_arch(pkg)
if res:
found.append((pkg, res))
vprint("The list of libs with incorrect arch in repository: " + str(found))
if found:
qprint(_("WARNING: Some libraries are going to be removed because there are only the packages with the other architecture in the repository. Maybe you missed media with the correct architecture?"))
def print_actions():
if(command_line.quiet):
return
def count_total_size():
sum = 0
for pkg in to_append + to_update + to_downgrade:
sum += repository.packages[pkg]['size']
return sum
def bytes_to_human_readable(bytes):
bytes = float(bytes)
if bytes >= 1099511627776:
terabytes = bytes / 1099511627776
size = '%.2fT' % terabytes
elif bytes >= 1073741824:
gigabytes = bytes / 1073741824
size = '%.2fG' % gigabytes
elif bytes >= 1048576:
megabytes = bytes / 1048576
size = '%.2fM' % megabytes
elif bytes >= 1024:
kilobytes = bytes / 1024
size = '%.2fK' % kilobytes
else:
size = '%.2fb' % bytes
return size
media = ms.media.keys()
def print_pkg_list(pkglist, tag):
media_contents = {}
for medium in media:
for pkg in pkglist:
if(repository.packages[pkg]['medium'] == medium):
if( medium not in media_contents):
media_contents[medium] = []
media_contents[medium].append(pkg)
qprint(" %-30s %-15s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('New Version'), _('Arch')))
for medium in media_contents:
qprint("(%s %s)" %( _("medium"), medium))
for pkg in sorted(media_contents[medium]):
nevri = installed.packages[pkg]['nevr']
nevrr = repository.packages[pkg]['nevr']
if(nevri.E == nevrr.E):
veri = nevri.VR
verr = nevrr.VR
else:
veri = nevri.EVR
verr = nevrr.EVR
if nevri.DE and nevrr.DE and nevri.DE != nevrr.DE:
veri += '(%s%s) ' % ( nevri.DT, nevri.DE)
verr += '(%s%s) ' % ( nevrr.DT, nevrr.DE)
oprint("%s %-30s %-15s %-15s %-10s" %(prefix, pkg, veri, verr, installed.packages[pkg]['arch']))
qprint('')
prefix = ''
if to_update:
qprint(_("The following packages are going to be upgraded:"))
if command_line.printonly:
prefix = 'U'
print_pkg_list(to_update, 'U')
if to_downgrade:
qprint(_("The following packages are going to be downgraded:"))
if command_line.printonly:
prefix = 'D'
print_pkg_list(to_downgrade, 'D')
if to_append:
qprint(_("Additional packages are going to be installed:"))
qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Version'), _('Arch')))
if command_line.printonly:
prefix = 'A'
def get_append_sources(pkg):
out = []
for item in to_append_bysource:
if pkg in to_append_bysource[item]:
out.append(item)
return out
for pkg in to_append:
nevr = repository.packages[pkg]['nevr']
oprint("%s %-30s %-15s %-10s" %(prefix, pkg, nevr.VR, repository.packages[pkg]['arch']))
if command_line.detailed:
qprint(_("\tRequired by %s") % (", ".join(get_append_sources(pkg))))
qprint('')
if to_remove:
qprint(_("The following packages are going to be removed:"))
qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch')))
if command_line.printonly:
prefix = 'R'
for pkg in sorted(to_remove):
nevr = installed.packages[pkg]['nevr']
oprint("%s %-30s %-15s %-10s" %(prefix, pkg, nevr.VR, installed.packages[pkg]['arch']))
if command_line.detailed and not command_line.remove:
for problem in sorted(to_remove_problems[pkg]):
qprint(problem)
qprint('')
if to_remove_saved and command_line.detailed:
qprint(_("Packages which do not present in repositories, but do not have to be removed (will be saved):"))
qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch')))
if command_line.printonly:
prefix = 'S'
for pkg in sorted(to_remove_saved):
oprint("%s %-30s %-15s %-10s" %(prefix, pkg, installed.packages[pkg]['nevr'].VR, installed.packages[pkg]['arch']))
qprint(_("%d packages are going to be downloaded and installed.") % len(files_to_download))
qprint(_("%d packages are going to be removed.") % len(to_remove))
qprint(_("%s will be downloaded.") % bytes_to_human_readable(count_total_size()))
def have_to_be_forced(pkg):
for dep in installed.packages[pkg]['provides']:
for f in to_remove_force_list:
if dep.satisfies(f):
vprint("Package %s have been forced to removal." % pkg)
return f
return None
def Main():
global cmd, resolve_source, installed, repository, include_media, exclude_media
global not_provided_packages, installed_backup, ms, actions
resolve_source = False # variable that makes download_rpm to download resolved build-deps
cmd = ['urpmq']
include_media = []
actions = []
if(command_line.include_media != None):
media = ''
for i in command_line.include_media:
media = ",".join([media]+i)
for ii in i:
include_media.append(ii)
cmd = cmd + ['--media', media[1:]]
exclude_media = []
if(command_line.exclude_media != None):
media = ''
for i in command_line.exclude_media:
media = ",".join([media]+i)
for ii in i:
exclude_media.append(ii)
cmd = cmd + ['--excludemedia', media[1:]]
ms = MediaSet.from_system(cmd)
installed = PackageSet()
installed.load_from_system()
repository = PackageSet()
repository.load_from_repository(ms)
exit()
installed_backup = copy.deepcopy(installed)
not_provided_packages = {}
for inst in installed.packages:
if command_line.nokernel and inst.startswith('kernel'):
continue
if inst not in repository.packages:
if command_line.remove:
to_remove_pre.append(inst)
else:
res = have_to_be_forced(inst)
if res:
emulate_remove(inst)
to_remove.append(inst)
to_remove_problems[inst]=[_('\tForced to be removed dew to "%s" policy.') % str(res)]
else:
to_remove_pre.append(inst)
continue
#compare distepochs first
if installed.packages[inst]["nevr"].DE == None or repository.packages[inst]["nevr"].DE == None:
res_epoch = 0
else:
res_epoch = rpm.evrCompare(installed.packages[inst]["nevr"].DE, repository.packages[inst]["nevr"].DE)
if res_epoch == -1:
to_update.append(inst)
elif res_epoch == 1:
to_downgrade.append(inst)
else: # disteposhs are the same
#now versions can be compared
res = rpm.evrCompare(installed.packages[inst]["nevr"].EVR, repository.packages[inst]["nevr"].EVR)
if(res == -1):
to_update.append(inst)
elif res == 1:
to_downgrade.append(inst)
else: # res == 0
pass # do nothing
process_packages()
if len(to_update + to_downgrade + to_remove) == 0:
qprint(_("Nothing to do"))
return
installed = installed_backup
print_actions()
if command_line.printonly:
return
vprint("Installed packages: " + str(len(installed.packages)))
vprint("Repository packages: " + str(len(repository.packages)))
vprint("Packages that need some actions: " + str(len(to_update) + len(to_downgrade) + len(to_remove) + len(to_append)))
check_media_set()
if(not command_line.auto):
sys.stdout.write(_("Do you want to proceed? (y/n): "))
sys.stdout.flush()
while(True):
res = sys.stdin.readline()
res = res.strip()
if res in [_('y'), _('yes'), 'y', 'yes']:
break
if res in [_('n'), _('no'), 'n', 'no']:
exit(0)
download_packages()
if command_line.download:
return
install_packages()
if not os.path.exists(downloaded_rpms_dir):
os.makedirs(downloaded_rpms_dir)
class Tests(unittest.TestCase):
def setUp(self):
self.p1 = NEVR.from_depstring('a[== 1.0]')
self.p2 = NEVR.from_depstring('a[> 1.0]')
self.p3 = NEVR.from_depstring('a[< 1.0]')
self.p4 = NEVR.from_depstring('a[>= 1.0]')
self.p5 = NEVR.from_depstring('b[== 1.0]')
self.r1 = NEVR.from_depstring('a[== 1.0]')
self.r2 = NEVR.from_depstring('a[== 1.1]')
self.r3 = NEVR.from_depstring('a[<= 1.1]')
self.r4 = NEVR.from_depstring('a[>= 1.1]')
self.r5 = NEVR.from_depstring('a[< 0.9]')
self.r6 = NEVR.from_depstring('a[> 0.9]')
self.r7 = NEVR.from_depstring('a[< 1.0]')
self.r8 = NEVR.from_depstring('b[== 1.0]')
self.pkg1 = NEVR.from_filename("s-c-t-0.0.1-0.20091218.2-rosa.lts2012.0.x86_64")
def test_nevr_parse(self):
self.assertEqual(self.p1.N, 'a')
self.assertEqual(self.p1.VR, '1.0')
self.assertEqual(self.p1.EVR, '1.0')
self.assertEqual(self.p1.FL, NEVR.EQUAL)
self.assertEqual(self.p2.FL, NEVR.GREATER)
self.assertEqual(self.p3.FL, NEVR.LESS)
self.assertEqual(self.p4.FL, NEVR.EQUAL | NEVR.GREATER)
self.assertEqual(self.pkg1.N, 's-c-t')
self.assertEqual(self.pkg1.EVR, '0.0.1-0.20091218.2')
self.assertEqual(self.pkg1.FL, NEVR.EQUAL)
def test_version_compare(self):
self.assertTrue(self.p1.satisfies(self.r1))
self.assertTrue(self.p1.satisfies(self.r3))
self.assertTrue(self.p1.satisfies(self.r6))
self.assertFalse(self.p1.satisfies(self.r4))
self.assertFalse(self.p1.satisfies(self.r5))
self.assertFalse(self.p1.satisfies(self.r7))
self.assertFalse(self.p1.satisfies(self.r8))
self.assertTrue(self.p2.satisfies(self.r2))
self.assertTrue(self.p2.satisfies(self.r2))
self.assertTrue(self.p2.satisfies(self.r4))
self.assertTrue(self.p2.satisfies(self.r6))
self.assertFalse(self.p2.satisfies(self.r1))
self.assertFalse(self.p2.satisfies(self.r5))
self.assertFalse(self.p2.satisfies(self.r7))
self.assertTrue(self.p3.satisfies(self.r3))
self.assertTrue(self.p3.satisfies(self.r5))
self.assertTrue(self.p3.satisfies(self.r6))
self.assertTrue(self.p3.satisfies(self.r7))
self.assertFalse(self.p3.satisfies(self.r1))
self.assertFalse(self.p3.satisfies(self.r2))
self.assertFalse(self.p3.satisfies(self.r4))
self.assertTrue(self.p4.satisfies(self.r1))
self.assertTrue(self.p4.satisfies(self.r6))
self.assertFalse(self.p4.satisfies(self.r5))
self.assertFalse(self.p4.satisfies(self.r7))
self.assertTrue(self.p5.satisfies(self.r8))
self.assertEqual(self.p1, self.r1)
self.assertNotEqual(self.p1, self.r2)
self.assertRaises(Exception, NEVR.from_depstring, "a [== 1.0]")
self.assertRaises(Exception, NEVR.from_depstring, "a [== 1.0 ]")
self.assertRaises(Exception, NEVR.from_depstring, "a[! 1.0]")
self.assertRaises(Exception, NEVR.from_depstring, "a == 1.0")
self.assertRaises(Exception, self.p1.__eq__, "a [== 1.0]")
if __name__ == '__main__':
parse_command_line()
if command_line.runselftests:
suite = unittest.TestLoader().loadTestsFromTestCase(Tests)
unittest.TextTestRunner(verbosity=2).run(suite)
else:
Main()