urpm-tools/urpm-package-cleanup.py

557 lines
21 KiB
Python
Raw Normal View History

#!/usr/bin/python
'''
" Package cleanup utility for distributions using urpm
" Based on package-cleanup from yum-utils
"
" Copyright (C) 2011 ROSA Laboratory.
" Written by Denis Silakov <denis.silakov@rosalab.ru>
"
" This program is free software: you can redistribute it and/or modify
" it under the terms of the GNU General Public License or the GNU Lesser
" General Public License as published by the Free Software Foundation,
" either version 2 of the Licenses, or (at your option) any later version.
"
" This program is distributed in the hope that it will be useful,
" but WITHOUT ANY WARRANTY; without even the implied warranty of
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
" GNU General Public License for more details.
"
" You should have received a copy of the GNU General Public License
" and the GNU Lesser General Public License along with this program.
" If not, see <http://www.gnu.org/licenses/>.
'''
import sys
import logging
import os
import re
import subprocess
import string
import urpmmisc
import types
from rpm5utils import miscutils, arch, transaction
import argparse
import rpm
import gettext
gettext.install('urpm-tools')
def exactlyOne(l):
    """Return True when exactly one item of the iterable *l* is truthy.

    Used to make sure that exactly one of the mutually exclusive
    command-line modes has been selected.
    """
    # Count truthy items with a generator instead of len(filter(...)):
    # filter() may return an iterator, for which len() is not defined.
    return sum(1 for item in l if item) == 1
