#!/usr/bin/python
'''
" Repomanage utility for distributions using urpm
"
" The tool traverses a directory, builds a dict of
" foo[(name, arch)] = [/path/to/file/that/is/highest, /path/to/equalfile]
" and then reports newest/old packages
"
" Based on repomanage from yum-utils
"
" Copyright (C) 2011 ROSA Laboratory.
" Written by Denis Silakov
"
" This program is free software: you can redistribute it and/or modify
" it under the terms of the GNU General Public License or the GNU Lesser
" General Public License as published by the Free Software Foundation,
" either version 2 of the Licenses, or (at your option) any later version.
"
" This program is distributed in the hope that it will be useful,
" but WITHOUT ANY WARRANTY; without even the implied warranty of
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
" GNU General Public License for more details.
"
" You should have received a copy of the GNU General Public License
" and the GNU Lesser General Public License along with this program.
" If not, see <http://www.gnu.org/licenses/>.
'''

import argparse
import errno
import fnmatch
import functools
import gettext
import os
import string
import subprocess
import sys

import rpm
from rpm5utils import miscutils, arch, transaction, Rpm5UtilsError
import urpmmisc

# Installs the translation function ``_`` into builtins.
gettext.install('urpm-tools')


def errorprint(stuff):
    """Write ``stuff`` followed by a newline to standard error."""
    sys.stderr.write('%s\n' % (stuff,))


def getFileList(path, ext, filelist):
    """Recursively collect files below ``path`` whose name ends with ``ext``.

    The match is done against the lower-cased file name, so ``ext`` is
    expected in lower case; an empty ``ext`` matches every file.  Matching
    paths are normalised and appended to ``filelist``, which is also the
    return value.

    A directory that cannot be listed is reported on stderr and skipped;
    files collected before the failure are kept.
    """
    try:
        dir_list = os.listdir(path)
    except OSError as e:
        errorprint(_('Error accessing directory %(path)s, %(e)s') % {"path": path, "e": str(e)})
        # Return the accumulator, not a fresh list: the recursive caller
        # assigns our result back to its own ``filelist``.
        return filelist

    for d in dir_list:
        full = os.path.join(path, d)
        if os.path.isdir(full):
            filelist = getFileList(full, ext, filelist)
        elif d.lower().endswith(ext):
            filelist.append(os.path.normpath(full))

    return filelist


def trimRpms(rpms, excludeGlobs):
    """Remove from ``rpms`` every path matching any glob in ``excludeGlobs``.

    ``rpms`` is modified in place (so aliases of the list see the change)
    and returned.  Every occurrence of a matching path is removed.
    """
    rpms[:] = [fn for fn in rpms
               if not any(fnmatch.fnmatch(fn, pattern) for pattern in excludeGlobs)]
    return rpms


def _positive_int(value):
    """argparse ``type`` callback: convert ``value`` to an int that is >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(_('%s is not an integer') % (value,))
    # 0 would turn the "last N" slices used by main() into "everything".
    if number < 1:
        raise argparse.ArgumentTypeError(_('%s is not a positive integer') % (value,))
    return number


def parseargs(args):
    """Parse the command line and return the argparse namespace.

    ``args`` is a full argv-style sequence: its first element (the program
    name) is skipped.  Passing ``None`` makes argparse read ``sys.argv``.
    Invalid usage terminates the process through argparse.
    """
    parser = argparse.ArgumentParser(description=_('manage a directory of rpm packages and report newest or oldest packages'))

    # new is only used to make sure that the user is not trying to get both
    # new and old, after this old and not old will be used.
    # (default = not old = new)
    parser.add_argument("path", metavar="path",
                        help=_('path to directory with rpm packages'))
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-o", "--old", default=False, action="store_true",
                       help=_('print the older packages'))
    group.add_argument("-n", "--new", default=True, action="store_true",
                       help=_('print the newest packages (this is the default behavior)'))
    group.add_argument("-b", "--obsolete", default=False, action="store_true",
                       help=_('report obsolete packages'))
    parser.add_argument("-r", "--remove-old", default=False, action="store_true",
                        help=_('remove older packages'))
    parser.add_argument("-l", "--leave-oldest", default=False, action="store_true",
                        help=_('leave only the oldest version of every package'))
    parser.add_argument("-s", "--space", default=False, action="store_true",
                        help=_('space separated output, not newline'))
    parser.add_argument("-k", "--keep", default=1, dest='keep', action="store",
                        type=_positive_int,
                        help=_('number of newest packages to keep - defaults to 1'))
    parser.add_argument("-c", "--nocheck", default=False, action="store_true",
                        help=_('do not check package payload signatures/digests'))
    group_log = parser.add_mutually_exclusive_group()
    group_log.add_argument("-q", "--quiet", default=False, action="store_true",
                           help=_('be completely quiet'))
    group_log.add_argument("-V", "--verbose", default=False, action="store_true",
                           help=_('be verbose - say which packages are decided to be old and why (this info is dumped to STDERR)'))

    argv = None if args is None else list(args[1:])
    return parser.parse_args(argv)


def _remove_file(path):
    """Delete ``path``; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            errorprint(_('Error removing %(path)s: %(e)s') % {"path": path, "e": str(e)})


