mirror of
https://abf.rosa.ru/djam/urpm-tools.git
synced 2025-02-23 17:32:46 +00:00
305 lines
11 KiB
Python
Executable file
305 lines
11 KiB
Python
Executable file
#!/usr/bin/python
|
|
'''
|
|
" Repomanage utility for distributions using urpm
|
|
"
|
|
" The tool traverses a directory, build a dict of
|
|
" foo[(name, arch)] = [/path/to/file/that/is/highest, /path/to/equalfile]
|
|
" and then reports newest/old packages
|
|
"
|
|
" Based on repomanage from yum-utils
|
|
"
|
|
" Copyright (C) 2011 ROSA Laboratory.
|
|
" Written by Denis Silakov <denis.silakov@rosalab.ru>
|
|
"
|
|
" This program is free software: you can redistribute it and/or modify
|
|
" it under the terms of the GNU General Public License or the GNU Lesser
|
|
" General Public License as published by the Free Software Foundation,
|
|
" either version 2 of the Licenses, or (at your option) any later version.
|
|
"
|
|
" This program is distributed in the hope that it will be useful,
|
|
" but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
" GNU General Public License for more details.
|
|
"
|
|
" You should have received a copy of the GNU General Public License
|
|
" and the GNU Lesser General Public License along with this program.
|
|
" If not, see <http://www.gnu.org/licenses/>.
|
|
'''
|
|
|
|
import os
|
|
import sys
|
|
import rpm
|
|
import fnmatch
|
|
import subprocess
|
|
import string
|
|
from rpm5utils import miscutils, arch, transaction, Rpm5UtilsError
|
|
import urpmmisc
|
|
|
|
import argparse
|
|
|
|
import gettext
|
|
gettext.install('urpm-tools')
|
|
|
|
|
|
def errorprint(stuff):
    """Write stuff, followed by a newline, to standard error.

    stuff may be any object; it is rendered with %s formatting, so non-string
    values are converted the same way str() would convert them.
    """
    # sys.stderr.write() is used instead of the "print >>" statement so the
    # function is valid syntax on both Python 2 and Python 3.
    sys.stderr.write("%s\n" % (stuff,))
|
|
|
|
|
|
def getFileList(path, ext, filelist):
    """Recursively collect files under path whose names end with ext.

    Only the file name is lower-cased before the suffix comparison, so ext is
    expected in lower case (main() passes '.rpm').

    path     -- directory to scan; sub-directories are scanned recursively
    ext      -- file name suffix to look for
    filelist -- list that receives the normalised paths of matching files;
                it is modified in place

    Returns filelist. A directory that cannot be listed is reported through
    errorprint() and skipped; entries collected before the failure are kept.
    """
    extlen = len(ext)
    try:
        dir_list = os.listdir(path)
    except OSError as e:
        errorprint(_('Error accessing directory %(path)s, %(e)s') % {"path": path, "e": str(e)})
        # Hand back the accumulator rather than a fresh empty list: callers
        # rebind their list to the return value, so returning [] here would
        # throw away every file found before the unreadable directory.
        return filelist

    for d in dir_list:
        entry = os.path.join(path, d)
        if os.path.isdir(entry):
            getFileList(entry, ext, filelist)
        elif d[-extlen:].lower() == '%s' % (ext):
            filelist.append(os.path.normpath(entry))

    return filelist
|
|
|
|
|
|
def trimRpms(rpms, excludeGlobs):
    """Drop every entry of rpms that matches any pattern in excludeGlobs.

    rpms         -- list of file names/paths; it is filtered in place
    excludeGlobs -- iterable of fnmatch-style patterns

    Returns the same list object that was passed in as rpms. The relative
    order of the surviving entries is unchanged.
    """
    # Slice assignment keeps the caller's list object while removing all
    # matching entries in one pass, including repeated occurrences of the
    # same name.
    rpms[:] = [fn for fn in rpms
               if not any(fnmatch.fnmatch(fn, glob) for glob in excludeGlobs)]
    return rpms
