#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Extract package metadata from RPM repositories into an SQLite database.

The script reads an XML configuration file listing repository directories,
walks every directory for ``*.rpm`` files and stores, in ``repo.db``:

* the RPM header tags of every package (table ``packages``);
* requires / provides / conflicts / obsoletes entries (``rpm_*`` tables);
* the file list of every package (``package_files``);
* optionally, for files that look like ELF objects, the ``NEEDED`` entries
  reported by ``objdump -p`` and the dynamic symbols reported by ``nm -D``.

Packages are processed by ``NUM_PROCESSES`` worker processes.

NOTE: the source is syntactically valid for Python 2 and 3, but the runtime
semantics (``str.decode``, ``xrange``, byte-string subprocess output) are
those of Python 2.
"""

import os
import sys
import gettext
import argparse
import sqlite3
import rpm
import re
import xml.etree.ElementTree as ET
import subprocess
import shutil
import time
import multiprocessing as mp
import gc
import numbers

try:
    from shlex import quote as _shell_quote  # Python 3.3+
except ImportError:
    from pipes import quote as _shell_quote  # Python 2

gettext.install('urpm-tools')

DB = 'repo.db'

NUM_PROCESSES = 4  # number of CPUs (re-evaluated automatically in main())

RPMFILEMODE_TYPE_MASK = 0xF000   # file-type bits of st_mode (S_IFMT)
RPMFILEMODE_DIRECTORY = 0x4000   # S_IFDIR
RPMFILEMODE_EXECUTE = 0o111      # any of the three execute permission bits

# Files with these extensions are never inspected as binary objects.
_NON_OBJECT_EXTENSIONS = ['.debug', '.xz', '.conf', '.py', '.c', '.h',
                          '.hpp', '.png', '.cc', '.cpp', '.sh', '.java',
                          '.pl', '.patch', '.desktop']


def getFileList(path, ext, filelist):
    """Recursively collect files under *path* whose name ends with *ext*.

    *ext* is compared against the lower-cased tail of each file name, so it
    must be given in lower case.  Matching paths are normalized and appended
    to *filelist*, which is also returned.
    """
    extlen = len(ext)
    for entry in os.listdir(path):
        entry_path = path + '/' + entry
        if os.path.isdir(entry_path):
            filelist = getFileList(entry_path, ext, filelist)
        elif entry[-extlen:].lower() == ext:
            filelist.append(os.path.normpath(entry_path))
    return filelist


def parseargs():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser(description=_('extract packages metadata'
                                                   ' from RPM repositories'))
    parser.add_argument('config', metavar='config',
                        help=_('path to repo-analyze-config.xml'))
    parser.add_argument('-O', '--no-shared-objects', action='store_true',
                        help=_('don\'t process shared objects'))
    parser.add_argument('-S', '--no-so-symbols', action='store_true',
                        help=_('don\'t process shared object symbols'))
    opts = parser.parse_args()
    return opts


def to_string(rpm, tag, val):
    """Convert a header value to a unicode string.

    Returns None for an empty list.  Any other value is passed through
    ``str()`` and decoded as UTF-8; if that fails, a diagnostic naming the
    package (*rpm*) and *tag* is written to stderr and the value is decoded
    again with undecodable bytes replaced.
    """
    if isinstance(val, list) and not val:
        return None
    try:
        return str(val).decode('utf-8')
    except UnicodeError:
        message = 'Invalid UTF-8 string!\n(%s:\n%s = "%s")\n' % \
                  (rpm, tag, val)
        sys.stderr.write(message + '\n')
        return str(val).decode('utf-8', 'replace')


def init_database(conn):
    """Create the fixed-schema tables and relax SQLite durability settings.

    The ``packages`` table and the ``rpm_*`` dependency tables are not created
    here; process_repodir() creates them once the set of RPM tags is known.
    """
    conn.execute("""
    CREATE TABLE repodirs(id INTEGER PRIMARY KEY NOT NULL,
                          name TEXT UNIQUE, path TEXT, arch TEXT,
                          sources TEXT)""")
    conn.execute("""
    CREATE TABLE repodir_depends(id INTEGER PRIMARY KEY NOT NULL,
                                 repodir_id INTEGER,
                                 depend_repodir_name TEXT)""")
    conn.execute("""
    CREATE TABLE IF NOT EXISTS package_files(id INTEGER PRIMARY KEY NOT NULL,
                                             package_id INTEGER NOT NULL,
                                             basename TEXT, path TEXT,
                                             size INTEGER, mode INTEGER,
                                             link_to_file_id INTEGER,
                                             link_to_path TEXT, mark TEXT)""")
    conn.execute("""
    CREATE TABLE package_depend_res(id INTEGER PRIMARY KEY NOT NULL,
                                    package_id INTEGER, requires_id INTEGER,
                                    provides_id INTEGER,
                                    dep_package_id INTEGER)""")
    conn.execute("""
    CREATE TABLE so_needed(id INTEGER PRIMARY KEY NOT NULL,
                           obj_file_id INTEGER, name TEXT)""")
    conn.execute("""
    CREATE TABLE so_needed_res(id INTEGER PRIMARY KEY NOT NULL,
                               so_needed_id INTEGER,
                               dep_obj_file_id INTEGER, res_type INTEGER)""")
    conn.execute("""
    CREATE TABLE obj_symbols(id INTEGER PRIMARY KEY NOT NULL,
                             obj_file_id INTEGER, name TEXT,
                             sym_type INTEGER)""")
    conn.execute("""
    CREATE TABLE obj_symbols_res(id INTEGER PRIMARY KEY NOT NULL,
                                 obj_sym_id INTEGER, dep_obj_sym_id INTEGER,
                                 res_type INTEGER)""")
    # The database is rebuilt from scratch on every run, so speed is
    # preferred over crash safety.
    conn.execute("""PRAGMA synchronous = OFF""")
    conn.execute("""PRAGMA journal_mode = OFF""")


def index_database(conn):
    """Create the lookup indexes once all data has been inserted."""
    print('Indexing the database...')
    conn.executescript("""
    CREATE INDEX rd_name ON repodirs(name);
    CREATE INDEX pkg_name ON packages(name);
    CREATE INDEX pkg_nvra ON packages(nvra);
    CREATE INDEX pkg_arch ON packages(arch);
    CREATE INDEX pkg_group ON packages(rpm_group);
    CREATE INDEX pkg_repodir ON packages(repodir_id);
    CREATE INDEX pkg_dr_pkg_req ON package_depend_res(package_id, requires_id);
    CREATE INDEX pkg_dr_pkg_prov ON package_depend_res(dep_package_id, provides_id);
    CREATE INDEX pkg_file_pkg_id ON package_files(package_id);
    CREATE INDEX pkg_file_name ON package_files(basename);
    CREATE INDEX pkg_file_path ON package_files(path);
    CREATE INDEX pkg_file_mark ON package_files(mark);
    CREATE INDEX so_needed_obj_id ON so_needed(obj_file_id);
    CREATE INDEX so_needed_res_sn ON so_needed_res(so_needed_id);
    CREATE INDEX symbols_obj_name_type ON obj_symbols(obj_file_id, name, sym_type);
    CREATE INDEX symbols_name_type ON obj_symbols(name, sym_type);
    CREATE INDEX symbols_res_sym ON obj_symbols_res(obj_sym_id);
    """)
    dep_tables = ['rpm_requires', 'rpm_provides',
                  'rpm_conflicts', 'rpm_obsoletes']
    for table in dep_tables:
        conn.execute('CREATE INDEX %(tbl)s_pkg ON %(tbl)s(package_id)' %
                     {'tbl': table})
        conn.execute('CREATE INDEX %(tbl)s_name ON %(tbl)s(name)' %
                     {'tbl': table})
    conn.commit()


def add_repodir(xrepodir, conn):
    """Register a repository ``<dir>`` element and its dependencies.

    Inserts the element's ``name``, ``path`` and ``sources`` attributes into
    ``repodirs`` and one ``repodir_depends`` row per ``<dependency>`` child.
    Returns the id of the new ``repodirs`` row.
    """
    dbc = conn.cursor()
    dbc.execute("""
    INSERT INTO repodirs (name, path, sources) VALUES (?, ?, ?)
    """, [xrepodir.get('name'), xrepodir.get('path'),
          xrepodir.get('sources')])
    repodir_id = dbc.lastrowid
    for depend in xrepodir.findall('dependency'):
        dbc.execute("""
        INSERT INTO repodir_depends(repodir_id, depend_repodir_name)
        VALUES (?, ?)
        """, [repodir_id, depend.text.strip()])
    conn.commit()
    return repodir_id


def get_build_archs(xrepodir, xrepodirs):
    """Return the architectures a source repository is built for.

    Every ``<dependency>`` of *xrepodir* containing the ``$arch`` placeholder
    is matched against the names of all ``<dir>`` elements in *xrepodirs*;
    the part of a matching name that stands in for ``$arch`` is collected
    (``SRPMS`` is ignored).  Returns ``[None]`` when nothing matches.
    """
    arch_sign = '$arch'
    build_archs = []
    for depend in xrepodir.findall('dependency'):
        depend_repo = depend.text.strip()
        spos = depend_repo.find(arch_sign)
        if spos < 0:
            continue
        drepo_prefix = depend_repo[:spos]
        drepo_postfix = depend_repo[spos + len(arch_sign):]
        for candidate in xrepodirs.findall('dir'):
            repo_name = candidate.get('name')
            if not (repo_name.startswith(drepo_prefix) and
                    repo_name.endswith(drepo_postfix)):
                continue
            repo_arch = repo_name[len(drepo_prefix):
                                  len(repo_name) - len(drepo_postfix)]
            if repo_arch == 'SRPMS':
                continue
            if repo_arch not in build_archs:
                build_archs.append(repo_arch)
    if build_archs:
        return build_archs
    return [None]


def get_rpm_header(rpm_ts, pkg):
    """Read and return the RPM header of the file *pkg*.

    Raises Exception with a descriptive message when the file cannot be
    opened or its header cannot be read.
    """
    hdr = None
    try:
        fdno = os.open(pkg, os.O_RDONLY)
    except OSError as exc:
        raise Exception('Unable to open file %s.\n%s' % (pkg, exc))
    try:
        hdr = rpm_ts.hdrFromFdno(fdno)
    except rpm.error as exc:
        raise Exception('Unable to read RPM header for %s\n%s.' % (pkg, exc))
    finally:
        os.close(fdno)
    return hdr


def generate_new_id(generator, gen_lock):
    """Atomically increment the shared counter and return the new value."""
    with gen_lock:
        last_id = generator.value + 1
        generator.value = last_id
    return last_id


# Positions inside a package_files record (see process_package_worker).
FILE_REC_ID_IDX = 0
FILE_REC_PATH_IDX = 3
FILE_REC_LINK_IDX = 6
FILE_REC_MARK_IDX = 7


def register_object(data, object_file_record, temp_dir, no_so_symbols):
    """Inspect one extracted file and record its dynamic-linking data.

    The file is looked up under *temp_dir*.  Its record is updated in place:
    the link target (for symlinks) and a mark, one of ``'link'``,
    ``'not-found'``, ``'invalid-format'``, ``'no-symbols'`` or ``'so'``.
    ``NEEDED`` entries from ``objdump -p`` are appended to
    ``data['so_needed']``; unless *no_so_symbols* is set, undefined
    (type 0) and defined (type 1) dynamic symbols from ``nm`` are appended to
    ``data['obj_symbols']``.  Returns the file id of the record.
    """
    so_needed = data['so_needed']
    obj_symbols = data['obj_symbols']
    obj_id = object_file_record[FILE_REC_ID_IDX]
    obj_file_path = object_file_record[FILE_REC_PATH_IDX]
    temp_obj_file = os.path.join(temp_dir, obj_file_path.lstrip('/'))

    target_file = None
    file_mark = None
    od_out = ''
    nmundef_out = ''
    nmdef_out = ''
    if os.path.islink(temp_obj_file):
        target_file = os.path.join(os.path.dirname(obj_file_path),
                                   os.readlink(temp_obj_file))
        file_mark = 'link'
    elif not os.path.exists(temp_obj_file):
        file_mark = 'not-found'
    else:
        p = subprocess.Popen(['objdump', '-p', temp_obj_file],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        od_out = p.communicate()[0]
        if p.returncode != 0:
            file_mark = 'invalid-format'
        elif not no_so_symbols:
            p = subprocess.Popen(['nm', '-p', '-D', '--undefined-only',
                                  temp_obj_file],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            nmundef_out = p.communicate()[0]
            if p.returncode != 0:
                file_mark = 'no-symbols'
            else:
                p = subprocess.Popen(['nm', '-p', '-D', '--defined-only',
                                      temp_obj_file],
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                nmdef_out = p.communicate()[0]
                if p.returncode != 0:
                    file_mark = 'no-symbols'
                else:
                    file_mark = 'so'
        # NOTE(review): with no_so_symbols set, a valid object keeps mark
        # None -- confirm this is what consumers of package_files.mark expect.

    object_file_record[FILE_REC_LINK_IDX] = target_file
    object_file_record[FILE_REC_MARK_IDX] = file_mark

    # objdump prints the section header in the current locale; both the
    # Russian and the English spelling are recognized.
    dynsection = False
    for odline in od_out.split('\n'):
        odls = odline.strip()
        if odls == '':
            dynsection = False
        elif odls == 'Динамический раздел:' or odls == 'Dynamic section:':
            dynsection = True
        elif dynsection:
            needrem = re.match(r'\s+NEEDED\s+(.*)', odline)
            if needrem:
                so_needed.append([obj_id, needrem.group(1)])

    for symline in nmundef_out.split('\n'):
        smre = re.match(r'^.([\S]*)\s+(\w)\s(.*)$', symline)
        if smre:
            # Weak undefined symbols need not be resolved; skip them.
            if smre.group(2) in ['v', 'w']:
                continue
            obj_symbols.append([obj_id, smre.group(3), 0])

    for symline in nmdef_out.split('\n'):
        smre = re.match(r'^.([\S]*)\s+(\w)\s(.*)$', symline)
        if smre:
            obj_symbols.append([obj_id, smre.group(3), 1])

    return obj_id


def extract_files(pkg, files_list, obj_so_files_idx, temp_dir):
    """Unpack selected files of the package *pkg* into *temp_dir*.

    *obj_so_files_idx* holds indexes into *files_list* of the records to
    extract.  Returns True on success; on failure the tool output is written
    to stderr and False is returned.
    """
    # The pipeline runs with cwd=temp_dir, so both paths must be absolute.
    local_pkg = os.path.abspath(pkg)
    filelist = os.path.abspath(os.path.join(temp_dir, 'files.lst'))
    with open(filelist, 'wb') as f:
        for i in obj_so_files_idx:
            entry = '.' + files_list[i][FILE_REC_PATH_IDX] + '\n'
            # Record paths are unicode; encode explicitly so that non-ASCII
            # names do not fail on the implicit ASCII conversion.
            if not isinstance(entry, bytes):
                entry = entry.encode('utf-8')
            f.write(entry)

    # Paths are quoted so that spaces or shell metacharacters in them cannot
    # break (or alter) the command.
    rpm_cpio_cmd = 'rpm2cpio ' + _shell_quote(local_pkg) + \
                   ' | cpio -ivdu -E ' + _shell_quote(filelist)
    p = subprocess.Popen(rpm_cpio_cmd,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         cwd=temp_dir,
                         shell=True)
    output = p.communicate()[0]
    if p.returncode != 0:
        message = 'Couldn\'t extract files from package %s.' \
                  '\n\t%s' % (pkg, output)
        sys.stderr.write(message + '\n')
        return False
    return True


def _header_column_value(db_struct, pkg, tag, hval):
    """Convert one header value to the form stored in the packages table."""
    if tag in db_struct['blob_tags']:
        # sqlite3.Binary() rejects None, which is what an absent tag yields.
        return sqlite3.Binary(hval) if hval is not None else None
    if isinstance(hval, (list, str)):
        return to_string(pkg, tag, hval)
    return hval


def _collect_package_files(hdr, pkg_id, files_list, generator, gen_lock,
                           no_shared_objects):
    """Append the file records of one package to *files_list*.

    Returns the indexes (into *files_list*) of the files that should be
    inspected as binary objects.
    """
    pkg_file_paths = hdr['RPMTAG_FILEPATHS']
    pkg_file_names = hdr['RPMTAG_BASENAMES']
    pkg_file_sizes = hdr['RPMTAG_FILESIZES']
    pkg_file_modes = hdr['RPMTAG_FILEMODES']

    # directory path -> True when the directory is only implied by a
    # contained file, False when the package lists it explicitly.
    files_dirs = {}
    obj_so_files_idx = []
    for i in xrange(0, len(pkg_file_paths)):
        file_name = pkg_file_names[i]
        file_path = pkg_file_paths[i]
        file_mode = pkg_file_modes[i]
        pkg_file_id = generate_new_id(generator, gen_lock)
        files_list.append([pkg_file_id,                # FILE_REC_ID_IDX = 0
                           pkg_id,
                           file_name.decode('utf-8'),
                           file_path.decode('utf-8'),  # FILE_REC_PATH_IDX = 3
                           pkg_file_sizes[i],
                           file_mode,
                           None,   # link_to_path, FILE_REC_LINK_IDX = 6
                           None])  # mark, FILE_REC_MARK_IDX = 7

        # Compare the whole file-type field: testing the 0x4000 bit alone
        # would also match block devices and sockets.
        if (file_mode & RPMFILEMODE_TYPE_MASK) == RPMFILEMODE_DIRECTORY:
            files_dirs[file_path] = False
            continue
        dir_name = os.path.dirname(file_path)
        if dir_name not in files_dirs:
            files_dirs[dir_name] = True

        if no_shared_objects:
            continue
        if os.path.splitext(file_name)[1] in _NON_OBJECT_EXTENSIONS:
            continue
        if file_path.startswith('/usr/lib/debug/.build-id') or \
                file_path.endswith('/ld.so.cache'):
            continue
        if re.search(r'\.so($|\.)', file_name) or \
                (file_mode & RPMFILEMODE_EXECUTE) != 0:
            obj_so_files_idx.append(len(files_list) - 1)

    for fdir in sorted(files_dirs.keys()):
        if files_dirs[fdir]:
            # Add parent directories as implicit files
            # TODO: recursive processing?
            pkg_file_id = generate_new_id(generator, gen_lock)
            files_list.append([pkg_file_id,            # FILE_REC_ID_IDX = 0
                               pkg_id,
                               # decoded like regular file records
                               os.path.basename(fdir).decode('utf-8'),
                               fdir.decode('utf-8'),   # FILE_REC_PATH_IDX = 3
                               0,
                               -1,     # special mode
                               None,   # link_to_path, FILE_REC_LINK_IDX = 6
                               None])  # mark, FILE_REC_MARK_IDX = 7
    return obj_so_files_idx


def _store_worker_data(db_struct, data):
    """Write everything a worker has collected to the database."""
    conn = sqlite3.connect(DB, timeout=30)
    try:
        conn.executemany("""
        INSERT INTO packages (%s) VALUES (%s)""" %
                         (db_struct['packages_field_names'],
                          db_struct['packages_values_template']),
                         data['packages'])
        for table in db_struct['dep_tables']:
            conn.executemany("""
            INSERT INTO %s (name, flags, version, package_id, build_arch)
            VALUES (?, ?, ?, ?, ?)""" % table, data[table])
        conn.executemany("""
        INSERT INTO package_files (id, package_id, basename, path, size,
                                   mode, link_to_path, mark)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", data['package_files'])
        conn.executemany("""
        INSERT INTO so_needed(obj_file_id, name) VALUES(?, ?)
        """, data['so_needed'])
        conn.executemany("""
        INSERT INTO obj_symbols(obj_file_id, name, sym_type) VALUES(?, ?, ?)
        """, data['obj_symbols'])
        conn.commit()
    finally:
        conn.close()


