urpm-tools/urpm-repomanage.py

313 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/python
'''
" Repomanage utility for distributions using urpm
"
" The tool traverses a directory, builds a dict of
" foo[(name, arch)] = [/path/to/file/that/is/highest, /path/to/equalfile]
" and then reports newest/old packages
"
" Based on repomanage from yum-utils
"
" Copyright (C) 2011 ROSA Laboratory.
" Written by Denis Silakov <denis.silakov@rosalab.ru>
"
" This program is free software: you can redistribute it and/or modify
" it under the terms of the GNU General Public License or the GNU Lesser
" General Public License as published by the Free Software Foundation,
" either version 2 of the Licenses, or (at your option) any later version.
"
" This program is distributed in the hope that it will be useful,
" but WITHOUT ANY WARRANTY; without even the implied warranty of
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
" GNU General Public License for more details.
"
" You should have received a copy of the GNU General Public License
" and the GNU Lesser General Public License along with this program.
" If not, see <http://www.gnu.org/licenses/>.
'''
import os
import sys
import rpm
import fnmatch
import subprocess
import string
from rpm5utils import miscutils, arch, transaction, Rpm5UtilsError
import urpmmisc
import argparse
import gettext
# Install _() into the builtins namespace so that the message strings in this
# script can be looked up in the 'urpm-tools' translation domain.
gettext.install('urpm-tools')
def errorprint(stuff):
    """Write *stuff*, followed by a newline, to standard error.

    ``sys.stderr.write`` is used instead of the ``print >> stream``
    statement because the latter is a syntax error on Python 3; this form
    is valid on both Python 2 and Python 3.

    :param stuff: message to report; non-string objects are converted
        with ``str()`` semantics, as ``print`` would do.
    """
    # The one-element tuple keeps a tuple argument from being unpacked by
    # the % operator.
    sys.stderr.write('%s\n' % (stuff,))
def getFileList(path, ext, filelist):
    """Recursively collect the files below *path* whose name ends with *ext*.

    :param path: directory to scan; sub-directories are descended into.
    :param ext: file name suffix to look for.  It is compared against the
        lower-cased file name, so it should be given in lower case
        (e.g. ``'.rpm'``).
    :param filelist: list the normalised paths are appended to (modified
        in place).
    :return: *filelist*, including everything gathered before the call.

    A directory that cannot be listed is reported through ``errorprint``
    and skipped.  The files collected so far are kept: returning an empty
    list here, as was done previously, made the recursive caller throw
    away every path found before the unreadable directory was reached.
    """
    try:
        entries = os.listdir(path)
    except OSError as e:
        errorprint(_('Error accessing directory %(path)s, %(e)s') % {"path": path, "e": str(e)})
        return filelist

    for entry in entries:
        candidate = os.path.join(path, entry)
        if os.path.isdir(candidate):
            filelist = getFileList(candidate, ext, filelist)
        elif entry.lower().endswith(ext):
            filelist.append(os.path.normpath(candidate))
    return filelist
def trimRpms(rpms, excludeGlobs):
    """Drop every path matching one of *excludeGlobs* from *rpms*.

    :param rpms: list of package paths; it is filtered in place.
    :param excludeGlobs: iterable of ``fnmatch``-style patterns.
    :return: the same *rpms* list object, without the excluded paths.

    The list is filtered in a single pass.  Unlike the earlier
    collect-then-``remove`` approach this also drops repeated occurrences
    of an excluded path and avoids the quadratic membership tests.
    """
    # Slice assignment keeps the caller's list object intact.
    rpms[:] = [fn for fn in rpms
               if not any(fnmatch.fnmatch(fn, pattern) for pattern in excludeGlobs)]
    return rpms
