#!/usr/bin/python
'''
" Package cleanup utility for distributions using urpm
" Based on package-cleanup from yum-utils
"
" Copyright (C) 2011 ROSA Laboratory.
" Written by Denis Silakov
"
" This program is free software: you can redistribute it and/or modify
" it under the terms of the GNU General Public License or the GNU Lesser
" General Public License as published by the Free Software Foundation,
" either version 2 of the Licenses, or (at your option) any later version.
"
" This program is distributed in the hope that it will be useful,
" but WITHOUT ANY WARRANTY; without even the implied warranty of
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
" GNU General Public License for more details.
"
" You should have received a copy of the GNU General Public License
" and the GNU Lesser General Public License along with this program.
" If not, see <http://www.gnu.org/licenses/>.
'''

import sys
import logging
import os
import re
import subprocess
import string
import urpmmisc
import types

from rpm5utils import miscutils, arch, transaction
import argparse
import rpm

import gettext
gettext.install('urpm-tools')

# NOTE: every print below is written as print(<single expression>), which
# produces the same output whether 'print' is a statement or a function.

# Bit in a dependency's flags that marks it as a soft ("suggests")
# dependency: RPMSENSE_MISSINGOK == (1 << 19)
RPMSENSE_MISSINGOK = 1 << 19


def exactlyOne(l):
    """Return True if exactly one item of the iterable *l* is truthy."""
    return sum(1 for item in l if item) == 1


