#!/usr/bin/python
'''
" Repodiff utility for finding differences between different repositories
"
" The tool downloads, unpacks and parses synthesis.hdlist.cz and
" changelog.xml.lzma to generate lists of newly added packages,
" packages removed from the new repository and updated packages.
" The tool writes its output to standard output or to a file.
" It can show whether a removed package is obsoleted by some package
" in the new repositories. The tool can also output the data as an
" HTML table.
"
" REQUIREMENTS
" ============
" - urpmi
" - python-2.7
" - lzma
" - gzip
" - libxml2 python library
" - rpm python library
"
" Copyright (C) 2012 ROSA Laboratory.
" Written by Vladimir Testov <vladimir.testov@rosalab.ru>
"
" This program is free software: you can redistribute it and/or modify
" it under the terms of the GNU General Public License or the GNU Lesser
" General Public License as published by the Free Software Foundation,
" either version 2 of the Licenses, or (at your option) any later version.
"
" This program is distributed in the hope that it will be useful,
" but WITHOUT ANY WARRANTY; without even the implied warranty of
" MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
" GNU General Public License for more details.
"
" You should have received a copy of the GNU General Public License
" and the GNU Lesser General Public License along with this program.
" If not, see <http://www.gnu.org/licenses/>.
'''
import argparse
import urllib
import tempfile
import os
import subprocess
import re
import libxml2
import sys
from datetime import date
import rpm
import shutil
import urllib2
import urpmmisc
import gettext
gettext.install('urpm-tools')
old_dir = "old"
new_dir = "new"
htmlname = "repodiff.html"
synthtags = ["provides", "requires", "obsoletes", "conflicts", "suggests",
"summary", "info"]
minus_check = re.compile('-')
re_search_unver = re.compile("([^\[\]]+)[\[\]]")
re_search_verrel = re.compile("\[(== |> |< |>= |<= )([\{\}+=0-9a-zA-Z_\.]*:)?([[\{\}+=0-9a-zA-Z_\.]+)(-[[\{\}+=0-9a-zA-Z_\.]+)?([^\[\]]*)\]$")
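# The two patterns above handle dependency entries of the form
# "name[<sign> [epoch:]version[-release]]": re_search_unver captures the
# bare name in front of the optional bracketed constraint, and
# re_search_verrel extracts the comparison sign together with the optional
# epoch, the version and the optional release.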
synthesis_arch = "synthesis.hdlist.cz"
synthesis_arch_renamed = "synthesis.hdlist.gz"
synthesis_file = "synthesis.hdlist"
changelog_arch = "changelog.xml.lzma"
changelog_file = "changelog.xml"
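# default_output is a sentinel string checked in CheckOutput(); the real
# stream is opened (or sys.stdout is substituted) there.  timeout is the
# urllib2 timeout, in seconds, used when probing and downloading files.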
default_output = "sys.stdout"
timeout = 5
def ParseCommandLine():
"""Parse arguments.
Parse arguments from command line.
Return these arguments.
"""
parser = argparse.ArgumentParser(
description=_("Tool for comparing sets of repositories."))
parser.add_argument("--old", "-o", action="store", nargs='+', required=True,
metavar="OLD_REPO", help=_("URL or PATH to old repositories"))
parser.add_argument("--new", "-n", action="store", nargs='+', required=True,
metavar="NEW_REPO", help=_("URL or PATH to new repositories"))
parser.add_argument("--size", "-s", action="store_true",
help=_("Show differences in package sizes."))
parser.add_argument("--simple", action="store_false",
help=_("Simple output format."))
parser.add_argument("--quiet", "-q", action="store_false",
help=_("Hide service messages."))
parser.add_argument("--changelog", "-c", action="store_true",
help=_("Show changelog difference."))
parser.add_argument("--html", action="store_true",
help=_("Output in HTML format. If --output is not present, \
\"%s\" will be created in the current directory. \
--size, --simple and --changelog options are ignored.") % htmlname)
parser.add_argument("--output", "-out", action="store", nargs=1, default='',
metavar="OUTPUT_FILE", help=_("Redirect standard output to \"OUTPUT_FILE\"."))
return parser.parse_args()
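# Example invocation (the repository URLs below are only placeholders):
#   urpm-repodiff.py --old http://example.com/old/repo/ \
#                    --new http://example.com/new/repo/ \
#                    --changelog --output repodiff.txt
# Several repositories may be given to --old/--new; --html produces an
# HTML comparison table instead of the plain-text report.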
def exit_proc(arg):
"""
Close the output file, remove temporary files and exit.
"""
err_tmpdir = arg.temp_dir
err_output = arg.output
if err_output != None:
err_output.close()
if os.path.isdir(err_tmpdir):
shutil.rmtree(err_tmpdir)
exit(0)
def CheckURL(url, arg):
"""URL check.
Check that the URL is reachable.
"""
try:
urllib2.urlopen(url, None, timeout)
except:
print _("Error: URL to repository \"%s\" is incorrect") % url
exit_proc(arg)
def CheckArgs(urlpath, arg):
"""Trivial checks.
Check that url or path is correct.
"""
if (urlpath.startswith("http://") or urlpath.startswith("ftp://")):
if not urlpath.endswith('/'):
urlpath = urlpath + '/'
tmp_url = urlpath + "media_info/"
CheckURL(tmp_url, arg)
elif (os.path.isdir(urlpath)) or urlpath.startswith("file://"):
if urlpath.startswith("file://./"):
urlpath = urlpath[7:]
else:
urlpath = urlpath[6:]
if not urlpath.endswith('/'):
urlpath = urlpath + '/'
urlpath = urlpath + "media_info/"
if not os.path.isdir(urlpath):
print _("Error: directory %s does not exist") % urlpath
exit_proc(arg)
else:
(e1,e2,urltmp) = urpmmisc.GetUrlFromRepoName(urlpath)
if (urltmp):
if not urltmp.endswith('/'):
urltmp = urltmp + '/'
urlpath = urltmp + "media_info/"
CheckURL(urlpath, arg)
else:
print _("Error: \"%s\" is not a correct URL, path or name of a repository") % urlpath
exit_proc(arg)
return urlpath
def CheckOutput(arg):
"""Check output file.
Check if the file can be created and redirect standard output to this file.
"""
file_output = arg.output
ifhtml = arg.html
if (file_output == default_output):
if(ifhtml):
try:
arg.output = open(htmlname, "w")
except:
print _("Error: Cannot open %s for writing.") % htmlname
exit_proc(arg)
return
else:
arg.output = sys.stdout
return
if(file_output != ''):
if(os.path.isfile(file_output)):
print _("Error: File %s already exists") % file_output
arg.output = None
exit_proc(arg)
else:
dirname = os.path.dirname(file_output)
if(dirname == '') or (os.path.exists(dirname)):
try:
arg.output = open(file_output, "w")
except IOError:
print _("Error: File %s cannot be created") % file_output
arg.output = None
exit_proc(arg)
else:
print _("Error: Path %s does not exist.") % dirname
arg.output = None
exit_proc(arg)
def CheckParam(arg):
"""Check parameters.
Ignore some parameters in HTML-case.
"""
if arg.html:
arg.size = 0
arg.simple = 0
arg.changelog = 0
def GetFile(urlpath, filename, localdir, arg):
"""Download archive.
"""
ifnotquiet = arg.quiet
if not os.path.isdir(localdir):
os.makedirs(os.path.realpath(localdir))
if ifnotquiet:
print (_("getting file %s from ") % filename) + "\n " + urlpath + filename
if os.path.isdir(urlpath):
try:
shutil.copyfile(urlpath + filename, localdir + filename)
except:
print _("Error: file %s was not copied") % filename
exit_proc(arg)
else:
try:
file_from = urllib2.urlopen(urllib2.Request(urlpath + filename), None, timeout)
file_to = open(localdir + filename, "w")
shutil.copyfileobj(file_from, file_to)
except:
print _("Error: file %(from)s was not downloaded to %(to)s") %{"from": urlpath + filename, "to": localdir + filename}
exit_proc(arg)
file_from.close()
file_to.close()
def GetFiles(arg):
"""Get all needed files.
"""
ifchangelog = arg.changelog
file_dir = []
file_name = []
file_path = []
for i in range(len(arg.old)):
file_name.append(synthesis_arch)
file_dir.append(arg.temp_old[i])
file_path.append(arg.old[i] + "media_info/")
if ifchangelog:
file_name.append(changelog_arch)
file_dir.append(arg.temp_old[i])
file_path.append(arg.old[i] + "media_info/")
for i in range(len(arg.new)):
file_name.append(synthesis_arch)
file_dir.append(arg.temp_new[i])
file_path.append(arg.new[i] + "media_info/")
if ifchangelog:
file_name.append(changelog_arch)
file_dir.append(arg.temp_new[i])
file_path.append(arg.new[i] + "media_info/")
for i in range(len(file_name)):
GetFile(file_path[i], file_name[i], file_dir[i], arg)
def RenameSynthFile(localdir, arg):
"""Rename.
Rename the synthesis file so that gzip can recognize the format.
"""
ifnotquiet = arg.quiet
if not os.path.isfile(localdir + synthesis_arch):
print _("Error: file not found: ") + localdir + synthesis_arch
exit_proc(arg)
try:
os.rename(localdir + synthesis_arch, localdir + synthesis_arch_renamed)
except OSError:
print _("Error: cannot rename file %(from)s to %(to)s") % {"from": synthesis_arch, "to": synthesis_arch_renamed}
exit_proc(arg)
if not os.path.isfile(localdir + synthesis_arch_renamed):
print _("Error: file %s is missing.") % (localdir + synthesis_arch_renamed)
exit_proc(arg)
else:
if ifnotquiet:
print _("file %(from)s was renamed to %(to)s") % {"from": synthesis_arch, "to": synthesis_arch_renamed}
def UnpackFiles(files_dir, ifchangelog, ifnotquiet):
"""Unpack.
Unpack needed files in selected directory.
"""
if ifchangelog:
if ifnotquiet:
print _("unpacking file ") + changelog_arch
subprocess.call(["lzma", "-df", files_dir + changelog_arch])
if ifnotquiet:
print _("unpacking file ") + synthesis_arch_renamed
subprocess.call(["gzip", "-df", files_dir + synthesis_arch_renamed])
def ParseVersion(names_list):
"""Parse version info if present.
Parse version information from the field. e.g. provided_name[>= 1.2.3-4.5.6]
is parsed to (provided_name, sign, (epoch, version, release))
"""
new_names_list = []
for name in names_list:
match = re_search_unver.match(name)
if match:
tmp_entry = match.group(1)
else:
tmp_entry = name
match = re_search_verrel.search(name)
if match:
sign = match.group(1)[:-1]
epoch = match.group(2)
if epoch:
epoch = epoch[:-1]
else:
epoch = ''
version = match.group(3)
release = match.group(4)
if release:
release = release[1:]
else:
release = ''
verrel = (epoch, version, release)
else:
sign = ''
verrel = ('','','')
new_names_list.append((tmp_entry, sign, verrel))
return new_names_list
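# Note on the synthesis.hdlist format parsed below: every line has the
# form "@<tag>@<value1>@<value2>...".  Only the @obsoletes@, @summary@ and
# @info@ tags are used here; the @info@ line closes a package record and
# carries the rpm file name, the epoch and the size in its first three
# fields, with the optional disttag and distepoch in fields five and six
# (see ChkTagEpoch below).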
def ParseSynthesis(synthfile, pkgdict, arg):
"""Collect info about packages.
Parse synthesis.hdlist file (or add new entries to pkgdict).
pkgdict is a dictionary with format:
pkgdict[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - is package info
s1 - is package summary
s2[] - is list of obsoleted packages
"""
ifnotquiet = arg.quiet
if not os.path.isfile(synthfile):
print _("Error: Synthesis file %s was not found.") % synthfile
exit_proc(arg)
if ifnotquiet:
print _("Parsing synthesis")
try:
synth = open(synthfile)
tmp = ['', '', '']
for synthline in synth:
if synthline.endswith('\n'):
synthline = synthline[:-1]
tmpline = synthline.split('@')
tag = tmpline[1]
if tag == synthtags[2]:
tmp[2] = tmpline[2:]
elif tag == synthtags[5]:
tmp[1] = '@'.join(tmpline[2:])
elif tag == synthtags[6]:
tmp[0] = tmpline[2:]
disttagepoch = ChkTagEpoch(tmp[0])
tmp[2] = ParseVersion(tmp[2])
(name, version, release) = RPMNameFilter(tmp[0][0], disttagepoch) #disttag + distepoch
verrel = (version, release, tmp[0][1])
if(not name in pkgdict):
pkgdict[name]=(verrel, (tmp[0], tmp[1], tmp[2]))
elif(compare_versions(pkgdict[name][0], verrel) == -1):
pkgdict[name]=(verrel, (tmp[0], tmp[1], tmp[2]))
tmp = ['', '', '']
synth.close()
except IOError:
print _("Error: Failed to open synthesis file ") + synthfile
exit_proc(arg)
def ChkDist(disttag, distepoch):
"""No minus in tag and epoch.
Trivial check that disttag and distepoch do not contain '-'.
"""
if minus_check.search(disttag) or minus_check.search(distepoch):
print _("REPODIFF-Warning: strange format of <disttag> or <distepoch>: ") +\
disttag + distepoch
def ChkTagEpoch(i):
"""Extract disttag and distepoch from the <info> field.
Return the concatenated disttag and distepoch ('-' if both are absent),
checking that they do not contain '-'.
"""
if len(i) == 4:
return '-'
elif len(i) == 5:
disttag = i[4]
distepoch = ''
ChkDist(disttag, distepoch)
return disttag + distepoch
elif len(i) == 6:
disttag = i[4]
distepoch = i[5]
ChkDist(disttag, distepoch)
return disttag + distepoch
else:
print _("REPODIFF-Warning: strange <info>: ") + str(i)
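# RPMNameFilter heuristic: the rpm file name is split on '-', the
# architecture suffix is stripped from the last field, and that field is
# taken to be the release unless it is purely numeric or begins with the
# repository's disttag+distepoch marker, in which case the two preceding
# fields are used as version and release instead.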
def RPMNameFilter(rpmname, disttagepoch):
"""Parse name and verrel.
Function that parses name, version and release of a package.
"""
string = rpmname.split('-')
lastpart = string.pop()
tmp = lastpart.split('.')
tmp.pop()
lastpart = '.'.join(tmp)
if (lastpart[0].isdigit() or (not lastpart.startswith(disttagepoch))) and\
(not lastpart.isdigit()):
name = '-'.join(string[:-1])
ver = string[-1]
rel = lastpart
else:
name = '-'.join(string[:-2])
ver = string[-2]
rel = string[-1]
return (name, ver, rel)
def compare_versions(first_entry, second_entry):
"""Compare two verrel tuples.
first_entry and second_entry are verrel tuples
verrel = (version, release, epoch).
Return 1 if the first argument is higher.
0 if they are equivalent.
-1 if the second argument is higher.
"""
(version1, release1, first_epoch) = first_entry
(version2, release2, second_epoch) = second_entry
return(rpm.labelCompare((first_epoch, version1, release1),
(second_epoch, version2, release2)))
def ParsePackage(arg):
"""Processing files, parsing synthesis, getting pkgdict.
pkgdict is a dictionary with format:
pkgdict[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - is package info
s1 - is package summary
s2[] - is list of obsoleted packages
"""
ifchangelog = arg.changelog
ifnotquiet = arg.quiet
pkgdict_old = {}
for directory in arg.temp_old:
RenameSynthFile(directory, arg)
UnpackFiles(directory, ifchangelog, ifnotquiet)
ParseSynthesis(directory + synthesis_file, pkgdict_old, arg)
pkgdict_new = {}
for directory in arg.temp_new:
RenameSynthFile(directory, arg)
UnpackFiles(directory, ifchangelog, ifnotquiet)
ParseSynthesis(directory + synthesis_file, pkgdict_new, arg)
return pkgdict_old, pkgdict_new
def CreateDicts(dict_old, dict_new):
"""Creating dictionaries.
Creating dictionaries for new, updated and removed(deleted) packages
from two dictionaries: old and new, for old and new repositories.
dict_old, dict_new are dictionaries with format:
pkgdict[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - is package info
s1 - is package summary
s2[] - is list of obsoleted packages
dict_new_packages and dict_del_packages have the same format.
dict_upd_packages has format:
dict_upd_packages[name]=((verrel_old,(so0,so1,so2)),
(verrel_new,(sn0,sn1,sn2)),ifdowngraded)
or
dict_upd_packages[name]=(dict_old[name],dict_new[name],ifdowngraded)
"""
dict_new_packages = {}
dict_del_packages = {}
dict_upd_packages = {}
for name in dict_new:
if(name in dict_old): #updated or downgraded
compare_result = compare_versions(dict_new[name][0],
dict_old[name][0])
if(compare_result > 0): #updated
dict_upd_packages[name] = (dict_old[name], dict_new[name], 0)
elif(compare_result < 0): #downgraded ?
dict_upd_packages[name] = (dict_old[name], dict_new[name], 1)
else: #new
dict_new_packages[name] = dict_new[name]
for name in dict_old:
if(not name in dict_new): #removed
dict_del_packages[name] = dict_old[name]
return (dict_new_packages, dict_del_packages, dict_upd_packages)
def ProcessNewPackages(dict_new_packages, file_output):
"""Processing newly added packages.
dict_new_packages[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - is package info
s1 - is package summary
s2[] - is list of obsoleted packages
"""
sorted_list = sorted(dict_new_packages)
for name in sorted_list:
file_output.write(_("New package: ") + dict_new_packages[name][1][0][0] +\
"\n " + dict_new_packages[name][1][1] + "\n\n")
def GenerateDictObsoleted(dict_new, ifnotquiet):
"""Generate dictionary of obsoleted packages.
dict_new has the usual format:
pkgdict[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - package info
s1 - package summary
s2[] - list of packages obsoleted by current package
The returned dictionary maps each obsoleted name to the packages
that obsolete it:
obsoleted_by[name] = [(obsoleting rpm file name, sign, verrel), ...]
"""
if ifnotquiet:
print _("Generating obsoleted list.")
obsoleted_by = {}
for name in dict_new:
for (obsolete, sign, verrel) in dict_new[name][1][2]:
if(not obsolete in obsoleted_by):
obsoleted_by[obsolete] = []
obsoleted_by[obsolete].append((dict_new[name][1][0][0], sign, verrel))
return obsoleted_by
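# compare_verrel(verrel1, sign, verrel2) below returns 1 when verrel1
# satisfies the constraint "<sign> verrel2" (e.g. ">= 1.2-3"); an empty
# sign or a missing version always matches, and the epoch or release is
# ignored when either side lacks it.  Otherwise it returns 0.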
def compare_verrel(verrel1, sign, verrel2):
if (sign == ''):
return 1
(e1, v1, r1) = verrel1
(e2, v2, r2) = verrel2
# checks
if (v2 == '') or (v1 == ''):
return 1
if (e1 == '') or (e2 == ''):
e1 = '0'
e2 = '0'
if (r1 == '') or (r2 == ''):
r1 = '0'
r2 = '0'
# compare
compare = rpm.labelCompare((e1, v1, r1), (e2, v2, r2))
if (sign == "=="):
if (compare == 0):
return 1
elif (sign == ">"):
if (compare == 1):
return 1
elif (sign == "<"):
if (compare == -1):
return 1
elif (sign == ">="):
if (compare > -1):
return 1
elif (sign == "<="):
if (compare < 1):
return 1
return 0
def ProcessDelPackages(dict_del_packages, dict_obsoleted, file_output):
"""Process deleted packages.
Print every removed package and show whether it is obsoleted.
pkgdict[name]=(verrel,(s0,s1,s2))
where:
name - is package name parsed from package filename
verrel - is tuple (version, release, epoch)
s0[] - is package info
s1 - is package summary
s2[] - is list of obsoleted packages
dict_obsoleted is dictionary
dict_obsoleted[name]=[obs1, ...]
"""
sorted_list = sorted(dict_del_packages)
for name in sorted_list:
file_output.write(_("Removed package: ") + dict_del_packages[name][1][0][0] + '\n')
if (name in dict_obsoleted):
tmp_list = []
for (obsolete, sign, verrel) in dict_obsoleted[name]:
if (compare_verrel(dict_del_packages[name][0], sign, verrel)):
tmp_list.append(obsolete)
sorted_obsolete = sorted(tmp_list)
for obs_package_name in sorted_obsolete:
file_output.write(_(" Obsoleted by ") + obs_package_name + '\n')
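# changelog.xml layout expected by ParseLogfile below (as reconstructed
# from the parsing code): a <media_info> root with <changelogs> elements
# carrying fn/disttag/distepoch attributes, each containing <log> entries
# with a "time" attribute and <log_name>/<log_text> children.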
def ParseLogfile(dict_log, logfile, dict_upd_packages, mode, arg):
"""Parse Changelog.
mode == 0 - for the old changelog: we look only for the first entry
mode == 1 - for the new changelog: we collect entries until we reach
the entry remembered from the old changelog
Parse changelog.xml to compare changes between updated packages.
dict_log - is dictionary with format:
dict_log[name] =
[(verrel, (time,name,text)), (verrel,[(time,name,text),...])]
dict_upd_packages[name] = [old_pkg[name],new_pkg[name],ifdowngraded]
or dict_upd_packages[name] =
[(verrel,(s0,s1,s2)),(verrel,(s0,s1,s2)),ifdowngraded]
"""
ifnotquiet = arg.quiet
if ifnotquiet:
print _("Reading changelog")
if not os.path.isfile(logfile):
print _("Error: Can't find changelog ") + logfile
exit_proc(arg)
doc = libxml2.parseFile(logfile)
if (not doc):
print _("Error: Can't read changelog ") + logfile + "."
exit_proc(arg)
root = doc.children
if root.name != "media_info":
print _("Error: Wrong changelog.")
doc.freeDoc()
exit_proc(arg)
tag_changelog = root.children
while(tag_changelog):
if(tag_changelog.name != "changelogs"):
tag_changelog = tag_changelog.next
continue
tag_property = tag_changelog.properties
pkgname = ''
disttag = ''
distepoch = ''
while(tag_property):
if (tag_property.name == "fn"):
pkgname = tag_property.content
elif (tag_property.name == "disttag"):
disttag = tag_property.content
elif (tag_property.name == "distepoch"):
distepoch = tag_property.content
tag_property = tag_property.next
if (pkgname == ''):
print _("Error: Corrupted changelog")
doc.freeDoc()
exit_proc(arg)
disttagepoch = disttag + distepoch
if (disttagepoch == ''):
disttagepoch = '-'
(result_key, version, release) = RPMNameFilter(pkgname, disttagepoch)
verrel = (version, release, "-1")
# skip entry if it wasn't updated
if result_key not in dict_upd_packages:
tag_changelog = tag_changelog.next
continue
ifdowngraded = dict_upd_packages[result_key][2]
# skip entry if its file name does not match the package in the dictionary
if(dict_upd_packages[result_key][mode][1][0][0] != pkgname):
tag_changelog = tag_changelog.next
continue
# skip entry if it has been found already with appropriate version
if(result_key in dict_log) and (dict_log[result_key][mode]):
tag_changelog = tag_changelog.next
continue
# if the "old" repository does not have a changelog for the package
if(mode == 1) and (not result_key in dict_log):
dict_log[result_key] = []
dict_log[result_key].append([])
dict_log[result_key].append([])
dict_log[result_key][0] = (verrel, [])
log_current = tag_changelog.children
result_changelog = []
while(log_current):
if(log_current.name != "log"):
log_current = log_current.next
continue
if(log_current.properties.name == "time"):
entry_time = log_current.properties.content
else:
entry_time = 0
if(mode == 1) and (not ifdowngraded) and\
(entry_time <= dict_log[result_key][0][1][0]):
break
log_child = log_current.children
while(log_child):
if(log_child.name == "log_name"):
entry_name = log_child.content
elif(log_child.name == "log_text"):
entry_text = log_child.content
log_child = log_child.next
result_changelog.append((entry_time, entry_name, entry_text))
if(mode == ifdowngraded):
break
log_current = log_current.next
if(mode == 0):
dict_log[result_key] = []
dict_log[result_key].append([])
dict_log[result_key].append([])
if not ifdowngraded:
dict_log[result_key][0] = (verrel, result_changelog[0])
else:
dict_log[result_key][0] = (verrel, result_changelog)
else:
if not ifdowngraded:
dict_log[result_key][1] = (verrel, result_changelog)
else: #special actions for downgraded packages
new_result = []
time_to_stop = result_changelog[0][0]
tmp_change = dict_log[result_key][0][1] #changelog list
if tmp_change: #changelog is not empty
i = 0
length = len(tmp_change)
while i < length:
if tmp_change[i][0] <= time_to_stop:
i = i + 1
break
new_result.append(tmp_change[i])
i = i + 1
dict_log[result_key][1] = (verrel, new_result)
tag_changelog = tag_changelog.next
doc.freeDoc()
def GenerateLogfileDiff(dict_upd_packages, arg):
"""Changelog difference list.
Generate changelog difference list.
dict_upd_packages[name] = [old_pkg[name],new_pkg[name],ifdowngraded]
or dict_upd_packages[name] = [(verrel,(s0,s1,s2)),(verrel,(s0,s1,s2)),ifdowngraded]
"""
ifnotquiet = arg.quiet
temp_old = arg.temp_old
temp_new = arg.temp_new
if ifnotquiet:
print _("Generating changes list.")
dict_logfile_diff = {}
dict_log = {}
for old_dir in temp_old:
ParseLogfile(dict_log, old_dir + changelog_file, dict_upd_packages, 0, arg)
for new_dir in temp_new:
ParseLogfile(dict_log, new_dir + changelog_file, dict_upd_packages, 1, arg)
for name in dict_upd_packages:
if(name in dict_log):
if dict_log[name][1]:
entry = dict_log[name][1][1]
else:
print _("REPODIFF-Warning: Package %s was not described in changelogs.xml") % name
entry = [(0, '', _("REPODIFF-Warning: Changelogs of a package are absent in \"new\" repository."))]
else:
print _("REPODIFF-Warning: Package %s was not described in changelogs.xml") % name
entry = [(0, '', _("REPODIFF-Warning: Changelogs of a package are absent."))]
dict_logfile_diff[name] = entry
return dict_logfile_diff
def ChangelogPrint(changes_list, file_output):
"""Changelog difference.
Output changes in changelog.
changes_list is list with format:
changes_list = [(time,author,text)]
"""
for entry in changes_list:
file_output.write("* " + str(date.fromtimestamp(float(entry[0]))) +\
" " + entry[1] + '\n' + entry[2] + '\n\n')
def PrintLogfileDiff(package_name, dict_logfile_diff, file_output):
"""Changelog difference.
Output changes in changelog.
dict_logfile_diff is dictionary with format:
dict_logfile_diff[name] = [(time,author,text)]
"""
if package_name in dict_logfile_diff:
ChangelogPrint(dict_logfile_diff[package_name], file_output)
else:
file_output.write(_("Package %s has no changelog info\n") % package_name)
def ProcessUpdPackages(dict_upd_packages, dict_logfile_diff, arg):
"""Process updated packages.
ifsizes - indicator: print (1) or do not print (0) the
difference in package sizes.
ifnotsimple - indicator: print detailed output, including changelog
differences when requested (1), or a simple one-line summary per package (0).
Process updated packages and output all the needed info.
dict_upd_packages[name] = [old_pkg[name],new_pkg[name],ifdowngraded]
or dict_upd_packages[name] = [(verrel,(s0,s1,s2)),(verrel,(s0,s1,s2)),ifdowngraded]
"""
ifnotsimple = arg.simple
file_output = arg.output
ifchangelog = arg.changelog
ifsizes = arg.size
file_output.write(_("\n\nUpdated packages:\n\n"))
sorted_list = sorted(dict_upd_packages)
for name in sorted_list:
package = dict_upd_packages[name][1][1][0][0]
if ifnotsimple:
file_output.write(package + '\n' + '-'*len(package) + '\n')
if dict_upd_packages[name][2]:
file_output.write(_(" ***DOWNGRADED***\n"))
if ifchangelog:
PrintLogfileDiff(name, dict_logfile_diff, file_output)
else:
old_package = dict_upd_packages[name][0][1][0][0]
file_output.write(name + ": " + old_package + " -> " + package + '\n')
if(ifsizes):
sizediff = int(dict_upd_packages[name][1][1][0][2]) - \
int(dict_upd_packages[name][0][1][0][2])
file_output.write(_("Size Change: %d bytes\n\n") % sizediff)
def PrintSummary(dict_new_packages, dict_del_packages, dict_upd_packages, file_output):
"""Output summary.
Output summary: total numbers of new/removed/updated/downgraded packages.
"""
file_output.write("Summary:\n")
length = len(dict_new_packages)
if length:
file_output.write(_(" Total added packages: ") + str(length) + '\n')
length = len(dict_del_packages)
if length:
file_output.write(_(" Total removed packages: ") + str(length) + '\n')
length = 0
length_d = 0
for packagename in dict_upd_packages:
if dict_upd_packages[packagename][2] == 0:
length = length + 1
else:
length_d = length_d + 1
if length:
file_output.write(_(" Total updated packages: ") + str(length) + '\n')
if length_d:
file_output.write(_(" Total downgraded packages: ") + str(length_d) + '\n')
def HTML_ParsePackage(arg):
"""Parse hdlist.
HTML-specific variant of ParsePackage(): each repository is parsed into its own dictionary.
"""
ifchangelog = arg.changelog
ifnotquiet = arg.quiet
html_old_dict_list = []
html_new_dict_list = []
for directory in arg.temp_old:
tmp_dict = {}
RenameSynthFile(directory, arg)
UnpackFiles(directory, 0, ifnotquiet)
ParseSynthesis(directory + synthesis_file, tmp_dict, arg)
html_old_dict_list.append(tmp_dict)
for directory in arg.temp_new:
tmp_dict = {}
RenameSynthFile(directory, arg)
UnpackFiles(directory, 0, ifnotquiet)
ParseSynthesis(directory + synthesis_file, tmp_dict, arg)
html_new_dict_list.append(tmp_dict)
return html_old_dict_list, html_new_dict_list
def HTML_UniteOld(list_dict_old):
"""Union of dictionaries.
HTML-specific.
"""
dict_old = list_dict_old[0]
i = 1
while(i < len(list_dict_old)):
for name in list_dict_old[i]:
if name not in dict_old:
dict_old[name] = list_dict_old[i][name]
elif(compare_versions(dict_old[name][0], list_dict_old[i][name][0]) == -1):
dict_old[name] = list_dict_old[i][name]
i = i + 1
return dict_old
def HTML_CreateDicts(dict_old, list_dict_new):
"""Create dictionary of packages.
Dictionary of packages and types of changes.
"""
dict_packages = {}
i = 0
for dict_new in list_dict_new:
(tmp_new, tmp_del, tmp_upd) = CreateDicts(dict_old, dict_new)
for packagename in tmp_new:
if packagename not in dict_packages:
dict_packages[packagename] = []
dict_packages[packagename].append((tmp_new[packagename], i, 1))
for packagename in tmp_del:
if packagename not in dict_packages:
dict_packages[packagename] = []
dict_packages[packagename].append((tmp_del[packagename], i, 2))
for packagename in tmp_upd:
if packagename not in dict_packages:
dict_packages[packagename] = []
if tmp_upd[packagename][2] == 0:
dict_packages[packagename].append((tmp_upd[packagename][1], i, 3))
elif tmp_upd[packagename][2] == 1:
dict_packages[packagename].append((tmp_upd[packagename][1], i, 4))
i = i + 1
return dict_packages
def CssOutput():
"""Output style.
Return the CSS contents for the <style> tag (or for a standalone .css file).
"""
csscontent = '\nbody {\nfont-size: 1em;\nmargin: 1em;\ncolor: black;\nbackground-color: white;\n}\n' +\
'th {\nborder-bottom-style: double;\n}\n' +\
'h1 {\nfont-size: 1.6em;\n}\n' +\
'h2 {\nfont-size: 1.4em;\n}\n' +\
'ul {\nfont-size: 1.2em;\n}\n' +\
'li {\nfont-size: 1em; list-style: disc;\n}\n' +\
'.even {\nbackground-color: #CCCCCC;\n}\n' +\
'.odd {\nbackground-color: #FFFFFF;\n}\n' +\
'.new {\nbackground-color: #C6DEFF;\n}\n' +\
'.removed {\nbackground-color: #FFC3CE;\n}\n' +\
'.updated {\nbackground-color: #CCFFCC;\n}\n' +\
'.downgraded {\nbackground-color: #F4F4AF;\n}\n' +\
'p.bold {\n font-weight: bold\n}\n'
return csscontent
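# The JavaScript returned below implements a small client-side sorter for
# the "table_diff" table: clicking a column header re-sorts the rows by
# cell text or by cell class (i.e. by change type) and re-applies the
# alternating odd/even row colours.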
def JavaScriptOutput():
"""Output scripts.
Return the JavaScript for the <script> tag (or for a standalone .js file).
"""
javacontent = """
var tableBody;
var table2sort;
var imgUp;
var imgDown;
var suffix;
var lastSortCol;
var lastSortOrderAsc;
var index;
var rows;
function TableSorter(table,suf) {
this.table2sort = table;
this.suffix = suf;
this.lastSortCol = -1;
this.lastSortOrderAsc = true;
this.tableBody = this.table2sort.getElementsByTagName("tbody")[0];
this.imgUp = document.createTextNode(String.fromCharCode(0x2193));
this.imgDown = document.createTextNode(String.fromCharCode(0x2191));
}
TableSorter.prototype.sort = function (col, type) {
if (this.lastSortCol != -1) {
sortCell = document.getElementById("sortCell" + this.suffix + this.lastSortCol);
if (sortCell != null) {
if (this.lastSortOrderAsc == true) {
sortCell.removeChild(this.imgUp);
} else {
sortCell.removeChild(this.imgDown);
}
}
sortLink = document.getElementById("sortCellLink" + this.suffix + this.lastSortCol);
if(sortLink != null) {
sortLink.title = "Sort Ascending";
}
}else{
this.rows = this.tableBody.rows;
}
if (this.lastSortCol == col) {
this.lastSortOrderAsc = !this.lastSortOrderAsc;
} else {
this.lastSortCol = col;
this.lastSortOrderAsc = true;
}
var newRows = new Array();
var newRowsCount = 0;
for (i = 1; i < this.rows.length; i ++) {
newRows[newRowsCount++] = this.rows[i];
}
index = this.lastSortCol;
if (type == 'string') {
newRows.sort(sortFunction_string);
}
else {
newRows.sort(sortFunction_attr);
}
if (this.lastSortOrderAsc == false) {
newRows.reverse();
}
var count = 0;
var newclass;
for (i = 0; i < newRows.length; i++) {
if (count++ % 2 == 0){
newclass = "odd";
}else{
newclass = "even";
}
newRows[i].className = newclass;
this.table2sort.tBodies[0].appendChild(newRows[i]);
}
sortCell = document.getElementById("sortCell" + this.suffix + col);
if (sortCell == null) {
} else {
if (this.lastSortOrderAsc == true) {
sortCell.appendChild(this.imgUp);
} else {
sortCell.appendChild(this.imgDown);
}
}
sortLink = document.getElementById("sortCellLink" + this.suffix + col);
if (sortLink == null) {
} else {
if (this.lastSortOrderAsc == true) {
sortLink.title = "Sort Descending";
} else {
sortLink.title = "Sort Ascending";
}
}
}
function getCellContent(elem) {
if (typeof elem == "string") return elem;
if (typeof elem == "undefined") { return elem };
if (elem.innerText) return elem.innerText;
var str = "";
var cs = elem.childNodes;
var l = cs.length;
for (var i = 0; i < l; i++) {
switch (cs[i].nodeType) {
case 1: // 'ELEMENT_NODE'
str += getCellContent(cs[i]);
break;
case 3: // 'TEXT_NODE'
str += cs[i].nodeValue;
break;
}
}
return str;
}
function sortFunction_attr(a, b) {
elem1 = a.cells[index] ;
elem2 = b.cells[index] ;
str1 = elem1.className;
str2 = elem2.className;
sub1 = getCellContent(a.cells[0]).toLowerCase();
sub2 = getCellContent(b.cells[0]).toLowerCase();
if (str1 == str2){
if (sub1 == sub2) return 0;
if (sub1 < sub2) return -1;
return 1;
}
if (str1 < str2) return -1;
return 1;
}
function sortFunction_string(a, b) {
str1 = getCellContent(a.cells[index]).toLowerCase();
str2 = getCellContent(b.cells[index]).toLowerCase();
if (str1 == str2) return 0;
if (str1 < str2) return -1;
return 1;
}
var diffTableSorter = null;
function init_diff(){
if( document.getElementById("table_diff") ) {
diffTableSorter = new TableSorter(document.getElementById("table_diff"), 'diff');
}
}
function sort_diff(col, type) {
if( diffTableSorter != null ) {
diffTableSorter.sort(col, type);
}
}
"""
return javacontent
def HTML_OutputHead(file_output):
"""Output beginning of the document.
Outputs static text.
"""
file_output.write('<!--?xml version="1.0" encoding="UTF-8"?-->\n' +\
'<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">\n' +\
'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">\n' +
'<head>\n' +\
'<title>Differences between Mandriva / Rosa releases</title>\n' +\
'<meta name="keywords" content="Mandriva,Rosa,RPM,changes"/>\n' +\
'<meta name="description" content="List of changes between Mandriva / Rosa releases"/>\n' +\
'<meta http-equiv="content-type" content="text/html; charset=UTF-8"/>\n' +\
'<style type="text/css">' +\
CssOutput() +\
'</style>\n' +\
'<script language="JavaScript" type="text/javascript">' +\
JavaScriptOutput() +\
'</script>\n' +\
'</head>\n' +\
'<body>\n\n')
def GetRepoInfo(dict_packages, packagename, lenold, lennew, list_dict_old, list_dict_new):
"""Generate package-specific information.
Generates class and name to be displayed in the table.
"""
result1 = []
result2 = []
flag = 0
for i in range(lenold):
if packagename in list_dict_old[i]:
result1.append(list_dict_old[i][packagename][0][0] + '-' +\
list_dict_old[i][packagename][0][1])
else:
result1.append("N/A")
result2.append('')
tmplist = dict_packages[packagename]
tmpdict = {}
for (entry, reponum, entry_type) in dict_packages[packagename]:
tmpdict[reponum] = (entry[0][0] + '-' + entry[0][1], entry_type)
for i in range(lennew):
if(i not in tmpdict):
if(packagename not in list_dict_new[i]):
result1.append("N/A")
result2.append("")
else:
result1.append(list_dict_new[i][packagename][0][0] + '-' +\
list_dict_new[i][packagename][0][1])
result2.append("")
else:
(name, entry_type) = tmpdict[i]
if entry_type == 1:
result1.append(name)
result2.append('class = "new"')
elif entry_type == 2:
result1.append("Removed")
result2.append('class = "removed"')
flag = 1
elif entry_type == 3:
result1.append(name)
result2.append('class = "updated"')
elif entry_type == 4:
result1.append(name)
result2.append('class = "downgraded"')
return (result1, result2, flag)
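# Note on the "flag" value returned above: HTML_OutputBody keeps a cell as
# "Removed" only when every new repository reports the package as removed;
# if at least one new repository still provides it (or simply never had
# it), the "Removed" cells are downgraded to plain "N/A".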
def HTML_OutputBody(dict_packages, list_dict_old, list_dict_new, arg):
"""Output table.
Outputs table in HTML format.
"""
old = arg.old
new = arg.new
file_output = arg.output
file_output.write('<h1>Difference between repositories.</h1>\n' +\
'<p class="bold">The use of color coding in tables:</p>\n' +\
'<table>\n' +\
'<tbody><tr><td class="new">New</td>\n' +\
'<td class="updated">Updated</td></tr>\n' +\
'<tr><td class="downgraded">Downgraded</td>\n' +\
'<td class="removed">Removed</td></tr>\n' +\
'</tbody></table>\n\n')
repo_list = []
all_list = []
all_list.extend(old)
all_list.extend(new)
lenold = len(old)
lennew = len(new)
length = lenold + lennew
reptext = 'repositories' if lenold > 1 else 'repository'
tmp_string = '<h2>Old ' + reptext + ':</h2>\n<ul>\n'
for i in range(lenold):
tmp_string = tmp_string + '<li>Repository ' + str(i) + ' : <a href="' +\
old[i] + '">' + old[i] + '</a></li>\n'
tmp_string = tmp_string + '</ul>\n'
file_output.write(tmp_string)
reptext = 'repositories' if lennew > 1 else 'repository'
tmp_string = '<h2>New ' + reptext + ':</h2>\n<ul>\n'
for k in range(lennew):
i = i + 1
tmp_string = tmp_string + '<li>Repository ' + str(i) + ' : <a href="' +\
new[k] + '">' + new[k] + '</a></li>\n'
tmp_string = tmp_string + '</ul>\n'
file_output.write(tmp_string)
tmp_string = '<h2>Difference between '
i = 0
while(i < length):
if(i < length - 2):
delimeter = " , "
elif(i == length - 2):
delimeter = " and "
else:
delimeter = ''
temp = '<a href="' + all_list[i] + '">' + \
'Repository ' + str(i) + '</a>'
if i < lenold:
repo_list.append('<th>Repository ' + str(i) + '</th>')
else:
ii = i + 1
repo_list.append('<th id="sortCelldiff'+str(ii)+'"><a id="sortCellLinkdiff'+str(ii)+'" title="Sort Ascending" href="javascript:sort_diff('+str(ii)+', \'className\')">Repository '+str(i)+'</a></th>')
tmp_string = tmp_string + temp + delimeter
i = i + 1
tmp_string = tmp_string + ".</h2>\n"
file_output.write(tmp_string)
tmp_string = '<table id="table_diff">\n<tbody>\n<tr><th id="sortCelldiff0"><a id="sortCellLinkdiff0" title="Sort Ascending" href="javascript:sort_diff(0, \'string\')">Package name</a></th>'
for reponame in repo_list:
tmp_string = tmp_string + reponame
tmp_string = tmp_string + '</tr>\n'
strnum = 1
resrange = []
for i in range(lennew):
resrange.append(lenold + i)
sorted_list = sorted(dict_packages, key=str.lower)
for packagename in sorted_list:
if strnum % 2:
strtype = "odd"
else:
strtype = "even"
tmp_string = tmp_string + '<tr class="' + strtype + '">'
tmp_string = tmp_string + '<td>' + packagename + '</td>'
(repo_name, repo_class, flag) = GetRepoInfo(dict_packages, packagename,
lenold, lennew, list_dict_old, list_dict_new)
if flag:
if(repo_name[lenold] == "Removed"):
res = 0
for k in resrange:
if(repo_name[k] != "Removed"):
res = 1
if res:
for k in resrange:
if(repo_name[k] == "Removed"):
repo_name[k] = "N/A"
repo_class[k] = ''
else:
for k in resrange:
if(repo_name[k] == "Removed"):
repo_name[k] = "N/A"
repo_class[k] = ''
for i in range(length):
tmp_string = tmp_string + '<td ' + repo_class[i] + '>' +\
repo_name[i] + '</td>'
tmp_string = tmp_string + '</tr>\n'
strnum = strnum + 1
tmp_string = tmp_string + '</tbody>\n</table>\n'
file_output.write(tmp_string)
def HTML_OutputTail(file_output):
"""Output end of document.
Outputs static text.
"""
file_output.write('''
<script language='JavaScript' type='text/javascript'>
init_diff();
</script>
''');
file_output.write('</body>\n</html>\n')
def HTML_Output(dict_packages, list_dict_old, list_dict_new, arg):
"""Output HTML file.
Generates HTML file.
"""
ifnotquiet = arg.quiet
file_output = arg.output
if ifnotquiet:
print _("Creating HTML file.")
HTML_OutputHead(file_output)
HTML_OutputBody(dict_packages, list_dict_old, list_dict_new, arg)
HTML_OutputTail(file_output)
def main(args):
arg = ParseCommandLine()
arg.temp_dir = tempfile.mkdtemp() + '/'
head_old = arg.temp_dir + old_dir
head_new = arg.temp_dir + new_dir
arg.temp_old = []
arg.temp_new = []
if (arg.output):
tmp_output = arg.output[0]
else:
tmp_output = default_output
arg.output = None;
for i in range(len(arg.old)):
arg.old[i] = CheckArgs(arg.old[i], arg)
arg.temp_old.append(head_old + str(i) + '/')
for i in range(len(arg.new)):
arg.new[i] = CheckArgs(arg.new[i], arg)
arg.temp_new.append(head_new + str(i) + '/')
arg.output = tmp_output
CheckOutput(arg)
CheckParam(arg)
ifsizes = arg.size
ifnotsimple = arg.simple
output_file = arg.output
ifnotquiet = arg.quiet
ifhtml = arg.html
ifchangelog = arg.changelog
GetFiles(arg)
if not ifhtml:
(dict_old, dict_new) = ParsePackage(arg)
(dict_new_packages, dict_del_packages, dict_upd_packages) = CreateDicts(
dict_old, dict_new)
dict_obsoleted = GenerateDictObsoleted(dict_new, ifnotquiet)
if(dict_upd_packages) and (ifnotsimple) and (ifchangelog):
dict_logfile_diff = GenerateLogfileDiff(dict_upd_packages, arg)
if not ifnotsimple or not ifchangelog:
dict_logfile_diff = {}
ProcessNewPackages(dict_new_packages, arg.output)
ProcessDelPackages(dict_del_packages, dict_obsoleted, arg.output)
if dict_upd_packages:
ProcessUpdPackages(dict_upd_packages, dict_logfile_diff, arg)
PrintSummary(dict_new_packages, dict_del_packages, dict_upd_packages, arg.output)
else:
(list_dict_old, list_dict_new) = HTML_ParsePackage(arg)
dict_old = HTML_UniteOld(list_dict_old)
dict_packages = HTML_CreateDicts(dict_old, list_dict_new)
HTML_Output(dict_packages, list_dict_old, list_dict_new, arg)
exit_proc(arg)
if __name__ == "__main__":
main(sys.argv)