def parseargs(args):
    """Parse the command line of the tool.

    :param args: full argv-style list whose first element is the program
        name (``main`` passes ``sys.argv``).  Previously this parameter was
        ignored and ``sys.argv`` was always parsed; it is honoured now.
        ``None`` falls back to ``sys.argv``.
    :return: ``argparse.Namespace`` with the attributes ``path``, ``old``,
        ``new``, ``obsolete``, ``remove_old``, ``space``, ``keep`` (int),
        ``nocheck``, ``quiet`` and ``verbose``.

    Invalid input (including a non-integer ``--keep``) makes argparse print
    a usage message and exit.
    """
    parser = argparse.ArgumentParser(
        description=_('manage a directory of rpm packages and report newest or oldest packages'))

    parser.add_argument("path", metavar="path",
                        help=_('path to directory with rpm packages'))

    # new is only used to make sure that the user is not trying to get both
    # new and old, after this old and not old will be used.
    # (default = not old = new)
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-o", "--old", default=False, action="store_true",
                       help=_('print the older packages'))
    group.add_argument("-n", "--new", default=True, action="store_true",
                       help=_('print the newest packages (this is the default behavior)'))
    group.add_argument("-b", "--obsolete", default=False, action="store_true",
                       help=_('report obsolete packages'))

    parser.add_argument("-r", "--remove-old", default=False, action="store_true",
                        help=_('remove older packages'))
    parser.add_argument("-s", "--space", default=False, action="store_true",
                        help=_('space separated output, not newline'))
    # type=int turns a malformed value into a regular usage error.
    parser.add_argument("-k", "--keep", default=1, dest='keep', action="store", type=int,
                        help=_('number of newest packages to keep - defaults to 1'))
    parser.add_argument("-c", "--nocheck", default=False, action="store_true",
                        help=_('do not check package payload signatures/digests'))

    group_log = parser.add_mutually_exclusive_group()
    group_log.add_argument("-q", "--quiet", default=False, action="store_true",
                           help=_('be completely quiet'))
    group_log.add_argument("-V", "--verbose", default=False, action="store_true",
                           help=_('be verbose - say which packages are decided to be old and why '
                                  '(this info is dumped to STDERR)'))

    # argparse expects the arguments without the program name.
    argv = None if args is None else list(args)[1:]
    return parser.parse_args(argv)
def _splitObsoleteEVR(evr):
    """Split an Obsoletes version string into ``(version, release)``.

    A leading ``epoch:`` on the version and a trailing ``:suffix`` on the
    release are discarded (epochs are ignored by the caller).  *release*
    is ``None`` when the string carries no ``-release`` part.
    """
    version, dash, release = evr.partition("-")
    if dash:
        release = release.partition(":")[0]
    else:
        release = None
    if ":" in version:
        version = version.split(":", 1)[1]
    return version, release


def _obsoleteApplies(rc, flags):
    """Tell whether comparison result *rc* satisfies the Obsoletes *flags*.

    *rc* is the result of comparing the package against the version named
    in the Obsoletes entry (positive: package is newer, 0: same,
    negative: package is older).
    """
    equal = flags & rpm.RPMSENSE_EQUAL
    if rc >= 1:
        return bool((flags & rpm.RPMSENSE_GREATER) or equal)
    if rc == 0:
        return bool(equal)
    return bool((flags & rpm.RPMSENSE_LESS) or equal)


def _splitByAge(evrlist, keepnum):
    """Split a sorted version list into ``(old, new)`` parts.

    *keepnum* is the negated number of versions to keep.  A list shorter
    than that number is considered entirely new.
    """
    if len(evrlist) < abs(keepnum):
        return [], evrlist
    return evrlist[:keepnum], evrlist[keepnum:]


