import rpm import re from urllib2 import urlopen, HTTPError, URLError import subprocess import platform import sys import os ARCH = platform.machine() def get_command_output(command, fatal_fails=True): '''Execute command using subprocess.Popen and return its stdout output string. If return code is not 0, print error message and exit''' #vprint("Executing command: " + str(command)) res = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = list(res.communicate()) if sys.stdout.encoding: output[0] = output[0].decode(sys.stdout.encoding).encode("UTF-8") output[1] = output[1].decode(sys.stdout.encoding).encode("UTF-8") if(res.returncode != 0 and fatal_fails): print(_("Error while calling command") + " '" + " ".join(command) + "'") if(output[1] != None or output[0] != None): print(_("Error message: \n")+ ((output[0].strip() + "\n") if output[0]!=None else "") + (output[1].strip() if output[1]!=None else "") ) exit(1) return [output[0], output[1], res.returncode] class MediaSet(object): @staticmethod def from_list(media): ms = MediaSet() ms.urls = [] ms.media = {} ms.by_url = {} for medium in media: med, url = medium ms.media[med] = url ms.by_url[url] = med ms.urls.append(url) return ms @staticmethod def from_system(cmd): ms = MediaSet() ms.urls = [] ms.media = {} ms.by_url = {} lines = get_command_output(cmd + ["--list-url", "--list-media", 'active'])[0].strip().split("\n") for line in lines: parts = line.split(" ") medium = ' '.join(parts[:-1]) url = parts[-1] if(url.endswith("/")): url = url[:-1] if(url.find('/') != -1): ms.media[medium] = url ms.by_url[parts[-1]] = medium ms.urls.append(url) return ms class NEVR(object): EQUAL = rpm.RPMSENSE_EQUAL #8 GREATER = rpm.RPMSENSE_GREATER #4 LESS = rpm.RPMSENSE_LESS #2 #re_ver = re.compile('^([\d\.]+:)?([\w\d\.\-\[\]]+)(:[\d\.]+)?$') re_dep_ver = re.compile('^([^ \[\]]+)\[([\>\<\=\!]*) ([^ ]+)\]$') re_dep = re.compile('^([^ \[\]]+)$') types = {None: 0, '==' : EQUAL, '' : EQUAL, '=' : EQUAL, '>=' : EQUAL|GREATER, '<=' : EQUAL|LESS, '>' : GREATER, '<' : LESS, '!=' : LESS|GREATER, '<>' : LESS|GREATER} __slots__=['N', 'EVR', 'DE', 'DT', 'FL', 'E', 'VR'] def __init__(self, N, EVR, DE=None, DT=None, FL=None, E=None): self.N = N self.EVR = EVR self.DE = DE self.DT = DT self.FL = FL self.E = E self.VR = EVR if E: if EVR.startswith(E + ':'): self.VR = EVR[len(E)+1:] else: self.EVR = E + ':' + self.EVR #try to get E if not self.E and self.EVR and self.EVR.find(':') != -1: items = self.EVR.split(':') if items[0].find('.') == -1 and items[0].find('-') == -1: self.E = items[0] if not self.E and self.EVR: self.E = '0' self.EVR = '0:' + self.EVR if self.DE == 'None': self.DE = None def __str__(self): if self.FL: for t in NEVR.types: if not t: continue if NEVR.types[t] == self.FL: return "%s %s %s" % (self.N, t, self.EVR) if self.EVR: return "%s == %s" % (self.N, self.EVR) return "%s" % (self.N) def __repr__(self): return self.__str__() def __eq__(self, val): if not isinstance(val, NEVR): raise Exception("Internal error: comparing between NEVR and " + str(type(val))) return str(self) == str(val) def __ne__(self, val): return not (self == val) @staticmethod def from_depstring(s, DE_toremove=None): s = s.replace('[*]', '') # We may have buggy strings ending with "[== " s = re.sub(r"\[==\s*$", "", s) if DE_toremove: res = NEVR.re_dep_ver.match(s) if res: (name, t, val) = res.groups() if val.endswith(':' + DE_toremove): val = val[:-(len(DE_toremove) + 1)] EVR = '%s[%s %s]' % (name, t, val) res = NEVR.re_dep.match(s) if res: return NEVR(res.group(1), None) res = NEVR.re_dep_ver.match(s) if not res: raise Exception('Incorrect requirement string: ' + s) (name, t, val) = res.groups() return NEVR(name, val, FL=NEVR.types[t]) re_version = re.compile("(\.)?((alpha)|(cvs)|(svn)|(r))?\d+((mdv)|(mdk)|(mnb))") @staticmethod def from_filename(rpmname, E=None): ''' Returns [name, version] for given rpm file or package name ''' suffix = ['.x86_64', '.noarch', '.i386', '.i486', '.i586', '.i686'] for s in suffix: if(rpmname.endswith(s)): rpmname = rpmname[:-len(s)] sections = rpmname.split("-") if(NEVR.re_version.search(sections[-1]) == None): name = sections[:-3] version = sections[-3:-1] else: name = sections[:-2] version = sections[-2:] return NEVR("-".join(name), "-".join(version), FL=NEVR.EQUAL, E=E) def satisfies(self, val): if self.N != val.N: return False if self.EVR == None or val.EVR == None: return True (pname, pt, pval) = (self.N, self.FL, self.EVR) (rname, rt, rval) = (val.N, val.FL, val.EVR) def cut_part(seperator, val1, val2): if val1 and val2 and val1.count(seperator) != val2.count(seperator): n = max(val1.count(seperator), val2.count(seperator)) val1 = seperator.join(val1.split(seperator)[:n]) val2 = seperator.join(val2.split(seperator)[:n]) return (val1, val2) (rval, pval) = cut_part(':', rval, pval) (rval, pval) = cut_part('-', rval, pval) res = rpm.evrCompare(rval, pval) if res == 1: # > if pt & NEVR.GREATER: return True elif pt & NEVR.LESS: if rt & NEVR.LESS: return True else: return False else: if rt & NEVR.LESS: return True else: return False elif res == 0: if rt & NEVR.EQUAL and pt & NEVR.EQUAL: return True if rt & NEVR.LESS and pt & NEVR.LESS: return True if rt & NEVR.GREATER and pt & NEVR.GREATER: return True return False else: # < if rt & NEVR.GREATER: return True elif rt & NEVR.LESS: if pt & NEVR.LESS: return True else: return False else: if pt & NEVR.LESS: return True else: return False class PackageSet: tags = ['provides','requires','obsoletes','suggests', 'conflicts'] alltags = tags + ['nevr', 'arch'] def __init__(self): self.what = {} self.packages = {} def load_from_system(self): print(_("Loading the list of installed packages...")) ts = rpm.TransactionSet() mi = ts.dbMatch() for tag in PackageSet.tags: self.what[tag] = {} for h in mi: name = h['name'] if(name == 'gpg-pubkey'): continue if(name not in self.packages): self.packages[h['name']] = {} else: print(_("Duplicating ") + name + '-' + h['version'] + '-' + h['release']) print(_("Already found: ") + name + '-' + self.packages[name]["nevr"].EVR) E = str(h['epoch']) V = h['version'] R = h['release'] DE = h['distepoch'] DT = h['disttag'] if E == None or E == 'None': E = '0' EVR = "%s:%s-%s" % (E, V, R) nevr = NEVR(name, EVR, FL=NEVR.EQUAL, DE=DE, DT=DT, E=E) self.packages[name]['nevr'] = nevr self.packages[name]['arch'] = h['arch'] for tag in PackageSet.tags: if tag not in self.packages[name]: self.packages[name][tag] = [] dss = h.dsFromHeader(tag[:-1] + 'name') for s in dss: fl = s.Flags() #undocumented flag for special dependencies if fl & 16777216: continue fl = fl % 16 _evr = s.EVR() if _evr == '': evr = NEVR(s.N(), None, FL=fl) else: evr = NEVR(s.N(), _evr, FL=fl) self.packages[name][tag].append(evr) if evr.N not in self.what[tag]: self.what[tag][evr.N] = [] self.what[tag][evr.N].append((name, evr)) def load_from_repository(self, ms): url_by_synthesis_url = {} global fields def get_synthesis_by_url(url): if url.startswith('file://'): url = url[6:] if url.startswith('/'): medium = ms.by_url[url] return '/var/lib/urpmi/%s/synthesis.hdlist.cz' % medium else: return url + "/media_info/synthesis.hdlist.cz" medium_by_synth = {} synthesis_lists = [] for url in ms.urls: synth = get_synthesis_by_url(url) synthesis_lists.append(synth) url_by_synthesis_url[synth] = url medium_by_synth[synth] = ms.by_url[url] def clear_data(): '''Clears the data of the current package from 'fields' dictionary''' global fields fields = {"provides":[], "requires":[], "obsoletes":[], "suggests":[], "conflicts":[], "info":[], "summary":[]} arches32 = ['i386', 'i486', 'i586', 'i686'] for tag in PackageSet.tags: self.what[tag] = {} #the following code is awful, I know. But it's easy-to-understand and clear. # don't like it - write better and send me :) for synthesis_list in synthesis_lists: try: #print synthesis_list print(_("Processing medium ") + medium_by_synth[synthesis_list] + "...") if(synthesis_list.startswith("http://") or synthesis_list.startswith("ftp://")): r = urlopen(synthesis_list) s = r.read() r.close() elif(synthesis_list.startswith("rsync://")): tmppath = '/tmp/urpm-reposync.synthesis_lists' if (not os.path.exists(tmppath)): os.mkdir(tmppath) filename = tmppath + '/' + os.path.basename(synthesis_list) os.system("rsync --copy-links %s %s 1>/dev/null 2>&1" % (synthesis_list, filename)) r = open(filename) s = r.read() r.close() shutil.rmtree(tmppath) elif(synthesis_list.startswith("/")): #local file if not os.path.exists(synthesis_list): print(_('Could not read synthesis file. (File %s not found)') % synthesis_list) continue r = open(synthesis_list) s = r.read() r.close() res = subprocess.Popen(['gzip', '-d'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = res.communicate(s) clear_data() for line in output[0].split('\n'): if(line == ''): # there can be empty lines continue items = line.split("@") data = [x.strip() for x in items[2:]] fields[items[1]] = data if(items[1] == "info"): rpmname = items[2] size = int(items[4]) nevr = NEVR.from_filename(items[2], E=items[3]) nevr.E = items[3] disttagepoch = '-' if(len(items)>6): disttagepoch = items[6] nevr.DT = items[6] if(len(items)>7): disttagepoch += items[7] nevr.DE = items[7] arch = items[2].split('.')[-1] if arch in arches32 and ARCH in arches32: arch = ARCH in_repo = nevr.N in self.packages new_arch_correct = (arch == ARCH or arch == 'noarch') if in_repo: if nevr.DE == self.packages[nevr.N]['nevr'].DE: ver_newer = rpm.evrCompare(nevr.EVR, self.packages[nevr.N]['nevr'].EVR) == 1 else: ver_newer = (nevr.DE > self.packages[nevr.N]['nevr'].DE) old_arch_correct = (self.packages[nevr.N]['arch'] == ARCH or self.packages[nevr.N]['arch'] == 'noarch') else: ver_newer = None old_arch_correct = None toinst = not in_repo or (not old_arch_correct and new_arch_correct) or \ (ver_newer and old_arch_correct == new_arch_correct) if toinst: #remove old data if nevr.N in self.packages: for tag in PackageSet.tags: for dep in self.packages[nevr.N][tag]: self.what[tag][dep.N].remove((nevr.N, dep)) else: self.packages[nevr.N] = {} self.packages[nevr.N]['nevr'] = nevr self.packages[nevr.N]["arch"] = arch self.packages[nevr.N]["synthesis_list"] = synthesis_list self.packages[nevr.N]["filename"] = rpmname self.packages[nevr.N]["size"] = size for tag in PackageSet.tags: self.packages[nevr.N][tag] = [] for item in fields[tag]: if item == '': continue dep = NEVR.from_depstring(item, DE_toremove=nevr.DE) self.packages[nevr.N][tag].append(dep) if dep.N not in self.what[tag]: self.what[tag][dep.N] = [] self.what[tag][dep.N].append((nevr.N, dep)) self.packages[nevr.N]['medium'] = medium_by_synth[synthesis_list] clear_data() except (HTTPError,URLError): print(_("File can not be processed! Url: ") + synthesis_list) def whattag(self, tag, val): if val.N not in self.what[tag]: return [] found = [] for (pkg, dep) in self.what[tag][val.N]: if dep.satisfies(val): found.append(pkg) return found def whattag_revert(self, tag, val): if val.N not in self.what[tag]: return [] found = [] for (pkg, dep) in self.what[tag][val.N]: if val.satisfies(dep): found.append(pkg) return found def whatprovides(self, val): return self.whattag('provides', val) def whatobsoletes(self, val): return self.whattag_revert('obsoletes', val) def whatrequires(self, val): return self.whattag_revert('requires', val) def whatconflicts(self, val): return self.whattag_revert('conflicts', val) def whatrequires_pkg(self, pkg): found = [] for req in self.packages[pkg]['provides']: found += [(d, req) for d in self.whatrequires(req)] return found