|
|
|
|
|
|
def parseargs(args):
    """Parse the command line and return the resulting argparse namespace.

    args is an argv-style sequence whose first element is the program name
    (main() passes sys.argv); everything after that element is parsed. When
    args is empty or None, argparse falls back to sys.argv[1:].

    The returned namespace carries: path, old, new, obsolete, remove_old,
    space, keep (an int), nocheck, quiet and verbose. Invalid command lines
    are reported by argparse, which exits the process.
    """
    parser = argparse.ArgumentParser(description=_('manage a directory of rpm packages and report newest or oldest packages'))

    # new is only used to make sure that the user is not trying to get both
    # new and old, after this old and not old will be used.
    # (default = not old = new)
    parser.add_argument("path", metavar="path",
                        help=_('path to directory with rpm packages'))
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-o", "--old", default=False, action="store_true",
                       help=_('print the older packages'))
    # default=True together with store_true means options.new is always True
    # here; the flag exists so that argparse rejects combining it with
    # --old or --obsolete.
    group.add_argument("-n", "--new", default=True, action="store_true",
                       help=_('print the newest packages (this is the default behavior)'))
    group.add_argument("-b", "--obsolete", default=False, action="store_true",
                       help=_('report obsolete packages'))
    parser.add_argument("-r", "--remove-old", default=False, action="store_true",
                        help=_('remove older packages'))
    parser.add_argument("-s", "--space", default=False, action="store_true",
                        help=_('space separated output, not newline'))
    # type=int makes argparse reject a non-numeric value with a usage error
    # instead of letting it surface later as a ValueError traceback.
    parser.add_argument("-k", "--keep", default=1, dest='keep', action="store",
                        type=int,
                        help=_('number of newest packages to keep - defaults to 1'))
    parser.add_argument("-c", "--nocheck", default=False, action="store_true",
                        help=_('do not check package payload signatures/digests'))
    group_log = parser.add_mutually_exclusive_group()
    group_log.add_argument("-q", "--quiet", default=False, action="store_true",
                           help=_('be completely quiet'))
    # NOTE(review): help texts are passed through _() and so act as
    # translation keys; confirm this two-part literal still matches the
    # catalog entry.
    group_log.add_argument("-V", "--verbose", default=False, action="store_true",
                           help=_('be verbose - say which packages are decided to be old and why '
                                  '(this info is dumped to STDERR)'))

    # Skip the program name; None tells argparse to read sys.argv[1:] itself.
    argv = list(args[1:]) if args else None
    opts = parser.parse_args(argv)

    return opts
|
|
|
|
|
|
def _split_obsolete_evr(evr):
    """Split an Obsoletes version string into (epoch, version, release).

    The string is read as [epoch:]version[-release]; a part that is not
    present in the string is returned as None.
    """
    if "-" in evr:
        version, release = evr.split("-", 1)
    else:
        version, release = evr, None

    if ":" in version:
        epoch, version = version.split(":", 1)
    else:
        epoch = None

    return (epoch, version, release)


def _obsolete_constraint_matches(rc, flags):
    """Tell whether comparison result rc satisfies the Obsoletes sense flags.

    rc is the result of comparing the package's EVR with the EVR named in the
    Obsoletes entry; flags is the matching RPMTAG_OBSOLETEFLAGS value.
    """
    # NOTE(review): RPMSENSE_EQUAL alone is accepted for rc >= 1 and rc <= -1
    # as well, and flags with none of LESS/GREATER/EQUAL set never match.
    # This mirrors the previous behaviour - confirm it is what is intended.
    equal = flags & rpm.RPMSENSE_EQUAL
    if rc >= 1 and ((flags & rpm.RPMSENSE_GREATER) or equal):
        return True
    if rc == 0 and equal:
        return True
    if rc <= -1 and ((flags & rpm.RPMSENSE_LESS) or equal):
        return True
    return False