def process_package_worker(num, queue_in, generator, gen_lock, db_struct,
                           repodir_id, build_archs, temp_dir,
                           no_shared_objects, no_so_symbols):
    """Worker process: analyze packages taken from *queue_in*.

    Jobs are ``(rpm_path,)`` tuples; ``None`` tells the worker to stop.  All
    rows are accumulated in memory and written to the database in a single
    transaction once the stop marker has been received.  Ids are drawn from
    the shared counter *generator* guarded by *gen_lock*.  *num* is the
    worker's ordinal and is not used.
    """
    rpm_ts = rpm.TransactionSet()
    rpm_ts.setVSFlags(~(rpm.RPMVSF_NEEDPAYLOAD))
    data = {}
    data['packages'] = []
    for table in db_struct['dep_tables']:
        data[table] = []
    data['package_files'] = []
    data['so_needed'] = []
    data['obj_symbols'] = []

    while True:
        job = queue_in.get()
        if job is None:
            break
        (pkg, ) = job

        pkg_id = generate_new_id(generator, gen_lock)
        try:
            hdr = get_rpm_header(rpm_ts, pkg)
        except Exception as exc:
            # get_rpm_header() raises plain Exception.  Letting it escape
            # would kill this worker with its stop marker unconsumed, and
            # queue_in.join() in the parent would then block forever.
            sys.stderr.write('%s\n' % exc)
            queue_in.task_done()
            continue

        package_values = [pkg_id]
        for tag in db_struct['packages_tags']:
            package_values.append(
                _header_column_value(db_struct, pkg, tag, hdr[tag]))
        # Extra fields, in sorted name order:
        # repodir_id, rpm_filepath, sourcerpm_package
        package_values.append(repodir_id)
        package_values.append(pkg)
        package_values.append(None)
        data['packages'].append(package_values)

        for table in db_struct['dep_tables']:
            table_data = data[table]
            # 'rpm_requires' -> 'RPMTAG_REQUIRE'
            rpref = 'RPMTAG_' + table[4:-1].upper()
            dep_name = hdr[rpref + 'NAME']
            dep_flags = hdr[rpref + 'FLAGS']
            dep_version = hdr[rpref + 'VERSION']
            for i in xrange(0, len(dep_name)):
                for build_arch in build_archs:
                    # Names may be non-ASCII, e.g.
                    # fonts-ttf-decoratives-1.3-27-rosa.lts2012.0.noarch.rpm
                    # provides font(derdämonschriftkegel)
                    table_data.append([dep_name[i].decode('utf-8'),
                                       dep_flags[i],
                                       dep_version[i],
                                       pkg_id, build_arch])

        files_list = data['package_files']
        obj_so_files_idx = _collect_package_files(hdr, pkg_id, files_list,
                                                  generator, gen_lock,
                                                  no_shared_objects)

        if obj_so_files_idx:
            pkg_temp_dir = os.path.join(temp_dir, os.path.basename(pkg))
            os.makedirs(pkg_temp_dir)
            if extract_files(pkg, files_list, obj_so_files_idx,
                             pkg_temp_dir):
                for i in obj_so_files_idx:
                    register_object(data, files_list[i], pkg_temp_dir,
                                    no_so_symbols)
            shutil.rmtree(pkg_temp_dir, True)

        queue_in.task_done()

    try:
        _store_worker_data(db_struct, data)
    finally:
        # Acknowledge the stop marker even when the write fails, so that the
        # parent's queue_in.join() can return.
        queue_in.task_done()