class PackageCleanup(object):
    """Command-line tool that inspects the local rpmdb and cleans it up.

    Instantiating the class parses sys.argv and runs the requested action
    immediately (see main()); every action ends with sys.exit() except
    --cleandupes, which simply returns.
    """

    NAME = 'urpm-package-cleanup'
    VERSION = '0.1'
    USAGE = """
    urpm-package-cleanup: helps find problems in the rpmdb of system and correct them

    usage: urpm-package-cleanup --problems or --leaves or --orphans or --oldkernels
    """

    def __init__(self):
        # Extra command-line flags handed to urpme by _removePkg().
        self.tsflags = []
        # Cache used by _getProvides(): dep tuple -> providers dict.
        self._get_pro_cache = {}
        self.addCmdOptions()
        self.main()

    def addCmdOptions(self):
        """Create self.ArgParser and register every command-line option."""
        self.ArgParser = argparse.ArgumentParser(
            description=_('Find problems in the rpmdb of system and correct them'))
        self.ArgParser.add_argument(
            "--qf", "--queryformat", dest="qf", action="store",
            default='%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}',
            help=_("Query format to use for output."))
        self.ArgParser.add_argument(
            "--auto", default=False, dest="auto", action="store_true",
            help=_('Use non-interactive mode'))
        self.ArgParser.add_argument(
            "--version", action='version', version=self.VERSION)

        orphangrp = self.ArgParser.add_argument_group(_('Orphans Options'))
        orphangrp.add_argument(
            "--orphans", default=False, dest="orphans", action="store_true",
            help=_('List installed packages which are not available from'
                   ' currently configured repositories'))
        orphangrp.add_argument(
            "--update", default=False, dest="update", action="store_true",
            help=_('Use only update media. This means that urpmq will search'
                   ' and resolve dependencies only in media marked as containing updates'
                   ' (e.g. which have been created with "urpmi.addmedia --update").'))
        orphangrp.add_argument(
            "--media", metavar='media', nargs='+',
            help=_('Select specific media to be used, instead of defaulting to all available '
                   'media (or all update media if --update is used). No rpm will be found in '
                   'other media.'))
        orphangrp.add_argument(
            "--excludemedia", metavar='media', nargs='+',
            help=_('Do not use the specified media.'))

        probgrp = self.ArgParser.add_argument_group(_('Dependency Problems Options'))
        probgrp.add_argument(
            "--problems", default=False, dest="problems", action="store_true",
            help=_('List dependency problems in the local RPM database'))
        probgrp.add_argument(
            "--suggests", default=False, dest="suggests", action="store_true",
            help=_('List missing suggestions of installed packages'))

        dupegrp = self.ArgParser.add_argument_group(_('Duplicate Package Options'))
        dupegrp.add_argument(
            "--dupes", default=False, dest="dupes", action="store_true",
            help=_('Scan for duplicates in your rpmdb'))
        dupegrp.add_argument(
            "--cleandupes", default=False, dest="cleandupes", action="store_true",
            help=_('Scan for duplicates in your rpmdb and remove older '))
        dupegrp.add_argument(
            "--noscripts", default=False, dest="noscripts", action="store_true",
            help=_("disable rpm scriptlets from running when cleaning duplicates"))

        leafgrp = self.ArgParser.add_argument_group(_('Leaf Node Options'))
        leafgrp.add_argument(
            "--leaves", default=False, dest="leaves", action="store_true",
            help=_('List leaf nodes in the local RPM database'))
        leafgrp.add_argument(
            "--all", default=False, dest="all_nodes", action="store_true",
            help=_('list all packages leaf nodes that do not match'
                   ' leaf-regex'))
        # Raw string: the pattern contains the regex escape \d.
        leafgrp.add_argument(
            "--leaf-regex",
            default=r"(^(compat-)?lib(?!reoffice).+|.*libs?[\d-]*|.*-data$)",
            help=_('A package name that matches this regular expression'
                   ' (case insensitively) is a leaf'))
        leafgrp.add_argument(
            "--exclude-devel", default=False, action="store_true",
            help=_('do not list development packages as leaf nodes'))
        leafgrp.add_argument(
            "--exclude-bin", default=False, action="store_true",
            help=_('do not list packages with files in a bin dirs as '
                   'leaf nodes'))

        kernelgrp = self.ArgParser.add_argument_group(_('Old Kernel Options'))
        kernelgrp.add_argument(
            "--oldkernels", default=False, dest="kernels", action="store_true",
            help=_("Remove old kernel and kernel-devel packages"))
        # type=int makes argparse reject non-numeric values with a usage
        # error instead of a ValueError traceback later on.
        kernelgrp.add_argument(
            "--count", default=2, dest="kernelcount", action="store", type=int,
            help=_('Number of kernel packages to keep on the '
                   'system (default 2)'))
        kernelgrp.add_argument(
            "--keepdevel", default=False, dest="keepdevel", action="store_true",
            help=_('Do not remove kernel-devel packages when '
                   'removing kernels'))

    def _removePkg(self, pkg):
        """Remove the given package by calling urpme.

        pkg is indexed by 'name', 'version' and 'release'.  Every entry of
        self.tsflags is passed to urpme as its own argument.
        """
        # No smart behavior yet, simply call urpme for the package
        pkgName = pkg['name'] + "-" + pkg['version']
        if pkg['release']:
            pkgName += '-' + pkg['release']

        # Each flag must be a separate argv element; joining them into one
        # string would hand urpme a single, unrecognisable option.
        subprocess.call(['urpme', pkgName] + list(self.tsflags))

    @staticmethod
    def _genDeptup(name, flags, version):
        """Given random stuff, generate a usable dep tuple.

        Returns (name, flags, (epoch, version, release)); flags == 0 is
        normalised to None.  version may be a string (split with
        miscutils.stringToVersion), a 3-item tuple/list, or anything else,
        in which case epoch, version and release are all None.
        """
        if flags == 0:
            flags = None

        if isinstance(version, str):
            (r_e, r_v, r_r) = miscutils.stringToVersion(version)
        elif isinstance(version, (tuple, list)):
            # would this ever be a list?
            (r_e, r_v, r_r) = version
        else:
            # FIXME: This isn't always type(version) is NoneType:
            # ...not sure what it is though, come back to this
            r_e = r_v = r_r = None

        deptup = (name, urpmmisc.share_data(flags),
                  (urpmmisc.share_data(r_e), urpmmisc.share_data(r_v),
                   urpmmisc.share_data(r_r)))
        return urpmmisc.share_data(deptup)

    def _getProvides(self, req, flags, ver):
        """Search the rpmdb for packages providing the given requirement.

        Returns a dict (possibly empty) whose keys are the providing
        package headers.  Results are cached in self._get_pro_cache, keyed
        by the dep tuple built from the arguments.
        """
        deptup = self._genDeptup(req, flags, ver)
        # Consult the cache before opening any rpmdb query.
        if deptup in self._get_pro_cache:
            return self._get_pro_cache[deptup]

        (r_e, r_v, r_r) = deptup[2]
        unversioned = r_e is None and r_v is None and r_r is None
        is_file_dep = req[0] == '/' and r_v is None
        hit = [(req, None, (None, None, None))]

        ts = rpm.TransactionSet()
        result = {}

        for po in ts.dbMatch('provides', req):
            prov_flags_list = po['provideflags']
            for prov_idx, prov in enumerate(po['provides']):
                if prov != req:
                    continue
                prov_flags = prov_flags_list[prov_idx]

                if is_file_dep or unversioned:
                    result[po] = list(hit)
                    continue

                # NOTE(review): the providing package's own
                # epoch/version/release is compared here, not the version
                # recorded for this particular provide entry
                # (po['provideversion']) -- confirm this is intended.
                provtup = (req, prov_flags,
                           (po['epoch'], po['version'], po['release']))
                if miscutils.rangeCompare(deptup, provtup):
                    result[po] = list(hit)
                else:
                    print("NOT MATCHED " + str(deptup) + " VS " + str(provtup))

        self._get_pro_cache[deptup] = result

        # Check if we have dependency on file not listed
        # directly in PROVIDES
        if not result and is_file_dep:
            for po in ts.dbMatch('filepaths', req):
                result[po] = list(hit)

        return result

    def _find_missing_deps(self, pkgs):
        """Find any missing dependencies for any installed package in pkgs.

        Returns [problems, missing_suggests]; each is a list of
        (package, message) tuples.  A missing requirement counts as a
        suggestion when it is listed in the package's 'suggests' or has
        the RPMSENSE_MISSINGOK bit set in its flags.
        """
        # To speed depsolving, don't recheck deps that have
        # already been checked
        providers = {}
        problems = []
        missing_suggests = []

        for po in pkgs:
            versions = po['requireversion']
            flag_list = po['requireflags']
            for req_idx, req in enumerate(po['requires']):
                if req.startswith('rpmlib'):
                    continue  # ignore rpmlib deps

                ver = versions[req_idx]
                flags = flag_list[req_idx]
                key = (req, flags, ver)

                resolve_sack = providers.get(key)
                if resolve_sack is None:
                    resolve_sack = self._getProvides(req, flags, ver)

                if resolve_sack:
                    # Store the resolve_sack so that we can re-use it if
                    # another package has the same requirement
                    providers[key] = resolve_sack
                    continue

                missing = miscutils.formatRequire(req, ver, flags)
                if req in po['suggests'] or flags & RPMSENSE_MISSINGOK:
                    missing_suggests.append((po, "suggests %s" % missing))
                else:
                    problems.append((po, "requires %s" % missing))

        return [problems, missing_suggests]

    def _find_installed_duplicates(self, ignore_kernel=True):
        """Find installed duplicate packages.

        Returns a dict of pkgname = [[dupe1, dupe2], [dupe3, dupe4]].
        Packages are grouped per name into a multilib list and a
        single-arch list (noarch packages go into both); every group with
        more than one entry is reported.
        """
        multipkgs = {}
        singlepkgs = {}
        results = {}

        ts = rpm.TransactionSet()
        for pkg in ts.dbMatch():
            name = pkg['name']
            # just skip kernels and everyone is happier
            if ignore_kernel:
                # 'provides' is the tag the rest of this file uses for
                # provide names; the former 'provides_names' key looks
                # like a leftover from the yum original -- TODO confirm.
                if 'kernel' in pkg['provides']:
                    continue
                if name.startswith('kernel'):
                    continue

            # public keys from different repos may have different versions
            if name.startswith('gpg-pubkey'):
                continue

            if name in multipkgs or name in singlepkgs:
                continue  # this name has already been collected

            for po in ts.dbMatch('name', name):
                multi = multipkgs.setdefault(name, [])
                single = singlepkgs.setdefault(name, [])
                if arch.isMultiLibArch(arch=po['arch']):
                    multi.append(po)
                elif po['arch'] == 'noarch':
                    multi.append(po)
                    single.append(po)
                else:
                    single.append(po)

        # Multilib groups first, then single-arch groups.
        for pkgmap in (multipkgs, singlepkgs):
            for (name, pkglist) in pkgmap.items():
                if len(pkglist) <= 1:
                    continue
                groups = results.setdefault(name, [])
                if pkglist not in groups:
                    groups.append(pkglist)

        return results

    def _remove_old_dupes(self):
        """Remove every duplicate package except the last one of each group.

        Each duplicate group is sorted and all entries but the last are
        passed to _removePkg().
        """
        dupedict = self._find_installed_duplicates()

        removedupes = []
        for dupelists in dupedict.values():
            for dupelist in dupelists:
                dupelist.sort()
                removedupes.extend(dupelist[:-1])

        # No smart behavior yet, simply call urpme for every package
        for po in removedupes:
            self._removePkg(po)

    def _should_show_leaf(self, po, leaf_regex, exclude_devel, exclude_bin):
        """Determine if the given pkg should be displayed as a leaf or not.

        leaf_regex is a compiled pattern matched against the package name.
        Return True if the pkg should be shown, False if not.
        """
        name = po['name']
        if name == 'gpg-pubkey':
            return False
        if exclude_devel and name.endswith('devel'):
            return False
        if exclude_bin and any('bin' in file_name
                               for file_name in po['filepaths']):
            return False
        return bool(leaf_regex.match(name))

    def _get_kernels(self):
        """Return all installed packages providing 'kernel', sorted in
        descending order (newest to oldest)."""
        ts = rpm.TransactionSet()
        kernlist = list(ts.dbMatch('provides', 'kernel'))
        kernlist.sort()
        kernlist.reverse()
        return kernlist

    def _get_old_kernel_devel(self, kernels, removelist):
        """List all kernel devel packages that either belong to kernel
        versions that are no longer installed or to kernel versions that
        are in the removelist.

        A devel package is kept when some kernel not in removelist has the
        same arch, epoch, version and release.
        """
        kept_ids = set(
            (kernel['arch'], kernel['epoch'], kernel['version'], kernel['release'])
            for kernel in kernels if kernel not in removelist)

        devellist = []
        ts = rpm.TransactionSet()
        for po in ts.dbMatch('provides', 'kernel-devel'):
            devel_id = (po['arch'], po['epoch'], po['version'], po['release'])
            if devel_id not in kept_ids:
                devellist.append(po)
        return devellist

    @staticmethod
    def _split_running_kernel(uname_release, machine):
        """Split a `uname -r` string into (version, release).

        The version is everything before the first '-'; the release is the
        remainder, with a trailing '.<machine>' component removed.
        """
        # Vanilla kernels dont have a release, only a version
        if '-' not in uname_release:
            return (uname_release, "")

        parts = uname_release.split('-')
        kver = parts[0]
        # A custom build kernel may have extra '-' in the release
        krel = "-".join(parts[1:])

        rel_fields = krel.split('.')
        if rel_fields[-1] == machine:
            krel = ".".join(rel_fields[:-1])
        return (kver, krel)

    def _remove_old_kernels(self, count, keepdevel):
        """Remove old kernels, keep at most count kernels (and always keep
        the running kernel).

        Unless keepdevel is true, kernel-devel packages without a matching
        remaining kernel are removed as well.
        """
        count = int(count)
        kernels = self._get_kernels()
        uname = os.uname()
        (kver, krel) = self._split_running_kernel(uname[2], uname[-1])

        toremove = []
        # Remove running kernel from remove list
        for kernel in kernels[count:]:
            if kernel['version'] == kver and krel.startswith(kernel['release']):
                print(_("Not removing kernel %(kver)s-%(krel)s because it is the running kernel")
                      % {'kver': kver, 'krel': krel})
            else:
                toremove.append(kernel)

        # Now extend the list with all kernel-devel packages that either
        # have no matching kernel installed or belong to a kernel that is to
        # be removed
        if not keepdevel:
            toremove.extend(self._get_old_kernel_devel(kernels, toremove))

        for po in toremove:
            self._removePkg(po)

    @staticmethod
    def _urpmq_orphan_args(opts):
        """Build the urpmq argument list for the --orphans action."""
        cmd = ["urpmq", "--not-available"]
        # Media lists are passed as one comma-separated value each, the
        # form urpmq documents for --media/--excludemedia (to my
        # knowledge) -- TODO confirm against the urpmq man page.
        if opts.excludemedia:
            cmd.extend(["--excludemedia", ",".join(opts.excludemedia)])
        if opts.media:
            cmd.extend(["--media", ",".join(opts.media)])
        if opts.update:
            cmd.append("--update")
        return cmd

    def main(self):
        """Parse sys.argv and run the single requested action.

        Exit codes: 1 for usage/permission errors, 100 for an invalid
        --count, 2 when dependency problems were found, 3 when only
        missing suggestions were found, 0 otherwise.
        """
        opts = self.ArgParser.parse_args(sys.argv[1:])
        if not exactlyOne([opts.problems, opts.dupes, opts.leaves, opts.kernels,
                           opts.orphans, opts.cleandupes]):
            print(self.ArgParser.format_help())
            sys.exit(1)

        self.tsflags = []

        if opts.problems:
            ts = rpm.TransactionSet()
            mi = ts.dbMatch()
            self._get_pro_cache = {}
            (issues, missing_suggests) = self._find_missing_deps(mi)
            for (pkg, prob) in issues:
                print(_('Package %(qf)s %(prob)s')
                      % {'qf': pkg.sprintf(opts.qf), 'prob': prob})

            if opts.suggests:
                print(_("Missing suggests:"))
                for (pkg, prob) in missing_suggests:
                    print('Package %s %s' % (pkg.sprintf(opts.qf), prob))

            if issues:
                sys.exit(2)
            if (not opts.suggests) or (len(missing_suggests) == 0):
                print(_('No Problems Found'))
                sys.exit(0)
            sys.exit(3)

        if opts.dupes:
            dupes = self._find_installed_duplicates()
            for pkglists in dupes.values():
                for pkglist in pkglists:
                    for pkg in pkglist:
                        print('%s' % pkg.sprintf(opts.qf))
            sys.exit(0)

        if opts.kernels:
            if os.geteuid() != 0:
                print(_("Error: Cannot remove kernels as a user, must be root"))
                sys.exit(1)
            if int(opts.kernelcount) < 1:
                print(_("Error: should keep at least 1 kernel!"))
                sys.exit(100)
            if opts.auto:
                self.tsflags.append('--auto')

            self._remove_old_kernels(opts.kernelcount, opts.keepdevel)
            sys.exit(0)

        if opts.leaves:
            self._ts = transaction.TransactionWrapper()
            leaves = self._ts.returnLeafNodes()
            leaf_reg = re.compile(opts.leaf_regex, re.IGNORECASE)
            for po in sorted(leaves):
                if opts.all_nodes or \
                   self._should_show_leaf(po, leaf_reg, opts.exclude_devel,
                                          opts.exclude_bin):
                    print(po.sprintf(opts.qf))
            sys.exit(0)

        if opts.orphans:
            # Just a wrapper that invokes urpmq
            subprocess.call(self._urpmq_orphan_args(opts))
            sys.exit(0)

        if opts.cleandupes:
            if os.geteuid() != 0:
                print(_("Error: Cannot remove packages as a user, must be root"))
                sys.exit(1)
            if opts.noscripts:
                self.tsflags.append('--noscripts')
            if opts.auto:
                self.tsflags.append('--auto')

            self._remove_old_dupes()


if __name__ == '__main__':
    # setup_locale()
    util = PackageCleanup()