def _remove_files(paths):
    """Delete every file in paths with "rm -f", one invocation per file."""
    for path in paths:
        subprocess.call(["rm", "-f", path])


def main(args):
    """Scan a directory of rpm packages and report new, old or obsolete ones.

    args is the argv-style list handed to parseargs().

    Every *.rpm file below the requested path is read and grouped by
    (name, arch); the versions of each group are sorted with
    miscutils.compareDEVR. Depending on the options the function then

      * prints packages that another scanned package obsoletes (--obsolete),
      * collects the newest --keep versions of every group (default), or
      * collects everything older than the newest --keep versions (--old),

    optionally deleting the older files (--remove-old) and explaining the
    decisions on standard error (--verbose). Collected paths are printed
    sorted unless --quiet is given.

    Exits with status 1 when no rpm files are found and with status 3 when
    old packages were found; otherwise returns None.
    """
    # Function-scope import: cmp_to_key lets the cmp-style comparison function
    # be used as a sort key, which works on Python 2.7 and Python 3 alike.
    import functools

    options = parseargs(args)
    mydir = options.path

    rpmList = getFileList(mydir, '.rpm', [])
    verfile = {}         # (n,a,e,v,r,d) -> [paths of files with that identity]
    pkgdict = {}         # hold all of them - put them in (n,a) = [(e,v,r,d),(e1,v1,r1,d1)]
    obsolete = {}        # obsoleted name -> [names of packages obsoleting it]
    obsolete_vers = {}   # obsoleted name -> [version strings, parallel to obsolete]
    obsolete_flags = {}  # obsoleted name -> [sense flags, parallel to obsolete]

    # Negated so that it can be used directly as a slice boundary:
    # evrlist[keepnum:] are the versions to keep, evrlist[:keepnum] the rest.
    keepnum = int(options.keep)*(-1)

    if len(rpmList) == 0:
        errorprint(_('No files to process'))
        sys.exit(1)

    # 'New' is still used by default to be compatible with repomanage from yum-utils
    if options.obsolete or options.old:
        options.new = False

    ts = rpm.TransactionSet()
    if options.nocheck:
        # NOTE(review): this passes the bitwise complement of the "no payload"
        # mask, which looks inverted for a "do not check" option - confirm
        # against the rpm bindings in use.
        ts.setVSFlags(~(rpm._RPMVSF_NOPAYLOAD))
    else:
        ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD))

    for pkg in rpmList:
        try:
            hdr = miscutils.hdrFromPackage(ts, pkg)
        except Rpm5UtilsError as e:
            msg = _("Error opening pkg %(pkg)s: %(err)s") % {"pkg": pkg, "err": str(e)}
            errorprint(msg)
            continue

        if options.obsolete:
            obsolete_by_pkg = hdr[rpm.RPMTAG_OBSOLETENAME]
            obsolete_by_pkg_flags = hdr[rpm.RPMTAG_OBSOLETEFLAGS]
            obsolete_by_pkg_vers = hdr[rpm.RPMTAG_OBSOLETEVERSION]
            for idx, obs in enumerate(obsolete_by_pkg):
                # Do not count packages obsoleted by themselves - let's leave this for rpmlint
                if hdr['name'] == obs:
                    continue
                obsolete.setdefault(obs, []).append(hdr['name'])
                obsolete_vers.setdefault(obs, []).append(obsolete_by_pkg_vers[idx])
                obsolete_flags.setdefault(obs, []).append(obsolete_by_pkg_flags[idx])

        pkgtuple = miscutils.pkgDistTupleFromHeader(hdr)
        (n,a,e,v,r,d) = pkgtuple
        del hdr

        pkgdict.setdefault((n,a), []).append((e,v,r,d))
        verfile.setdefault(pkgtuple, []).append(pkg)

    devr_key = functools.cmp_to_key(miscutils.compareDEVR)
    for natup in list(pkgdict):
        evrlist = pkgdict[natup]
        if len(evrlist) > 1:
            evrlist = urpmmisc.unique(evrlist)
            evrlist.sort(key=devr_key)
            pkgdict[natup] = evrlist

    del ts

    # now we have our dicts - we can return whatever by iterating over them

    outputpackages = []

    # a flag indicating that old packages were found
    old_found = 0

    if options.obsolete:
        for (n,a) in pkgdict:
            if n not in obsolete:
                continue

            for (e,v,r,d) in pkgdict[(n,a)]:
                # Check OBSOLETEFLAGS and OBSOLETEVERSION - do we really satisfy them?
                # Every recorded Obsoletes entry for this name is tried; the
                # package counts as obsoleted as soon as one of them matches.
                really_obsoleted = False
                for oef, oevr in zip(obsolete_flags[n], obsolete_vers[n]):
                    rc = miscutils.compareEVR((e, v, r), _split_obsolete_evr(oevr))
                    if _obsolete_constraint_matches(rc, oef):
                        really_obsoleted = True
                        break

                if not really_obsoleted:
                    continue

                # Several files can share one identity; report each of them.
                for path in verfile[(n,a,e,v,r,d)]:
                    sys.stdout.write("%s\n" % path)
                    if options.verbose:
                        errorprint("%s is obsoleted by:" % path)
                        for replacement in obsolete[n]:
                            errorprint(" %s" % replacement)

    #if new
    if options.new:
        for (n,a) in pkgdict:
            evrlist = pkgdict[(n,a)]

            if len(evrlist) < abs(keepnum):
                newevrs = evrlist
            else:
                newevrs = evrlist[keepnum:]
                droppedevrs = evrlist[:keepnum]
                if len(droppedevrs) > 0:
                    old_found = 1
                if options.remove_old:
                    for (e,v,r,d) in droppedevrs:
                        _remove_files(verfile[(n,a,e,v,r,d)])
                if options.verbose:
                    for (e,v,r,d) in droppedevrs:
                        errorprint(_("Dropped ") + str(verfile[(n,a,e,v,r,d)]))
                        errorprint(_(" superseded by: "))
                        for (ke,kv,kr,kd) in newevrs:
                            errorprint(" " + str(verfile[(n,a,ke,kv,kr,kd)]))

            for (e,v,r,d) in newevrs:
                for pkg in verfile[(n,a,e,v,r,d)]:
                    outputpackages.append(pkg)

    if options.old:
        for (n,a) in pkgdict:
            evrlist = pkgdict[(n,a)]

            if len(evrlist) < abs(keepnum):
                continue

            oldevrs = evrlist[:keepnum]
            if len(oldevrs) > 0:
                old_found = 1
            for (e,v,r,d) in oldevrs:
                for pkg in verfile[(n,a,e,v,r,d)]:
                    outputpackages.append(pkg)
                    if options.remove_old:
                        subprocess.call(["rm", "-f", pkg])
                    if options.verbose:
                        errorprint(_("Dropped ") + pkg)
                        errorprint(_(" superseded by: "))
                        for (ke,kv,kr,kd) in evrlist[keepnum:]:
                            errorprint(" " + str(verfile[(n,a,ke,kv,kr,kd)]))

    if not options.quiet:
        outputpackages.sort()
        if options.space:
            # One space-separated line, terminated by a newline.
            if outputpackages:
                sys.stdout.write(" ".join(outputpackages) + "\n")
        else:
            for pkg in outputpackages:
                sys.stdout.write("%s\n" % pkg)

    if old_found==1:
        sys.exit(3)
|
|
|
|
# Run the tool only when the file is executed as a script; main() receives the
# full argv list, program name included.
if __name__ == "__main__":
    main(sys.argv)
|