#!/usr/bin/python2.7 ''' Created on Jan 11, 2012 @author: flid ''' import rpm import argparse import sys import subprocess import re import os import zlib import glob import shutil import platform import copy import unittest import gettext gettext.install('urpm-tools') from rpm5utils.synthesis import * ARCH = platform.machine() downloaded_rpms_dir = '/tmp/urpm-reposync.rpms' VERSION = "urpm-reposync 2.1" def vprint(text): '''Print the message only if verbose mode is on''' if(command_line.verbose): print(text) def qprint(text): '''Print the message only if quiet mode is off and 'printonly' is off''' if command_line.printonly: return if(not command_line.quiet): print(text) def eprint(text, fatal=False, code=1): '''Print the message to stderr. Exit if fatal''' print >> sys.stderr, text if (fatal): exit(code) def oprint(text): '''Print the message only if quiet mode is off''' if(not command_line.quiet): print(text) def parse_command_line(): global command_line arg_parser = argparse.ArgumentParser(description=_('reposync is used to synchronize a set of packages on the local computer with the remote repository.')) arg_parser.add_argument('--include-media', '--media', action='append',nargs = '+', help=_("Use only selected URPM media")) arg_parser.add_argument('--exclude-media', action='append',nargs = '+', help=_("Do not use selected URPM media")) #arg_parser.add_argument('-x', '--exclude-packages', action='store',nargs = '+', help="Exclude package(s) by regex") arg_parser.add_argument('-v', '--verbose', action='store_true', help=_("Verbose (print additional info)")) arg_parser.add_argument('-q', '--quiet', action='store_true', help=_("Quiet operation. Senseless without --auto.")) arg_parser.add_argument('-a', '--auto', action='store_true', help=_("Do not ask questions, just do it!")) arg_parser.add_argument('-p', '--printonly', action='store_true', help=_("Only print the list of actions to be done and do nothing more!")) arg_parser.add_argument('-d', '--download', action='store_true', help=_("Only download the rpm files, but install or remove nothing.")) #arg_parser.add_argument('-n', '--noremove', action='store_true', help=_("Do not remove packages at all. If some installed package prevent another package from beeing updated - do not update it.")) arg_parser.add_argument('-r', '--remove', action='store_true', help=_("Remove all the packages which do not present in repository. By default, only some of them would be removed.")) arg_parser.add_argument('-c', '--check', action='store_true', help=_("Download packages and check wether they can be installed to your system, but do not install them.")) arg_parser.add_argument('-k', '--nokernel', action='store_true', help=_("Do nothing with kernels.")) arg_parser.add_argument('--runselftests', action='store_true', help=_("Run self-tests end exit.")) arg_parser.add_argument('--detailed', action='store_true', help=_("Show detailed information about packages are going to be removed or installed (why does it have to be done)")) command_line = arg_parser.parse_args(sys.argv[1:]) if(command_line.quiet and not command_line.auto): eprint(_("It's senseless to use --quiet without --auto!"), fatal=True, code=2) if command_line.verbose: command_line.detailed = True cmd = ['urpmq'] to_update = [] to_downgrade = [] to_remove = [] to_remove_pre = [] to_append = [] unresolved = {} to_append_bysource = {} to_remove_problems = {} to_remove_saved = [] files_to_download = [] #If one of package deps matches this regexp and this package is #not in the repository - don't try to save this package. to_remove_force_list = [ NEVR.from_depstring("plymouth(system-theme)"), NEVR.from_depstring("mandriva-theme-screensaver"), ] flags = {rpm.RPMCALLBACK_UNKNOWN:'RPMCALLBACK_UNKNOWN', rpm.RPMCALLBACK_INST_PROGRESS:'RPMCALLBACK_INST_PROGRESS', rpm.RPMCALLBACK_INST_START:'RPMCALLBACK_INST_START', rpm.RPMCALLBACK_INST_OPEN_FILE:'RPMCALLBACK_INST_OPEN_FILE', rpm.RPMCALLBACK_INST_CLOSE_FILE:'RPMCALLBACK_INST_CLOSE_FILE', rpm.RPMCALLBACK_TRANS_PROGRESS:'RPMCALLBACK_TRANS_PROGRESS', rpm.RPMCALLBACK_TRANS_START:'RPMCALLBACK_TRANS_START', rpm.RPMCALLBACK_TRANS_STOP:'RPMCALLBACK_TRANS_STOP', rpm.RPMCALLBACK_UNINST_PROGRESS:'RPMCALLBACK_UNINST_PROGRESS', rpm.RPMCALLBACK_UNINST_START:'RPMCALLBACK_UNINST_START', rpm.RPMCALLBACK_UNINST_STOP:'RPMCALLBACK_UNINST_STOP', rpm.RPMCALLBACK_REPACKAGE_PROGRESS:'RPMCALLBACK_REPACKAGE_PROGRESS', rpm.RPMCALLBACK_REPACKAGE_START:'RPMCALLBACK_REPACKAGE_START', rpm.RPMCALLBACK_REPACKAGE_STOP:'RPMCALLBACK_REPACKAGE_STOP', rpm.RPMCALLBACK_UNPACK_ERROR:'RPMCALLBACK_UNPACK_ERROR', rpm.RPMCALLBACK_CPIO_ERROR:'RPMCALLBACK_CPIO_ERROR', rpm.RPMCALLBACK_SCRIPT_ERROR:'RPMCALLBACK_SCRIPT_ERROR'} rpmtsCallback_fd = None file_id = 0 current_file = "NotSet" def runCallback(reason, amount, total, key, client_data): global i, file_id, rpmtsCallback_fd, current_file if reason in flags: fl = flags[reason] #if not fl.endswith('PROGRESS'): vprint ("rpm_callback was called: %s, %s, %s, %s, %s" %(fl, str(amount), str(total), str(key), str(client_data))) if reason == rpm.RPMCALLBACK_INST_OPEN_FILE: vprint ("Opening file: " + key) current_file = key file_id += 1 qprint("[%d/%d] %s" % (file_id, len(files_to_download), os.path.basename(key))) rpmtsCallback_fd = os.open(key, os.O_RDONLY) return rpmtsCallback_fd if reason == rpm.RPMCALLBACK_UNINST_START: qprint(_("Removing %s") % os.path.basename(key)) elif reason == rpm.RPMCALLBACK_INST_START: vprint ("Closing file") os.close(rpmtsCallback_fd) elif reason == rpm.RPMCALLBACK_UNPACK_ERROR or \ reason == rpm.RPMCALLBACK_CPIO_ERROR or \ reason == rpm.RPMCALLBACK_SCRIPT_ERROR: eprint(_('urpm-reposync: error in package %s. Data: %(data)s') %{ 'cur_file': current_file, 'data': "%s; %s, %s, %s, %s" % (flags[reason], str(amount), str(total), str(key), str(client_data))}) def get_problem_dependencies(pkg): ''' Get all the packages to satisfy dependencies not provided by some installed package or by some action ''' global actions output = [] for req in repository.packages[pkg]['requires']: # for every package requirement pkgs_inst = installed.whatprovides(req) if pkgs_inst: continue #dependency is satisfied by one of installed packages #look for dependency in 'actions' pkgs_rep = repository.whatprovides(req) for p in pkgs_rep[:]: if p not in actions: pkgs_rep.remove(p) if not pkgs_rep: output.append(req) vprint("Problem deps for %s: %s" %(pkg, str(output))) return output def resolve_dependency(dep, pkg): res = repository.whatprovides(dep) if command_line.nokernel: for p in res[:]: if p.startswith('kernel'): res.remove('kernel') if not res: if pkg not in unresolved: unresolved[pkg] = [] if str(dep) not in unresolved[pkg]: unresolved[pkg].append(str(dep)) return None res = sorted(res) vprint("Resolved dependencies: " + str(res)) if not pkg in to_append_bysource: to_append_bysource[pkg] = [] to_append_bysource[pkg].append(res[0]) return res[0] def resolve_dep_while_emulation(requirement, package): #try to resolve the dep in repository pkgs = repository.whatprovides(requirement) found = False for p in pkgs: if p in actions: found = True break if not found and pkgs: vprint('NEW ACTION: ' + pkgs[0]) actions.append(pkgs[0]) if not package in to_append_bysource: to_append_bysource[package] = [] to_append_bysource[package].append(pkgs[0]) def emulate_install(pkg): ''' Reurns True if something was done, False - if package was not installed ''' global actions, not_provided_packages, conflicting_packages vprint('Emulating package installation: ' + pkg) conflicts = False for confl in repository.packages[pkg]['conflicts']: res = installed.whatprovides(confl) for item in res[:]: if item in to_remove_pre: res.remove(item) if res: conflicts = True conflicting_packages.append( (pkg, res) ) vprint("New conflict: %s, %s" % (str(pkg), str(res))) for prov in repository.packages[pkg]['provides']: res = installed.whatconflicts(prov) for item in res[:]: if item in to_remove_pre: res.remove(item) if res: conflicts = True conflicting_packages.append( (res, pkg) ) vprint("New conflict: %s, %s" % (str(res), str(pkg))) if conflicts: return False # remove the previosly added conflicts for this package for item in conflicting_packages[:]: pkg1, pkgs2 = item if pkg1 == pkg: conflicting_packages.remove(item) vprint("Conflict %s have been resolved" % str(item)) if pkg in pkgs2: pkgs2.remove(pkg) if not pkgs2: conflicting_packages.remove(item) vprint("Conflict %s have been resolved" % str((pkg1, pkg))) emptied = [] for p in not_provided_packages: for req in not_provided_packages[p][:]: for prov in repository.packages[pkg]['provides']: if prov.satisfies(req): vprint("Missing dep satisfied: %s -- %s" % (p, req)) not_provided_packages[p].remove(req) if not not_provided_packages[p]: emptied.append(p) break for p in emptied: not_provided_packages.pop(p) url = ms.media[repository.packages[pkg]['medium']] url += '/' + repository.packages[pkg]['filename'] + '.rpm' files_to_download.append(url) if pkg not in to_update and pkg not in to_downgrade and pkg not in to_append: to_append.append(pkg) if pkg not in installed.packages: installed.packages[pkg] = {} for tag in PackageSet.alltags: installed.packages[pkg][tag] = repository.packages[pkg][tag] for tag in PackageSet.tags: deps = installed.packages[pkg][tag] for dep in deps: if dep.N not in installed.what[tag]: installed.what[tag][dep.N] = [] installed.what[tag][dep.N].append((pkg,dep)) actions.remove(pkg) for req in repository.packages[pkg]['requires']: provs = installed.whatprovides(req) if not provs: # nothing provides it if pkg not in not_provided_packages: not_provided_packages[pkg] = [] vprint("New missing dep: %s -- %s" % (pkg, req)) not_provided_packages[pkg].append(req) resolve_dep_while_emulation(req, pkg) return True def emulate_remove(pkg, updating=False): ''' Reurns True if something was done, False - if package was not removed ''' global not_provided_packages vprint("Emulating package removing: " + pkg) if pkg not in installed.packages or 'nevr' not in installed.packages[pkg]: vprint("Nothing to remove") return False if pkg in not_provided_packages: not_provided_packages.pop(pkg) for tag in PackageSet.tags: deps = installed.packages[pkg][tag] for dep in deps: installed.what[tag][dep.N].remove((pkg,dep)) P = copy.deepcopy(installed.packages[pkg]) installed.packages[pkg] = {} installed.packages[pkg]['old_package'] = P if not actions: #do nothing while initial packages removing return True for dep in installed.packages[pkg]['old_package']['provides']: if dep.N not in installed.what['requires']: continue for (package, requirement) in installed.what['requires'][dep.N]: if dep.satisfies(requirement) and not installed.whatprovides(requirement): if package not in not_provided_packages: not_provided_packages[package] = [] vprint("New missing dep: %s -- %s" % (package, requirement)) not_provided_packages[package].append(requirement) resolve_dep_while_emulation(requirement, package) return True def have_to_be_removed(pkg): to_remove_problems[pkg] = [] for dep in installed.packages[pkg]['requires']: res = installed.whatprovides(dep) act_resolved = False for act_pkg in actions: for atc_dep in repository.packages[act_pkg]['provides']: if atc_dep.satisfies(dep): act_resolved = True break if act_resolved: break if not res and not act_resolved: to_remove_problems[pkg].append(_("\tRequires %s, which will not be installed.") % (str(dep) )) continue for dep in installed.packages[pkg]['provides']: res = installed.whatconflicts(dep) if res: to_remove_problems[pkg].append(_("\t%s conflicts with it" %(', '.join(res)))) for dep in installed.packages[pkg]['conflicts']: res = installed.whatprovides(dep) if res: to_remove_problems[pkg].append(_("\tIt conflicts with %s" %(', '.join(res)))) return to_remove_problems[pkg] def process_packages(): global actions, to_remove, not_provided_packages, conflicting_packages qprint("Computing actions list...") if command_line.remove: for pkg in to_remove_pre: emulate_remove(pkg) to_remove.append(pkg) actions = to_update + to_downgrade actions_backup = actions[:] conflicting_packages = [] problems = {} changed = True while changed: i = 0 l = len(actions) changed = False for act in actions[:]: i = i + 1 vprint('[%d/%d] %s' % (i, l, act)) prob = get_problem_dependencies(act) problems[act] = [] for p in prob: problems[act].append((p, resolve_dependency(p, act))) if problems[act]: vprint ("\nPROBLEM: %s: %s" % (act, problems[act])) if not problems[act]: removed = emulate_remove(act, updating=True) installed = emulate_install(act) if removed or installed: changed = True for pr in problems: if len(problems[pr])>0: for prob, resolved in problems[pr]: if resolved: vprint ("Package '%s' requires '%s' via dependency '%s'" % (pr, resolved, prob)) changed = True if resolved not in actions: actions.append(resolved) if not command_line.remove: for pkg in to_remove_pre[:]: vprint("Checking wether to remove " + pkg) res = have_to_be_removed(pkg) if res: vprint("%s have to be removed because:" % (pkg)) for item in res: vprint(str(item)) emulate_remove(pkg) if not pkg in to_remove: to_remove.append(pkg) if pkg in to_remove_saved: to_remove_saved.remove(pkg) changed = True to_remove_pre.remove(pkg) else: if pkg not in to_remove_saved: to_remove_saved.append(pkg) have_to_exit = False if not_provided_packages: for p_name in not_provided_packages: eprint('>>>ERROR: Package %s has unsatisfied dependencies: %s' % (p_name, str(not_provided_packages[p_name]))) have_to_exit = True vprint ('Actions left: ' + str(actions)) if actions: for pkg in unresolved: eprint(">>>ERROR: %s requires %s" %(pkg, ', '.join(unresolved[pkg]))) have_to_exit = True if conflicting_packages: def format_conflicts(a): if type(a) is list: return '[%s]' % ', '.join(a) else: return str(a) for (a, b) in conflicting_packages: a_text = format_conflicts(a) b_text = format_conflicts(b) eprint(">>>ERROR: %s conflicts with %s" %(a_text, b_text)) have_to_exit = True if have_to_exit: eprint(_(">>> Contact repository maintaiers and send them this information, please."), fatal=True, code=4) def download_packages(): if not files_to_download: return qprint(_('Downloading files...')) l = len(files_to_download) i = 0 for url in files_to_download: i += 1 qprint("[%d/%d] %s " %(i, l, os.path.basename(url))) path = os.path.join(downloaded_rpms_dir, os.path.basename(url)) if os.path.isfile(path): continue try: if(url.startswith('/')): # local file shutil.copyfile(url, path) else: fd = urlopen(url) file = open(path, 'w') file.write(fd.read()) file.close() fd.close() except IOError, e: eprint("Can not download file %s: %s" % (url, str(e)), fatal=True, code=5) def install_packages(): def readRpmHeader(ts, filename): vprint("Reading header of " + filename) fd = os.open(filename, os.O_RDONLY) h = ts.hdrFromFdno(fd) os.close(fd) return h qprint(_("Generating transaction...")) ts = rpm.TransactionSet() # turn all the checks off. They can cause segfault in RPM for now. ts.setVSFlags(rpm.RPMVSF_NOHDRCHK|rpm.RPMVSF_NOSHA1HEADER|rpm.RPMVSF_NODSAHEADER|rpm.RPMVSF_NORSAHEADER|rpm.RPMVSF_NOMD5|rpm.RPMVSF_NODSA|rpm.RPMVSF_NORSA) ts.setProbFilter(rpm.RPMPROB_FILTER_OLDPACKAGE) #flags for ts.run execution. We need it to speed the process up ts.setFlags(rpm.RPMTRANS_FLAG_NOFDIGESTS) for file in files_to_download: f = os.path.join(downloaded_rpms_dir, os.path.basename(file)) h = readRpmHeader(ts, f) ts.addInstall(h, f, 'u') for pkg in to_remove: ts.addErase(pkg) qprint(_("Checking dependencies...")) def format_dep(dep): ((name, ver, rel), (namereq, verreq), needsFlags, suggestedPackage, sense) = dep vprint (dep) t = _('requires') if sense & 1: t = _('conflicts with') s = '' if needsFlags & rpm.RPMSENSE_LESS: #2 s = '<' if needsFlags & rpm.RPMSENSE_EQUAL: #8 s = '=' if needsFlags & rpm.RPMSENSE_GREATER: #4 s = '>' if needsFlags & rpm.RPMSENSE_NOTEQUAL: #6 s = '!=' if(verreq): verreq = '[%s %s]' % (s, verreq) else: verreq = '' return _("Package %(name)s-%(ver)s-%(rel)s %(t)s %(namereq)s%(verreq)s") % \ {'name': name,'ver': ver,'rel': rel,'namereq': namereq,'verreq': verreq, 't': t} unresolved_dependencies = ts.check() if(unresolved_dependencies): eprint(_("There are some unresolved dependencies: ") ) for dep in unresolved_dependencies: eprint("\t" + format_dep(dep)) eprint(_("Packages can not be installed. Please, contact urpm-tools developers and provide this output."), fatal=True, code=3) else: qprint(_("No errors found in transaction")) ts.order() if command_line.check: return qprint(_("Running transaction...")) ts.run(runCallback, 1) def check_media_set(): def try_solve_lib_arch(pkgname): '''if you have lib64A installed, but there is only libA in repository, it have not to be removed. And vice versa''' if not pkgname.startswith('lib'): return None if pkgname in repository.packages: return None is64 = (pkgname[3:5] == '64') is32 = not is64 if is32: l32 = pkgname l64 = 'lib64' + pkgname[3:] else: l32 = 'lib' + pkgname[5:] l64 = pkgname e32 = (l32 in repository.packages) e64 = (l64 in repository.packages) if(is32 and e64): # you have 32bit version installed, but there is only 64 bit version in repository if(ARCH=="x86_64"): return l64 else: return # 64bit library can not work in 32bit system if(is64 and e32): return l32 found = [] for pkg in to_remove: res = try_solve_lib_arch(pkg) if res: found.append((pkg, res)) vprint("The list of libs with incorrect arch in repository: " + str(found)) if found: qprint(_("WARNING: Some libraries are going to be removed because there are only the packages with the other architecture in the repository. Maybe you missed media with the correct architecture?")) def print_actions(): if(command_line.quiet): return def count_total_size(): sum = 0 for pkg in to_append + to_update + to_downgrade: sum += repository.packages[pkg]['size'] return sum def bytes_to_human_readable(bytes): bytes = float(bytes) if bytes >= 1099511627776: terabytes = bytes / 1099511627776 size = '%.2fT' % terabytes elif bytes >= 1073741824: gigabytes = bytes / 1073741824 size = '%.2fG' % gigabytes elif bytes >= 1048576: megabytes = bytes / 1048576 size = '%.2fM' % megabytes elif bytes >= 1024: kilobytes = bytes / 1024 size = '%.2fK' % kilobytes else: size = '%.2fb' % bytes return size media = ms.media.keys() def print_pkg_list(pkglist, tag): media_contents = {} for medium in media: for pkg in pkglist: if(repository.packages[pkg]['medium'] == medium): if( medium not in media_contents): media_contents[medium] = [] media_contents[medium].append(pkg) qprint(" %-30s %-15s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('New Version'), _('Arch'))) for medium in media_contents: qprint("(%s %s)" %( _("medium"), medium)) for pkg in sorted(media_contents[medium]): nevri = installed.packages[pkg]['nevr'] nevrr = repository.packages[pkg]['nevr'] if(nevri.E == nevrr.E): veri = nevri.VR verr = nevrr.VR else: veri = nevri.EVR verr = nevrr.EVR if nevri.DE and nevrr.DE and nevri.DE != nevrr.DE: veri += '(%s%s) ' % ( nevri.DT, nevri.DE) verr += '(%s%s) ' % ( nevrr.DT, nevrr.DE) oprint("%s %-30s %-15s %-15s %-10s" %(prefix, pkg, veri, verr, installed.packages[pkg]['arch'])) qprint('') prefix = '' if to_update: qprint(_("The following packages are going to be upgraded:")) if command_line.printonly: prefix = 'U' print_pkg_list(to_update, 'U') if to_downgrade: qprint(_("The following packages are going to be downgraded:")) if command_line.printonly: prefix = 'D' print_pkg_list(to_downgrade, 'D') if to_append: qprint(_("Additional packages are going to be installed:")) qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Version'), _('Arch'))) if command_line.printonly: prefix = 'A' def get_append_sources(pkg): out = [] for item in to_append_bysource: if pkg in to_append_bysource[item]: out.append(item) return out for pkg in to_append: nevr = repository.packages[pkg]['nevr'] oprint("%s %-30s %-15s %-10s" %(prefix, pkg, nevr.VR, repository.packages[pkg]['arch'])) if command_line.detailed: qprint(_("\tRequired by %s") % (", ".join(get_append_sources(pkg)))) qprint('') if to_remove: qprint(_("The following packages are going to be removed:")) qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch'))) if command_line.printonly: prefix = 'R' for pkg in sorted(to_remove): nevr = installed.packages[pkg]['nevr'] oprint("%s %-30s %-15s %-10s" %(prefix, pkg, nevr.VR, installed.packages[pkg]['arch'])) if command_line.detailed and not command_line.remove: for problem in sorted(to_remove_problems[pkg]): qprint(problem) qprint('') if to_remove_saved and command_line.detailed: qprint(_("Packages which do not present in repositories, but do not have to be removed (will be saved):")) qprint(" %-30s %-15s %-10s" %(_('Package Name'), _('Current Version'), _('Arch'))) if command_line.printonly: prefix = 'S' for pkg in sorted(to_remove_saved): oprint("%s %-30s %-15s %-10s" %(prefix, pkg, installed.packages[pkg]['nevr'].VR, installed.packages[pkg]['arch'])) qprint(_("%d packages are going to be downloaded and installed.") % len(files_to_download)) qprint(_("%d packages are going to be removed.") % len(to_remove)) qprint(_("%s will be downloaded.") % bytes_to_human_readable(count_total_size())) def have_to_be_forced(pkg): for dep in installed.packages[pkg]['provides']: for f in to_remove_force_list: if dep.satisfies(f): vprint("Package %s have been forced to removal." % pkg) return f return None def Main(): global cmd, resolve_source, installed, repository, include_media, exclude_media global not_provided_packages, installed_backup, ms, actions resolve_source = False # variable that makes download_rpm to download resolved build-deps cmd = ['urpmq'] include_media = [] actions = [] if(command_line.include_media != None): media = '' for i in command_line.include_media: media = ",".join([media]+i) for ii in i: include_media.append(ii) cmd = cmd + ['--media', media[1:]] exclude_media = [] if(command_line.exclude_media != None): media = '' for i in command_line.exclude_media: media = ",".join([media]+i) for ii in i: exclude_media.append(ii) cmd = cmd + ['--excludemedia', media[1:]] ms = MediaSet.from_system(cmd) installed = PackageSet() installed.load_from_system() repository = PackageSet() repository.load_from_repository(ms) installed_backup = copy.deepcopy(installed) not_provided_packages = {} for inst in installed.packages: if command_line.nokernel and inst.startswith('kernel'): continue if inst not in repository.packages: if command_line.remove: to_remove_pre.append(inst) else: res = have_to_be_forced(inst) if res: emulate_remove(inst) to_remove.append(inst) to_remove_problems[inst]=[_('\tForced to be removed dew to "%s" policy.') % str(res)] else: to_remove_pre.append(inst) continue #compare distepochs first if installed.packages[inst]["nevr"].DE == None or repository.packages[inst]["nevr"].DE == None: res_epoch = 0 else: res_epoch = rpm.evrCompare(installed.packages[inst]["nevr"].DE, repository.packages[inst]["nevr"].DE) if res_epoch == -1: to_update.append(inst) elif res_epoch == 1: to_downgrade.append(inst) else: # disteposhs are the same #now versions can be compared res = rpm.evrCompare(installed.packages[inst]["nevr"].EVR, repository.packages[inst]["nevr"].EVR) if(res == -1): to_update.append(inst) elif res == 1: to_downgrade.append(inst) else: # res == 0 pass # do nothing process_packages() if len(to_update + to_downgrade + to_remove) == 0: qprint(_("Nothing to do")) return installed = installed_backup print_actions() if command_line.printonly: return vprint("Installed packages: " + str(len(installed.packages))) vprint("Repository packages: " + str(len(repository.packages))) vprint("Packages that need some actions: " + str(len(to_update) + len(to_downgrade) + len(to_remove) + len(to_append))) check_media_set() if(not command_line.auto): sys.stdout.write(_("Do you want to proceed? (y/n): ")) sys.stdout.flush() while(True): res = sys.stdin.readline() res = res.strip() if res in [_('y'), _('yes'), 'y', 'yes']: break if res in [_('n'), _('no'), 'n', 'no']: exit(0) download_packages() if command_line.download: return install_packages() if not os.path.exists(downloaded_rpms_dir): os.makedirs(downloaded_rpms_dir) class Tests(unittest.TestCase): def setUp(self): self.p1 = NEVR.from_depstring('a[== 1.0]') self.p2 = NEVR.from_depstring('a[> 1.0]') self.p3 = NEVR.from_depstring('a[< 1.0]') self.p4 = NEVR.from_depstring('a[>= 1.0]') self.p5 = NEVR.from_depstring('b[== 1.0]') self.r1 = NEVR.from_depstring('a[== 1.0]') self.r2 = NEVR.from_depstring('a[== 1.1]') self.r3 = NEVR.from_depstring('a[<= 1.1]') self.r4 = NEVR.from_depstring('a[>= 1.1]') self.r5 = NEVR.from_depstring('a[< 0.9]') self.r6 = NEVR.from_depstring('a[> 0.9]') self.r7 = NEVR.from_depstring('a[< 1.0]') self.r8 = NEVR.from_depstring('b[== 1.0]') self.pkg1 = NEVR.from_filename("s-c-t-0.0.1-0.20091218.2-rosa.lts2012.0.x86_64") def test_nevr_parse(self): self.assertEqual(self.p1.N, 'a') self.assertEqual(self.p1.VR, '1.0') self.assertEqual(self.p1.EVR, '1.0') self.assertEqual(self.p1.FL, NEVR.EQUAL) self.assertEqual(self.p2.FL, NEVR.GREATER) self.assertEqual(self.p3.FL, NEVR.LESS) self.assertEqual(self.p4.FL, NEVR.EQUAL | NEVR.GREATER) self.assertEqual(self.pkg1.N, 's-c-t') self.assertEqual(self.pkg1.EVR, '0.0.1-0.20091218.2') self.assertEqual(self.pkg1.FL, NEVR.EQUAL) def test_version_compare(self): self.assertTrue(self.p1.satisfies(self.r1)) self.assertTrue(self.p1.satisfies(self.r3)) self.assertTrue(self.p1.satisfies(self.r6)) self.assertFalse(self.p1.satisfies(self.r4)) self.assertFalse(self.p1.satisfies(self.r5)) self.assertFalse(self.p1.satisfies(self.r7)) self.assertFalse(self.p1.satisfies(self.r8)) self.assertTrue(self.p2.satisfies(self.r2)) self.assertTrue(self.p2.satisfies(self.r2)) self.assertTrue(self.p2.satisfies(self.r4)) self.assertTrue(self.p2.satisfies(self.r6)) self.assertFalse(self.p2.satisfies(self.r1)) self.assertFalse(self.p2.satisfies(self.r5)) self.assertFalse(self.p2.satisfies(self.r7)) self.assertTrue(self.p3.satisfies(self.r3)) self.assertTrue(self.p3.satisfies(self.r5)) self.assertTrue(self.p3.satisfies(self.r6)) self.assertTrue(self.p3.satisfies(self.r7)) self.assertFalse(self.p3.satisfies(self.r1)) self.assertFalse(self.p3.satisfies(self.r2)) self.assertFalse(self.p3.satisfies(self.r4)) self.assertTrue(self.p4.satisfies(self.r1)) self.assertTrue(self.p4.satisfies(self.r6)) self.assertFalse(self.p4.satisfies(self.r5)) self.assertFalse(self.p4.satisfies(self.r7)) self.assertTrue(self.p5.satisfies(self.r8)) self.assertEqual(self.p1, self.r1) self.assertNotEqual(self.p1, self.r2) self.assertRaises(Exception, NEVR.from_depstring, "a [== 1.0]") self.assertRaises(Exception, NEVR.from_depstring, "a [== 1.0 ]") self.assertRaises(Exception, NEVR.from_depstring, "a[! 1.0]") self.assertRaises(Exception, NEVR.from_depstring, "a == 1.0") self.assertRaises(Exception, self.p1.__eq__, "a [== 1.0]") if __name__ == '__main__': parse_command_line() if command_line.runselftests: suite = unittest.TestLoader().loadTestsFromTestCase(Tests) unittest.TextTestRunner(verbosity=2).run(suite) else: Main()