mirror of
https://github.com/rpm-software-management/tito.git
synced 2025-02-23 20:22:46 +00:00
Merge pull request #345 from xsuchy/tarutf8
Partial revert "Fix #335. Handle source tarballs with UTF8 characters…
This commit is contained in:
commit
fd6945d84b
4 changed files with 0 additions and 232 deletions
Binary file not shown.
Binary file not shown.
|
@ -1,76 +0,0 @@
|
|||
|
||||
import unittest
|
||||
from tito.buildparser import BuildTargetParser
|
||||
from tito.compat import * # NOQA
|
||||
from tito.exception import TitoException
|
||||
|
||||
|
||||
class BuildTargetParserTests(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
unittest.TestCase.setUp(self)
|
||||
self.valid_branches = ["branch1", "branch2"]
|
||||
|
||||
self.release_target = "project-x.y.z"
|
||||
self.releasers_config = RawConfigParser()
|
||||
self.releasers_config.add_section(self.release_target)
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
"branch1:project-x.y.z-candidate")
|
||||
|
||||
def test_parser_gets_correct_targets(self):
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
release_targets = parser.get_build_targets()
|
||||
self.assertTrue("branch1" in release_targets)
|
||||
self.assertEqual("project-x.y.z-candidate", release_targets["branch1"])
|
||||
self.assertFalse("branch2" in release_targets)
|
||||
|
||||
def test_invalid_branch_raises_exception(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
"invalid-branch:project-x.y.z-candidate")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
self.assertRaises(TitoException, parser.get_build_targets)
|
||||
|
||||
def test_missing_semicolon_raises_exception(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
"invalid-branchproject-x.y.z-candidate")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
self.assertRaises(TitoException, parser.get_build_targets)
|
||||
|
||||
def test_empty_branch_raises_exception(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
":project-x.y.z-candidate")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
self.assertRaises(TitoException, parser.get_build_targets)
|
||||
|
||||
def test_empty_target_raises_exception(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
"branch1:")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
self.assertRaises(TitoException, parser.get_build_targets)
|
||||
|
||||
def test_multiple_spaces_ok(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
" branch1:project-x.y.z-candidate ")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
release_targets = parser.get_build_targets()
|
||||
self.assertEqual(1, len(release_targets))
|
||||
self.assertTrue("branch1" in release_targets)
|
||||
self.assertEqual("project-x.y.z-candidate", release_targets["branch1"])
|
||||
|
||||
def test_multiple_branches_supported(self):
|
||||
self.releasers_config.set(self.release_target, "build_targets",
|
||||
"branch1:project-x.y.z-candidate branch2:second-target")
|
||||
parser = BuildTargetParser(self.releasers_config, self.release_target,
|
||||
self.valid_branches)
|
||||
release_targets = parser.get_build_targets()
|
||||
self.assertEquals(2, len(release_targets))
|
||||
self.assertTrue("branch1" in release_targets)
|
||||
self.assertEqual("project-x.y.z-candidate", release_targets["branch1"])
|
||||
self.assertTrue("branch2" in release_targets)
|
||||
self.assertEqual("second-target", release_targets['branch2'])
|
|
@ -1,156 +0,0 @@
|
|||
# coding=utf-8
|
||||
import hashlib
|
||||
import os
|
||||
import tarfile
|
||||
import unittest
|
||||
import io
|
||||
|
||||
from tito.compat import StringIO, ensure_binary
|
||||
from tito.tar import TarFixer
|
||||
from mock import Mock
|
||||
|
||||
EXPECTED_TIMESTAMP = 1429725106
|
||||
EXPECTED_REF = "3518d720bff20db887b7a5e5dddd411d14dca1f9"
|
||||
|
||||
|
||||
class TarTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.out = io.BytesIO()
|
||||
self.tarfixer = TarFixer(None, self.out, EXPECTED_TIMESTAMP, EXPECTED_REF)
|
||||
self.utf8_containing_file = os.path.join(os.path.dirname(__file__), 'resources', 'les_misérables.tar')
|
||||
self.utf8_file = os.path.join(os.path.dirname(__file__), 'resources', 'archivé.tar')
|
||||
self.test_file = os.path.join(os.path.dirname(__file__), 'resources', 'archive.tar')
|
||||
self.reference_file = os.path.join(os.path.dirname(__file__), 'resources', 'archive-fixed.tar')
|
||||
self.reference_hash = self.hash_file(self.reference_file)
|
||||
|
||||
def tearDown(self):
|
||||
self.out = None
|
||||
|
||||
def hash_file(self, filename):
|
||||
file_bytes = open(filename, 'rb').read()
|
||||
return self.hash_buffer(file_bytes)
|
||||
|
||||
def hash_buffer(self, buf):
|
||||
hasher = hashlib.sha256()
|
||||
hasher.update(buf)
|
||||
return hasher.hexdigest()
|
||||
|
||||
def _irregular_reader(self, items):
|
||||
def item_read(read_length):
|
||||
try:
|
||||
item = items.pop(0)
|
||||
except IndexError:
|
||||
# If no more items, the buffer is empty and would return empty string
|
||||
return ''
|
||||
|
||||
return item.read(read_length)
|
||||
|
||||
mock_fh = Mock()
|
||||
mock_fh.read = Mock()
|
||||
mock_fh.read.side_effect = item_read
|
||||
|
||||
return mock_fh
|
||||
|
||||
def test_full_read(self):
|
||||
items = [StringIO("1" * 5), StringIO("1" * 2), StringIO("1" * 6)]
|
||||
self.tarfixer.fh = self._irregular_reader(items)
|
||||
self.assertEqual("1" * 10, self.tarfixer.full_read(10))
|
||||
|
||||
def test_full_read_buffer_underflow(self):
|
||||
input = StringIO("1" * 9)
|
||||
self.tarfixer.fh = input
|
||||
self.assertRaises(IOError, self.tarfixer.full_read, 10)
|
||||
|
||||
def test_full_read_eventual_buffer_underflow(self):
|
||||
items = [StringIO("1" * 5), StringIO("1" * 2), StringIO("1" * 2)]
|
||||
self.tarfixer.fh = self._irregular_reader(items)
|
||||
self.assertRaises(IOError, self.tarfixer.full_read, 10)
|
||||
|
||||
def test_fix(self):
|
||||
self.fh = open(self.test_file, 'rb')
|
||||
self.tarfixer.fh = self.fh
|
||||
self.tarfixer.fix()
|
||||
self.assertEqual(self.reference_hash, self.hash_buffer(self.out.getvalue()))
|
||||
|
||||
def test_fix_fails_unless_file_in_binary_mode(self):
|
||||
self.fh = open(self.test_file, 'r')
|
||||
self.tarfixer.fh = self.fh
|
||||
self.assertRaises(IOError, self.tarfixer.fix)
|
||||
|
||||
def test_padded_size_length_small(self):
|
||||
length = 10
|
||||
block_size = 512
|
||||
self.assertEqual(512, self.tarfixer.padded_size(length, block_size))
|
||||
|
||||
def test_padded_size_length_spot_on(self):
|
||||
length = 512
|
||||
block_size = 512
|
||||
self.assertEqual(512, self.tarfixer.padded_size(length, block_size))
|
||||
|
||||
def test_padded_size_length_over(self):
|
||||
length = 513
|
||||
block_size = 512
|
||||
self.assertEqual(1024, self.tarfixer.padded_size(length, block_size))
|
||||
|
||||
def test_padded_size_length_long(self):
|
||||
length = 82607
|
||||
block_size = 512
|
||||
self.assertEqual(82944, self.tarfixer.padded_size(length, block_size))
|
||||
|
||||
def test_create_extended_header(self):
|
||||
self.tarfixer.create_extended_header()
|
||||
header = self.out.getvalue()
|
||||
self.assertEqual(512, len(header))
|
||||
self.assertEqual(ensure_binary("52 comment=%s\n" % EXPECTED_REF), header[:52])
|
||||
self.assertEqual(ensure_binary("\x00" * (512 - 53)), header[53:])
|
||||
|
||||
def test_calculate_checksum(self):
|
||||
fields = {
|
||||
'a': '\x01',
|
||||
'b': '\x02',
|
||||
'c': '\x03',
|
||||
'd': '\x04',
|
||||
}
|
||||
self.tarfixer.struct_members = list(fields.keys()) + ['checksum']
|
||||
result = self.tarfixer.calculate_checksum(fields)
|
||||
expected_result = 10 + ord(" ") * 8
|
||||
self.assertEqual("%07o\x00" % expected_result, result)
|
||||
|
||||
def test_encode_header(self):
|
||||
mode = 123
|
||||
chunk = {
|
||||
'mode': mode,
|
||||
'name': 'hello',
|
||||
}
|
||||
result = self.tarfixer.encode_header(chunk, ['mode', 'name'])
|
||||
expected_result = ["%07o\x00" % mode, "hello"]
|
||||
expected_result = list(map(lambda x: ensure_binary(x), expected_result))
|
||||
self.assertEqual(expected_result, result)
|
||||
|
||||
def test_utf8_file(self):
|
||||
# The goal of this test is to *not* throw a UnicodeDecodeError
|
||||
self.fh = open(self.utf8_file, 'rb')
|
||||
self.tarfixer.fh = self.fh
|
||||
self.tarfixer.fix()
|
||||
|
||||
self.assertEqual(self.reference_hash, self.hash_buffer(self.out.getvalue()))
|
||||
|
||||
# rewind the buffer
|
||||
self.out.seek(0)
|
||||
try:
|
||||
tarball = tarfile.open(fileobj=self.out, mode="r")
|
||||
except tarfile.TarError:
|
||||
self.fail("Unable to open generated tarball")
|
||||
|
||||
def test_utf8_containing_file(self):
|
||||
# # The goal of this test is to *not* blow up due to a corrupted tarball
|
||||
self.fh = open(self.utf8_containing_file, 'rb')
|
||||
self.tarfixer.fh = self.fh
|
||||
self.tarfixer.fix()
|
||||
|
||||
# rewind the buffer
|
||||
self.out.seek(0)
|
||||
try:
|
||||
tarball = tarfile.open(fileobj=self.out, mode="r")
|
||||
except tarfile.TarError as e:
|
||||
self.fail("Unable to open generated tarball: %s" % e)
|
Loading…
Add table
Reference in a new issue