def _split_evrd(evrd):
    """Split ``[epoch:]version[-release[:suffix]]`` into its four parts.

    Returns ``(epoch, version, release, suffix)``; parts that are absent
    are ``None``.  The text after ``:`` in the release part is presumably
    the distepoch -- NOTE(review): confirm against the rpm5 EVRD format.
    """
    version, dash, release = evrd.partition("-")
    epoch = None
    suffix = None
    if ":" in version:
        epoch, version = version.split(":", 1)
    if not dash:
        release = None
    elif ":" in release:
        release, suffix = release.split(":", 1)
    return (epoch, version, release, suffix)


def _sense_matches(rc, flags):
    """Tell whether comparison result ``rc`` satisfies the RPMSENSE ``flags``.

    ``rc`` is the result of comparing the package with the version named
    in the Obsoletes entry: positive if the package is newer, negative if
    it is older, zero if equal.
    """
    # NOTE(review): flags without any of LESS/GREATER/EQUAL (an unversioned
    # Obsoletes) never match here, as in the previous implementation --
    # confirm whether such entries should be reported.
    if rc > 0:
        return bool(flags & rpm.RPMSENSE_GREATER)
    if rc < 0:
        return bool(flags & rpm.RPMSENSE_LESS)
    return bool(flags & rpm.RPMSENSE_EQUAL)