def main(args):
    """Scan a directory of rpm packages and report new, old or obsolete ones.

    :param args: argv-style argument list, handed to ``parseargs``.

    Depending on the options the function prints the newest packages (the
    default), the older ones, or the packages obsoleted by other packages
    found in the same directory; with ``--remove-old`` the older files are
    deleted.  The process exits with status 1 when no ``.rpm`` file is
    found and with status 3 when old packages were found.
    """
    import functools  # only needed for cmp_to_key below

    options = parseargs(args)
    mydir = options.path

    rpmList = getFileList(mydir, '.rpm', [])

    verfile = {}   # (n, a, e, v, r, d) -> [files carrying exactly that identity]
    pkgdict = {}   # hold all of them - put them in (n,a) = [(e,v,r,d),(e1,v1,r1,d1)]
    obsolete = {}  # obsoleted name -> [(obsoleting package, flags, version string)]

    # Negated so that it can be used directly as a slice bound.
    # NOTE(review): a keep value of 0 makes the slices treat every version
    # as "new" - confirm whether that input should be rejected instead.
    keepnum = -int(options.keep)

    if not rpmList:
        errorprint(_('No files to process'))
        sys.exit(1)

    # 'New' is still used by default to be compatible with repomanage from yum-utils
    if options.obsolete or options.old:
        options.new = False

    ts = rpm.TransactionSet()
    if not options.nocheck:
        ts.setVSFlags(~(rpm.RPMVSF_NOMD5 | rpm.RPMVSF_NEEDPAYLOAD))
    else:
        ts.setVSFlags(~(rpm.RPMVSF_NEEDPAYLOAD))

    for pkg in rpmList:
        try:
            hdr = miscutils.hdrFromPackage(ts, pkg)
        except Rpm5UtilsError as err:
            msg = _("Error opening pkg %(pkg)s: %(err)s") % {"pkg": pkg, "err": str(err)}
            errorprint(msg)
            continue

        if options.obsolete:
            own_name = hdr['name']
            # The three header arrays are parallel; walking them together
            # keeps name, flags and version of one entry paired up even
            # when some entries are skipped.
            entries = zip(hdr[rpm.RPMTAG_OBSOLETENAME],
                          hdr[rpm.RPMTAG_OBSOLETEFLAGS],
                          hdr[rpm.RPMTAG_OBSOLETEVERSION])
            for obs, obs_flags, obs_vers in entries:
                # Do not count packages obsoleted by themselves - let's leave this for rpmlint
                if obs != own_name:
                    obsolete.setdefault(obs, []).append((own_name, obs_flags, obs_vers))

        pkgtuple = miscutils.pkgDistTupleFromHeader(hdr)
        (n, a, e, v, r, d) = pkgtuple
        del hdr

        pkgdict.setdefault((n, a), []).append((e, v, r, d))
        verfile.setdefault(pkgtuple, []).append(pkg)

    devr_key = functools.cmp_to_key(miscutils.compareDEVR)
    for natup in pkgdict:
        evrlist = pkgdict[natup]
        if len(evrlist) > 1:
            evrlist = urpmmisc.unique(evrlist)
            evrlist.sort(key=devr_key)
            pkgdict[natup] = evrlist

    del ts

    # now we have our dicts - we can return whatever by iterating over them
    outputpackages = []

    # a flag indicating that old packages were found
    old_found = False

    if options.obsolete:
        for (n, a) in pkgdict:
            if n not in obsolete:
                continue
            for (e, v, r, d) in pkgdict[(n, a)]:
                # Check OBSOLETEFLAGS and OBSOLETEVERSION - do we really satisfy them?
                # Every recorded Obsoletes entry for this name is tried;
                # the entries are independent of the version being checked.
                # NOTE(review): an entry without a version constraint only
                # matches if its flags pass the test below - confirm this
                # is the intended handling of unversioned Obsoletes.
                obsoleters = []
                for (replacement, oef, overs) in obsolete[n]:
                    over, orel = _splitObsoleteEVR(overs)
                    # Ignore epochs for now ...
                    rc = miscutils.compareEVR((0, v, r), (0, over, orel))
                    if _obsoleteApplies(rc, oef):
                        obsoleters.append(replacement)
                if not obsoleters:
                    continue
                # Several files may share one identity; report each of them.
                for path in verfile[(n, a, e, v, r, d)]:
                    sys.stdout.write(path + "\n")
                    if options.verbose:
                        errorprint(path + " is obsoleted by:")
                        for replacement in obsoleters:
                            errorprint(" " + replacement)

    # if new
    if options.new:
        for (n, a) in pkgdict:
            oldevrs, newevrs = _splitByAge(pkgdict[(n, a)], keepnum)
            if oldevrs:
                old_found = True

            if options.remove_old:
                for (e, v, r, d) in oldevrs:
                    # Remove file by file: one identity can map to several paths.
                    for path in verfile[(n, a, e, v, r, d)]:
                        subprocess.call(["rm", "-f", path])

            if options.verbose:
                for (e, v, r, d) in oldevrs:
                    errorprint(_("Dropped ") + str(verfile[(n, a, e, v, r, d)]))
                    errorprint(_(" superseded by: "))
                    for (ke, kv, kr, kd) in newevrs:
                        errorprint(" " + str(verfile[(n, a, ke, kv, kr, kd)]))

            for (e, v, r, d) in newevrs:
                outputpackages.extend(verfile[(n, a, e, v, r, d)])

    if options.old:
        for (n, a) in pkgdict:
            oldevrs, newevrs = _splitByAge(pkgdict[(n, a)], keepnum)
            if oldevrs:
                old_found = True

            for (e, v, r, d) in oldevrs:
                for pkg in verfile[(n, a, e, v, r, d)]:
                    outputpackages.append(pkg)
                    if options.remove_old:
                        subprocess.call(["rm", "-f", pkg])
                    if options.verbose:
                        errorprint(_("Dropped ") + pkg)
                        errorprint(_(" superseded by: "))
                        for (ke, kv, kr, kd) in newevrs:
                            errorprint(" " + str(verfile[(n, a, ke, kv, kr, kd)]))

    if not options.quiet:
        outputpackages.sort()
        if options.space:
            # One space separated line, terminated by a newline.
            if outputpackages:
                sys.stdout.write(" ".join(outputpackages) + "\n")
        else:
            for pkg in outputpackages:
                sys.stdout.write(pkg + "\n")

    if old_found:
        sys.exit(3)
# Script entry point: hand the complete argument vector to main().
if __name__ == "__main__":
    main(sys.argv)