class PackageCleanup():
    """Command-line utility that inspects the local rpmdb and cleans it up.

    Constructing an instance builds the argument parser and immediately runs
    the action selected on the command line (see ``main``). Most actions end
    the process through ``sys.exit``.
    """

    NAME = 'urpm-package-cleanup'
    VERSION = '0.1'
    USAGE = """
urpm-package-cleanup: helps find problems in the rpmdb of system and correct them
usage: urpm-package-cleanup --problems or --leaves or --orphans or --oldkernels
"""

    def __init__(self):
        """Register the command-line options, then run the requested action."""
        self.addCmdOptions()
        self.main()

    def addCmdOptions(self):
        """Create ``self.ArgParser`` and register every supported option."""
        parser = argparse.ArgumentParser(
            description=_('Find problems in the rpmdb of system and correct them'))
        self.ArgParser = parser

        # General options
        parser.add_argument("--qf", "--queryformat", dest="qf",
                            action="store",
                            default='%{NAME}-%{VERSION}-%{RELEASE}.%{ARCH}',
                            help=_("Query format to use for output."))
        parser.add_argument("--auto", default=False,
                            dest="auto", action="store_true",
                            help=_('Use non-interactive mode'))
        parser.add_argument("--version", action='version',
                            version=self.VERSION)

        # Orphans
        orphangrp = parser.add_argument_group(_('Orphans Options'))
        orphangrp.add_argument("--orphans", default=False,
                               dest="orphans", action="store_true",
                               help=_('List installed packages which are not available from'
                                      ' currently configured repositories'))
        orphangrp.add_argument("--update", default=False,
                               dest="update", action="store_true",
                               help=_('Use only update media. This means that urpmq will search'
                                      ' and resolve dependencies only in media marked as containing updates'
                                      ' (e.g. which have been created with "urpmi.addmedia --update").'))
        orphangrp.add_argument("--media", metavar='media', nargs='+',
                               help=_('Select specific media to be used, instead of defaulting to all available '
                                      'media (or all update media if --update is used). No rpm will be found in '
                                      'other media.'))
        orphangrp.add_argument("--excludemedia", metavar='media', nargs='+',
                               help=_('Do not use the specified media.'))

        # Dependency problems
        probgrp = parser.add_argument_group(_('Dependency Problems Options'))
        probgrp.add_argument("--problems", default=False,
                             dest="problems", action="store_true",
                             help=_('List dependency problems in the local RPM database'))
        probgrp.add_argument("--suggests", default=False,
                             dest="suggests", action="store_true",
                             help=_('List missing suggestions of installed packages'))

        # Duplicates
        dupegrp = parser.add_argument_group(_('Duplicate Package Options'))
        dupegrp.add_argument("--dupes", default=False,
                             dest="dupes", action="store_true",
                             help=_('Scan for duplicates in your rpmdb'))
        dupegrp.add_argument("--cleandupes", default=False,
                             dest="cleandupes", action="store_true",
                             help=_('Scan for duplicates in your rpmdb and remove older '))
        dupegrp.add_argument("--noscripts", default=False,
                             dest="noscripts", action="store_true",
                             help=_("disable rpm scriptlets from running when cleaning duplicates"))

        # Leaf nodes
        leafgrp = parser.add_argument_group(_('Leaf Node Options'))
        leafgrp.add_argument("--leaves", default=False, dest="leaves",
                             action="store_true",
                             help=_('List leaf nodes in the local RPM database'))
        leafgrp.add_argument("--all", default=False, dest="all_nodes",
                             action="store_true",
                             help=_('list all packages leaf nodes that do not match'
                                    ' leaf-regex'))
        # Raw string: the pattern contains "\d", which is not a valid string
        # escape and must reach the regex engine untouched.
        leafgrp.add_argument("--leaf-regex",
                             default=r"(^(compat-)?lib(?!reoffice).+|.*libs?[\d-]*|.*-data$)",
                             help=_('A package name that matches this regular expression'
                                    ' (case insensitively) is a leaf'))
        leafgrp.add_argument("--exclude-devel", default=False,
                             action="store_true",
                             help=_('do not list development packages as leaf nodes'))
        leafgrp.add_argument("--exclude-bin", default=False,
                             action="store_true",
                             help=_('do not list packages with files in a bin dirs as '
                                    'leaf nodes'))

        # Old kernels
        kernelgrp = parser.add_argument_group(_('Old Kernel Options'))
        kernelgrp.add_argument("--oldkernels", default=False,
                               dest="kernels", action="store_true",
                               help=_("Remove old kernel and kernel-devel packages"))
        # type=int lets argparse reject a non-numeric count with a regular
        # usage error instead of a ValueError traceback later on.
        kernelgrp.add_argument("--count", default=2, dest="kernelcount",
                               action="store", type=int,
                               help=_('Number of kernel packages to keep on the '
                                      'system (default 2)'))
        kernelgrp.add_argument("--keepdevel", default=False, dest="keepdevel",
                               action="store_true",
                               help=_('Do not remove kernel-devel packages when '
                                      'removing kernels'))

    def _erase_command(self, pkg):
        """Return the urpme argument vector used to erase *pkg*.

        *pkg* must support lookups of 'name', 'version' and 'release'.
        Every entry of ``self.tsflags`` becomes its own argument.
        """
        pkgName = pkg['name'] + "-" + pkg['version']
        if pkg['release']:
            pkgName += '-' + pkg['release']
        # Flags must be separate argv elements; a single "--a --b" string
        # would reach urpme as one unknown argument.
        return ['urpme', pkgName] + list(self.tsflags)

    def _removePkg(self, pkg):
        """Remove the given package by calling urpme on it."""
        # No smart behavior yet, simply call urpme for the package
        subprocess.call(self._erase_command(pkg))

    @staticmethod
    def _genDeptup(name, flags, version):
        """Build a normalized dependency tuple from loosely typed input.

        Returns ``(name, flags, (epoch, version, release))`` with every
        component passed through ``urpmmisc.share_data``. A *flags* value of
        0 is turned into None. *version* may be a string (parsed with
        ``miscutils.stringToVersion``) or a 3-item tuple/list; anything else
        yields an all-None version triple.
        """
        if flags == 0:
            flags = None

        if isinstance(version, str):
            (r_e, r_v, r_r) = miscutils.stringToVersion(version)
        elif isinstance(version, (tuple, list)):
            # would this ever be a list?
            (r_e, r_v, r_r) = version
        else:
            # FIXME: this branch is not only reached for None; it is not
            # clear which other types show up here.
            r_e = r_v = r_r = None

        deptup = (name, urpmmisc.share_data(flags),
                  (urpmmisc.share_data(r_e), urpmmisc.share_data(r_v),
                   urpmmisc.share_data(r_r)))
        return urpmmisc.share_data(deptup)

    def _getProvides(self, req, flags, ver):
        """Search the rpmdb for packages providing the given requirement.

        Returns a dict keyed by the providing package headers (possibly
        empty). Results are memoized in ``self._get_pro_cache``, keyed by the
        normalized dependency tuple.
        """
        deptup = self._genDeptup(req, flags, ver)
        # Consult the cache first so that no rpmdb query is set up for a
        # requirement that has already been resolved.
        if deptup in self._get_pro_cache:
            return self._get_pro_cache[deptup]

        (r_e, r_v, r_r) = deptup[2]
        # startswith() is safe on an empty requirement string
        is_file_dep = req.startswith('/') and r_v is None
        is_unversioned = r_e is None and r_v is None and r_r is None

        ts = rpm.TransactionSet()
        result = {}
        for po in ts.dbMatch('provides', req):
            for prov_idx, prov in enumerate(po['provides']):
                if prov != req:
                    continue
                if is_file_dep or is_unversioned:
                    result[po] = [(req, None, (None, None, None))]
                    continue
                prov_flags = po['provideflags'][prov_idx]
                # NOTE(review): the comparison uses the package's own
                # epoch/version/release, not po['provideversion'] - confirm
                # that this is intended.
                provtup = (req, prov_flags,
                           (po['epoch'], po['version'], po['release']))
                if miscutils.rangeCompare(deptup, provtup):
                    result[po] = [(req, None, (None, None, None))]
                else:
                    # diagnostic output kept from the original implementation
                    print("NOT MATCHED " + str(deptup) + " VS " + str(provtup))

        # Check if we have dependency on file not listed
        # directly in PROVIDES
        if not result and is_file_dep:
            for po in ts.dbMatch('filepaths', req):
                result[po] = [(req, None, (None, None, None))]

        self._get_pro_cache[deptup] = result
        return result

    def _find_missing_deps(self, pkgs):
        """Find missing dependencies of the installed packages in *pkgs*.

        Returns ``[problems, missing_suggests]``; both are lists of
        ``(package, message)`` tuples. Requirements starting with 'rpmlib'
        are ignored.
        """
        # To speed depsolving, don't recheck deps that have already been
        # resolved successfully.
        providers = {}
        problems = []
        missing_suggests = []
        missingok = 1 << 19  # RPMSENSE_MISSINGOK

        for po in pkgs:
            for req_idx, req in enumerate(po['requires']):
                ver = po['requireversion'][req_idx]
                flags = po['requireflags'][req_idx]
                if req.startswith('rpmlib'):
                    continue  # ignore rpmlib deps

                key = (req, flags, ver)
                if key in providers:
                    resolve_sack = providers[key]
                else:
                    resolve_sack = self._getProvides(req, flags, ver)

                if len(resolve_sack) >= 1:
                    # Store the resolve_sack so that we can re-use it if
                    # another package has the same requirement
                    providers[key] = resolve_sack
                    continue

                missing = miscutils.formatRequire(req, ver, flags)
                if req in po['suggests'] or flags & missingok:
                    missing_suggests.append((po, "suggests %s" % missing))
                else:
                    problems.append((po, "requires %s" % missing))

        return [problems, missing_suggests]

    def _find_installed_duplicates(self, ignore_kernel=True):
        """Find installed duplicate packages.

        Returns a dict of pkgname = [[dupe1, dupe2], [dupe3, dupe4]].
        Packages whose name starts with 'gpg-pubkey' are always skipped;
        kernel packages are skipped when *ignore_kernel* is true.
        """
        multipkgs = {}
        singlepkgs = {}
        results = {}
        ts = rpm.TransactionSet()

        for pkg in ts.dbMatch():
            name = pkg['name']
            # just skip kernels and everyone is happier
            if ignore_kernel:
                # Provide names are read through the 'provides' tag, the
                # same way _getProvides reads them.
                if 'kernel' in pkg['provides']:
                    continue
                if name.startswith('kernel'):
                    continue
            # public keys from different repos may have different versions
            if name.startswith('gpg-pubkey'):
                continue
            if name in multipkgs or name in singlepkgs:
                continue

            for po in ts.dbMatch('name', name):
                multi = multipkgs.setdefault(name, [])
                single = singlepkgs.setdefault(name, [])
                if arch.isMultiLibArch(arch=po['arch']):
                    multi.append(po)
                elif po['arch'] == 'noarch':
                    # noarch packages count for both groups
                    multi.append(po)
                    single.append(po)
                else:
                    single.append(po)

        for (name, pkglist) in list(multipkgs.items()) + list(singlepkgs.items()):
            if len(pkglist) <= 1:
                continue
            known = results.setdefault(name, [])
            if pkglist not in known:
                known.append(pkglist)

        return results

    def _remove_old_dupes(self):
        """Remove every duplicate package except the last one after sorting."""
        dupedict = self._find_installed_duplicates()
        removedupes = []
        for dupelists in dupedict.values():
            for dupelist in dupelists:
                dupelist.sort()
                removedupes.extend(dupelist[0:-1])

        # No smart behavior yet, simply call urpme for every package
        for po in removedupes:
            self._removePkg(po)

    def _should_show_leaf(self, po, leaf_regex, exclude_devel, exclude_bin):
        """Decide whether the package *po* should be listed as a leaf.

        Returns True if the package should be shown, False if not:
        'gpg-pubkey' is never shown; with *exclude_devel*, names ending in
        'devel' are hidden; with *exclude_bin*, packages owning a path that
        contains 'bin' are hidden; otherwise the name has to match
        *leaf_regex* (a compiled pattern).
        """
        name = po['name']
        if name == 'gpg-pubkey':
            return False
        if exclude_devel and name.endswith('devel'):
            return False
        if exclude_bin and any('bin' in path for path in po['filepaths']):
            return False
        return bool(leaf_regex.match(name))

    def _get_kernels(self):
        """Return all packages providing 'kernel', sorted newest to oldest."""
        ts = rpm.TransactionSet()
        kernlist = list(ts.dbMatch('provides', 'kernel'))
        kernlist.sort()
        kernlist.reverse()
        return kernlist

    def _get_old_kernel_devel(self, kernels, removelist):
        """List the kernel-devel packages that have no kernel left to serve.

        A package providing 'kernel-devel' is kept only when some entry of
        *kernels* that is not in *removelist* has the same arch, epoch,
        version and release; all other such packages are returned.
        """
        def identity(hdr):
            return (hdr['arch'], hdr['epoch'], hdr['version'], hdr['release'])

        devellist = []
        ts = rpm.TransactionSet()
        for po in ts.dbMatch('provides', 'kernel-devel'):
            # For all kernel-devel packages see if there is a matching
            # kernel in kernels but not in removelist
            wanted = identity(po)
            keep = any(identity(kernel) == wanted
                       for kernel in kernels
                       if kernel not in removelist)
            if not keep:
                devellist.append(po)
        return devellist

    @staticmethod
    def _split_kernel_release(release, machine):
        """Split a kernel release string into ``(version, release)``.

        The version is the text before the first '-', the release is
        everything after it, so releases containing further '-' characters
        (custom builds) stay intact. A trailing '.<machine>' component is
        dropped from the release. Without any '-' the whole string is the
        version and the release is empty.
        """
        if '-' not in release:
            # Vanilla kernels dont have a release, only a version
            return (release, "")
        (kver, krel) = release.split('-', 1)
        parts = krel.split('.')
        if parts[-1] == machine:
            krel = ".".join(parts[:-1])
        return (kver, krel)

    def _remove_old_kernels(self, count, keepdevel):
        """Remove old kernels, keeping at most *count* of them.

        The running kernel (as reported by ``os.uname()``) is never removed.
        Unless *keepdevel* is true, kernel-devel packages without a
        remaining matching kernel are removed as well.
        """
        count = int(count)
        kernels = self._get_kernels()
        uname = os.uname()
        (kver, krel) = self._split_kernel_release(uname[2], uname[-1])

        toremove = []
        # Remove running kernel from remove list
        for kernel in kernels[count:]:
            if kernel['version'] == kver and krel.startswith(kernel['release']):
                print(_("Not removing kernel %(kver)s-%(krel)s because it is the running kernel") % {'kver': kver, 'krel': krel})
            else:
                toremove.append(kernel)

        # Now extend the list with all kernel-devel packages that either
        # have no matching kernel installed or belong to a kernel that is to
        # be removed
        if not keepdevel:
            toremove.extend(self._get_old_kernel_devel(kernels, toremove))

        for po in toremove:
            self._removePkg(po)

    @staticmethod
    def _orphans_command(opts):
        """Return the urpmq argument vector for the --orphans action.

        Each option is a separate argument and its media names are joined
        with commas into one value.
        """
        command = ["urpmq", "--not-available"]
        if opts.excludemedia:
            command += ["--excludemedia", ",".join(opts.excludemedia)]
        if opts.media:
            command += ["--media", ",".join(opts.media)]
        if opts.update:
            command.append("--update")
        return command

    def main(self):
        """Parse ``sys.argv`` and run the single selected action.

        Exactly one of --problems, --dupes, --leaves, --oldkernels,
        --orphans and --cleandupes has to be given; otherwise the help text
        is printed and the process exits with status 1.
        """
        opts = self.ArgParser.parse_args(sys.argv[1:])
        modes = [opts.problems, opts.dupes, opts.leaves, opts.kernels,
                 opts.orphans, opts.cleandupes]
        if not exactlyOne(modes):
            print(self.ArgParser.format_help())
            sys.exit(1)

        # extra command-line flags handed to urpme by _removePkg
        self.tsflags = []

        if opts.problems:
            ts = rpm.TransactionSet()
            self._get_pro_cache = {}
            (issues, missing_suggests) = self._find_missing_deps(ts.dbMatch())
            for (pkg, prob) in issues:
                print(_('Package %(qf)s %(prob)s') % {'qf': pkg.sprintf(opts.qf), 'prob': prob})
            if opts.suggests:
                print(_("Missing suggests:"))
                for (pkg, prob) in missing_suggests:
                    print('Package %s %s' % (pkg.sprintf(opts.qf), prob))
            # exit status: 2 = broken requires, 3 = only missing suggests
            if issues:
                sys.exit(2)
            if opts.suggests and len(missing_suggests) != 0:
                sys.exit(3)
            print(_('No Problems Found'))
            sys.exit(0)

        if opts.dupes:
            dupes = self._find_installed_duplicates()
            for pkglists in dupes.values():
                for pkglist in pkglists:
                    for pkg in pkglist:
                        print('%s' % pkg.sprintf(opts.qf))
            sys.exit(0)

        if opts.kernels:
            if os.geteuid() != 0:
                print(_("Error: Cannot remove kernels as a user, must be root"))
                sys.exit(1)
            if int(opts.kernelcount) < 1:
                print(_("Error: should keep at least 1 kernel!"))
                sys.exit(100)
            if opts.auto:
                self.tsflags.append('--auto')
            # NOTE: the yum transaction handling of the original
            # package-cleanup has not been ported; packages are erased one
            # by one through urpme.
            self._remove_old_kernels(opts.kernelcount, opts.keepdevel)
            sys.exit(0)

        if opts.leaves:
            self._ts = transaction.TransactionWrapper()
            leaves = self._ts.returnLeafNodes()
            leaf_reg = re.compile(opts.leaf_regex, re.IGNORECASE)
            for po in sorted(leaves):
                if opts.all_nodes or self._should_show_leaf(
                        po, leaf_reg, opts.exclude_devel, opts.exclude_bin):
                    print(po.sprintf(opts.qf))
            sys.exit(0)

        if opts.orphans:
            # Just a wrapper that invokes urpmq
            subprocess.call(self._orphans_command(opts))
            sys.exit(0)

        if opts.cleandupes:
            if os.geteuid() != 0:
                print(_("Error: Cannot remove packages as a user, must be root"))
                sys.exit(1)
            if opts.noscripts:
                self.tsflags.append('--noscripts')
            if opts.auto:
                self.tsflags.append('--auto')
            # NOTE: as for old kernels, no transaction is built; every
            # duplicate is erased through urpme.
            self._remove_old_dupes()
if __name__ == '__main__':
    # setup_locale()
    # Constructing PackageCleanup registers the command-line options and
    # runs the requested action from within __init__.
    util = PackageCleanup()