# Last id handed out; carried over from one repository to the next.
generator_value = 0


def _sql_type_for(value):
    """Return the SQLite column type for a sample header value.

    Integers map to INTEGER; everything else (strings, lists, None) to TEXT.
    """
    if isinstance(value, numbers.Integral):
        return 'INTEGER'
    return 'TEXT'


def _define_db_struct(conn, db_struct, sample_pkg):
    """Derive the packages schema from the RPM tags and create the tables.

    Column types are taken from the header of *sample_pkg*.  The results
    (tag list, SQL fragments, table names) are stored in *db_struct* for use
    by the workers.
    """
    rpm_ts = rpm.TransactionSet()
    rpm_ts.setVSFlags(~(rpm.RPMVSF_NEEDPAYLOAD))
    hdr = get_rpm_header(rpm_ts, sample_pkg)

    packages_extra_fields = {'repodir_id': 'INTEGER',
                             'rpm_filepath': 'TEXT',
                             'sourcerpm_package': 'TEXT'}

    # Tags matching any of these patterns get no column in ``packages``.
    excluded_tags_res = [
        r'^RPMTAG_(BASENAMES|FILE[\w\d]+)',     # file tags
        r'^RPMTAG_DIR(INDEXES|NAMES)',          # directory tags
        r'^RPMTAG_CHANGELOG\w+',                # changelog tags
        # C - CONFLICTNAME, D - DISTEPOCH, E - EPOCH, N - NAME,
        # P - PROVIDENAME, R - RELEASE, V - VERSION
        '^RPMTAG_(C|D|E|N|P|R|V|HEADERIMMUTABLE)$',
        r'^RPMTAG_TRIGGER\w+',                  # trigger tags
        # dependency tags, stored in the rpm_* tables instead
        r'^RPMTAG_(CONFLICT|OBSOLETE|PROVIDE|REQUIRE)\w+',
    ]

    datetime_tags = ['RPMTAG_PACKAGETIME', 'RPMTAG_RPMLIBTIMESTAMP']
    db_struct['blob_tags'] = ['RPMTAG_DSAHEADER', 'RPMTAG_HEADERIMMUTABLE',
                              'RPMTAG_PKGID', 'RPMTAG_SIGMD5']
    reserved_field_names = ['id', 'group']
    db_struct['dep_tables'] = ['rpm_requires', 'rpm_provides',
                               'rpm_conflicts', 'rpm_obsoletes']

    packages_tags = []
    field_names = ['id']
    field_defs = []
    rpmtags = [str(t) for t in dir(rpm) if t.startswith('RPMTAG_')]
    for tag in rpmtags:
        if any(re.match(pattern, tag) for pattern in excluded_tags_res):
            continue
        if tag in datetime_tags:
            sqltype = 'TIMESTAMP'
        elif tag in db_struct['blob_tags']:
            sqltype = 'BLOB'
        else:
            sqltype = _sql_type_for(hdr[tag])
        fieldname = tag.replace('RPMTAG_', '').lower()
        if fieldname in reserved_field_names:
            fieldname = 'rpm_' + fieldname
        packages_tags.append(tag)
        field_names.append(fieldname)
        field_defs.append(fieldname + ' ' + sqltype)

    # Workers append the extra values in this same sorted order.
    for extra_field in sorted(packages_extra_fields.keys()):
        field_names.append(extra_field)
        field_defs.append(extra_field + ' ' +
                          packages_extra_fields[extra_field])

    conn.execute("""
    CREATE TABLE IF NOT EXISTS packages(id INTEGER PRIMARY KEY NOT NULL, %s)
    """ % ', '.join(field_defs))
    for table in db_struct['dep_tables']:
        conn.execute("""
        CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL,
                                       name TEXT, flags INTEGER,
                                       version TEXT, build_arch TEXT,
                                       package_id INTEGER NOT NULL)""" %
                     (table))
    conn.commit()

    db_struct['packages_tags'] = packages_tags
    db_struct['packages_field_names'] = ', '.join(field_names)
    db_struct['packages_values_template'] = \
        ', '.join(['?'] * len(field_names))
    db_struct['defined'] = True