def main(args):
    """Scan a directory of rpm packages and report newest/old/obsolete ones.

    ``args`` is the full argv (program name first).  Depending on the
    options, packages are printed to stdout and old ones may be deleted.
    Exits with status 1 if no ``.rpm`` files were found and with status 3
    if old packages were found.
    """
    options = parseargs(args)
    mydir = options.path

    rpmList = getFileList(mydir, '.rpm', [])
    verfile = {}    # (n,a,e,v,r,d) -> [paths of files carrying that package]
    pkgdict = {}    # hold all of them - put them in (n,a) = [(e,v,r,d),(e1,v1,r1,d1)]
    obsoleters = {}  # obsoleted name -> [(obsoleting package name, flags, version)]

    keep = int(options.keep)  # the number of items to keep
    keepnum = -keep           # slice boundary: evrlist[keepnum:] are the kept ones

    if len(rpmList) == 0:
        errorprint(_('No files to process'))
        sys.exit(1)

    # 'New' is still used by default to be compatible with repomanage from yum-utils
    if options.obsolete or options.old:
        options.new = False

    ts = rpm.TransactionSet()
    if not options.nocheck:
        ts.setVSFlags(~(rpm.RPMVSF_NOMD5 | rpm.RPMVSF_NEEDPAYLOAD))
    else:
        ts.setVSFlags(~(rpm.RPMVSF_NEEDPAYLOAD))

    for pkg in rpmList:
        try:
            hdr = miscutils.hdrFromPackage(ts, pkg)
        except Rpm5UtilsError as e:
            msg = _("Error opening pkg %(pkg)s: %(err)s") % {"pkg": pkg, "err": str(e)}
            errorprint(msg)
            continue

        if options.obsolete:
            obsolete_by_pkg = hdr[rpm.RPMTAG_OBSOLETENAME]
            obsolete_by_pkg_flags = hdr[rpm.RPMTAG_OBSOLETEFLAGS]
            obsolete_by_pkg_vers = hdr[rpm.RPMTAG_OBSOLETEVERSION]
            for idx, obs in enumerate(obsolete_by_pkg):
                # Do not count packages obsoleted by themselves - let's leave this for rpmlint
                if hdr['name'] != obs:
                    obsoleters.setdefault(obs, []).append(
                        (hdr['name'], obsolete_by_pkg_flags[idx], obsolete_by_pkg_vers[idx]))

        pkgtuple = miscutils.pkgDistTupleFromHeader(hdr)
        (n, a, e, v, r, d) = pkgtuple
        del hdr

        pkgdict.setdefault((n, a), []).append((e, v, r, d))
        verfile.setdefault(pkgtuple, []).append(pkg)

    for natup in list(pkgdict):
        evrlist = pkgdict[natup]
        if len(evrlist) > 1:
            evrlist = urpmmisc.unique(evrlist)
            evrlist.sort(key=functools.cmp_to_key(miscutils.compareEVRD))
            pkgdict[natup] = evrlist

    del ts

    # now we have our dicts - we can return whatever by iterating over them

    outputpackages = []
    # a flag indicating that old packages were found
    old_found = 0

    if options.obsolete:
        for (n, a) in pkgdict.keys():
            if n not in obsoleters:
                continue
            for (e, v, r, d) in pkgdict[(n, a)]:
                # Check OBSOLETEFLAGS and OBSOLETEVERSION of every package
                # obsoleting this name - do we really satisfy them?
                replacements = []
                for (obsoleter, oef, oevrd) in obsoleters[n]:
                    (oep, over, orel, odist) = _split_evrd(oevrd)
                    # Ignore epochs for now ...
                    rc = miscutils.compareEVR((0, v, r), (0, over, orel))
                    if _sense_matches(rc, oef) and obsoleter not in replacements:
                        replacements.append(obsoleter)
                if not replacements:
                    continue
                for path in verfile[(n, a, e, v, r, d)]:
                    sys.stdout.write(path + "\n")
                    if options.verbose:
                        errorprint(path + " is obsoleted by:")
                        for replacement in replacements:
                            errorprint(" " + replacement)

    # if new
    if options.new:
        for (n, a) in pkgdict.keys():
            evrlist = pkgdict[(n, a)]

            if len(evrlist) < keep:
                newevrs = evrlist
            else:
                newevrs = evrlist[keepnum:]
                oldevrs = evrlist[:keepnum]
                if len(oldevrs) > 0:
                    old_found = 1
                # NOTE(review): assumed to apply only when the group has at
                # least `keep` versions, as in the --old branch below.
                if options.leave_oldest:
                    for (e, v, r, d) in evrlist[1:]:
                        for path in verfile[(n, a, e, v, r, d)]:
                            _remove_file(path)
                if options.remove_old:
                    for (e, v, r, d) in oldevrs:
                        for path in verfile[(n, a, e, v, r, d)]:
                            _remove_file(path)
                if options.verbose:
                    for (e, v, r, d) in oldevrs:
                        errorprint(_("Dropped ") + str(verfile[(n, a, e, v, r, d)]))
                        errorprint(_(" superseded by: "))
                        for (le, lv, lr, ld) in newevrs:
                            errorprint(" " + str(verfile[(n, a, le, lv, lr, ld)]))

            for (e, v, r, d) in newevrs:
                for pkg in verfile[(n, a, e, v, r, d)]:
                    outputpackages.append(pkg)

    if options.old:
        for (n, a) in pkgdict.keys():
            evrlist = pkgdict[(n, a)]

            if len(evrlist) < keep:
                continue

            oldevrs = evrlist[:keepnum]
            if len(oldevrs) > 0:
                old_found = 1
            if options.leave_oldest:
                for (e, v, r, d) in evrlist[1:]:
                    for path in verfile[(n, a, e, v, r, d)]:
                        _remove_file(path)
            for (e, v, r, d) in oldevrs:
                for pkg in verfile[(n, a, e, v, r, d)]:
                    outputpackages.append(pkg)
                    if options.remove_old:
                        _remove_file(pkg)
                    if options.verbose:
                        errorprint(_("Dropped ") + pkg)
                        errorprint(_(" superseded by: "))
                        for (le, lv, lr, ld) in evrlist[keepnum:]:
                            errorprint(" " + str(verfile[(n, a, le, lv, lr, ld)]))

    if not options.quiet:
        outputpackages.sort()
        if options.space:
            # One line, entries separated by single spaces.
            if outputpackages:
                sys.stdout.write(' '.join(outputpackages) + "\n")
        else:
            for pkg in outputpackages:
                sys.stdout.write(pkg + "\n")

    if old_found == 1:
        sys.exit(3)


if __name__ == "__main__":
    main(sys.argv)