def process_repodir(repodir_path, repodir_id, build_archs, conn, db_struct,
                    tempdir, no_shared_objects, no_so_symbols):
    """Analyze every RPM found under *repodir_path*.

    On the first repository that contains packages, the ``packages`` schema
    is derived and stored in *db_struct*.  The packages are then distributed
    among ``NUM_PROCESSES`` worker processes; the function returns once all
    of them have written their results.
    """
    global generator_value

    rpm_list = getFileList(repodir_path, '.rpm', [])
    if not rpm_list:
        return
    print('%s :  %s' % (repodir_path, len(rpm_list)))

    if not db_struct.get('defined'):
        _define_db_struct(conn, db_struct, rpm_list[0])

    queue_in = mp.JoinableQueue()
    for pkg in rpm_list:
        queue_in.put((pkg, ))
    # One stop marker per worker.
    for i in xrange(NUM_PROCESSES):
        queue_in.put(None)

    # run workers
    # Trying to prevent
    # "Exception AssertionError: AssertionError() in ignored"
    gc.collect()
    time.sleep(1)
    gc.disable()
    try:
        id_generator = mp.Value('i', generator_value)
        generator_lock = mp.Lock()
        workers = []
        for i in xrange(NUM_PROCESSES):
            worker = mp.Process(target=process_package_worker,
                                args=(i, queue_in, id_generator,
                                      generator_lock, db_struct,
                                      repodir_id, build_archs, tempdir,
                                      no_shared_objects, no_so_symbols))
            workers.append(worker)
            worker.start()
        queue_in.join()
        # Every stop marker has been acknowledged, so the workers are about
        # to exit; reap them.
        for worker in workers:
            worker.join()
    finally:
        gc.enable()
    generator_value = id_generator.value


def main(args):
    """Build ``repo.db`` from the repositories named in the config file.

    *args* is accepted for compatibility and not used; the command line is
    parsed by parseargs().
    """
    # Without this declaration the assignment below would only bind a local
    # name and process_repodir() would always use the default value.
    global NUM_PROCESSES

    # Validate the command line and the configuration before the previous
    # database is destroyed.
    options = parseargs()
    parser = ET.XMLParser()
    tree = ET.parse(options.config, parser=parser)
    config_root = tree.getroot()

    if os.path.exists(DB):
        os.unlink(DB)

    if hasattr(os, "sysconf"):
        if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
            nproc = os.sysconf("SC_NPROCESSORS_ONLN")
            if isinstance(nproc, int) and nproc > 0:
                NUM_PROCESSES = nproc

    conn = sqlite3.connect(DB)
    try:
        init_database(conn)
        conn.commit()

        tempdir = '/dev/shm/rt-tmp/'
        shutil.rmtree(tempdir, True)
        os.mkdir(tempdir)
        rpm_db_struct = {}
        xrepodirs = config_root.find('repositories')
        for xrepodir in xrepodirs.findall('dir'):
            repodir_id = add_repodir(xrepodir, conn)
            # Only source repositories (sources == '.') are tied to the
            # architectures of the repositories they depend on.
            if xrepodir.get('sources') != '.':
                build_archs = [None]
            else:
                build_archs = get_build_archs(xrepodir, xrepodirs)
            process_repodir(xrepodir.get('path'), repodir_id, build_archs,
                            conn, rpm_db_struct, tempdir,
                            options.no_shared_objects, options.no_so_symbols)
        shutil.rmtree(tempdir, True)
        index_database(conn)
    finally:
        conn.close()


if __name__ == "__main__":
    main(sys.argv)