Merge pull request #34 from FrostyX/upload-cgi-tests

Upload cgi tests
This commit is contained in:
Michal Novotný 2020-05-11 19:58:12 +02:00 committed by GitHub
commit 968cd7b1f3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 324 additions and 0 deletions

View file

@ -93,6 +93,16 @@ To interact with DistGit server, you can use use [rpkg](https://pagure.io/rpkg-u
You can see example of deployement in [Fedora Infastructure](https://infrastructure.fedoraproject.org/cgit/ansible.git/tree/roles/distgit) and in [Copr](https://infrastructure.fedoraproject.org/cgit/ansible.git/tree/roles/copr/dist_git).
Developer Guide
---------------
#### Tests
```
nosetests -vs .
```
LICENSE
-------

View file

@ -34,10 +34,17 @@ Requires: python-requests
Requires: python-configparser
Requires: python-grokmirror
Requires: fedmsg
BuildRequires: python-nose
BuildRequires: python-nose-parameterized
BuildRequires: python-requests
BuildRequires: python-configparser
%else
Requires: python3-requests
Recommends: python3-grokmirror
Requires: python3-fedmsg
BuildRequires: python3-nose
BuildRequires: python3-parameterized
BuildRequires: python3-requests
%endif
%description
@ -95,6 +102,10 @@ getent group packager > /dev/null || \
exit 0
%check
nosetests .
%install
# ------------------------------------------------------------------------------
# /usr/share/ ........... scripts

0
scripts/httpd/upload.cgi Normal file → Executable file
View file

303
tests/test_upload_script.py Normal file
View file

@ -0,0 +1,303 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import errno
import os
try:
import unittest2 as unittest
except ImportError:
import unittest
import tempfile
import shutil
import re
import subprocess
import requests
import time
import random
from configparser import ConfigParser
PY2 = sys.version_info.major == 2
if PY2:
NOSE_PARAMETERIZED_NO_WARN=1
from nose_parameterized import parameterized
else:
from parameterized import parameterized
# Path to the actual CGI script that should be tested
CGI_SCRIPT = os.path.join(os.path.dirname(__file__), '../scripts/httpd/upload.cgi')
# A snippet for creating the server in a temporary location. We need to write a
# separate script as it needs to run with working directory set to the
# temporary directory.
SERVER = """#!/usr/bin/{python}
from {http_package} import HTTPServer
from {cgi_package} import CGIHTTPRequestHandler
s = HTTPServer(('%s', %s), CGIHTTPRequestHandler)
s.handle_request()
""".format(python="python2" if PY2 else "python3",
http_package="BaseHTTPServer" if PY2 else "http.server",
cgi_package="CGIHTTPServer" if PY2 else "http.server")
# MD5 hash of "hello.txt" and "new.txt" strings used in a few tests
HASH = '2e54144ba487ae25d03a3caba233da71'
NEW_HASH = 'fce67ea4590d3b789fff55a37271f29f'
SHA512 = 'acec329f80cc50edbab0dfbc2283d427ac673f84e6d8b949101791867b9b7771a53d2ffb1f8386189227beed4395b9a78171a1349700e2885c70ae14358d72ff' # noqa
NEW_SHA512 = 'd3d67bc3e3848925892de9b132c9ff4054a05c9dbc7b4366d16b5c6b87c898df60da162e9ec415d4dc16470128ef52c11c44fc06da2841543ddeb351b10e9fb2' # noqa
# The first value is what will be sent in the request, the second is the full
# namespaced module name.
EXISTING_MODULES = [
("pkg", "rpms/pkg"),
("rpms/pkg", "rpms/pkg"),
("apbs/pkg", "apbs/pkg"),
]
NON_EXISTING_MODULES = [
("bad", "rpms/bad"),
("rpms/bad", "rpms/bad"),
("apbs/bad", "apbs/bad"),
]
OLD_FILE_MODULES = [
('old', 'rpms/old'),
('rpms/old', 'rpms/old'),
('apbs/old', 'apbs/old'),
]
GIT_DIR = 'srv/git'
CACHE_DIR = 'srv/cache/lookaside'
class UploadTest(unittest.TestCase):
def setUp(self):
self.hostname = 'localhost'
self.port = random.randrange(59898, 65534)
# Create temporary filesystem tree
self.topdir = tempfile.mkdtemp()
os.chmod(self.topdir, 0o0777)
# Copy cgi script and tweak it with new path
cgi = os.path.join(self.topdir, 'cgi-bin', 'upload.cgi')
os.mkdir(os.path.join(self.topdir, 'cgi-bin'))
_copy_tweak(CGI_SCRIPT, cgi, self.topdir)
shutil.copystat(CGI_SCRIPT, cgi)
# Generate temporary distgit config for this test run
self.config = _dump_config_file(self.topdir)
self._run_server()
# Create a package with a single source file in each namespace
for _, module in EXISTING_MODULES:
self.setup_module(module)
self.touch('%s/%s/hello.txt/md5/%s/hello.txt' % (CACHE_DIR, module, HASH))
self.touch('%s/%s/hello.txt/sha512/%s/hello.txt' % (CACHE_DIR, module, SHA512))
# These are modules with sources in old MD5 path only.
for _, module in OLD_FILE_MODULES:
self.setup_module(module)
self.touch('%s/%s/hello.txt/%s/hello.txt' % (CACHE_DIR, module, HASH))
def tearDown(self):
shutil.rmtree(self.topdir)
if not self.server.poll():
# The server did not exit yet, so let's kill it.
try:
self.server.terminate()
except OSError as exc:
# It's possible for the server to exit before we try to kill
# it. In that case we get ESRCH (No such process). This is
# fine. All other exceptions should still be reported and fail
# the tests.
if exc.errno != errno.ESRCH:
raise
self.server.wait()
def _log_output(self):
self.output.seek(0)
print(self.output.read())
def _run_server(self):
"""Start a server in a temporary directory, and capture its output."""
script = os.path.join(self.topdir, 'server.py')
with open(script, 'w') as f:
f.write(SERVER % (self.hostname, self.port))
os.chmod(script, 0o0755)
self.output = tempfile.TemporaryFile()
self.server = subprocess.Popen(script, cwd=self.topdir,
stdout=self.output,
stderr=subprocess.STDOUT,
env={'SCRIPT_FILENAME': 'foo',
'DISTGIT_CONFIG': self.config,
'REQUEST_URI': '/repo/foo/bar/upload.cgi'})
time.sleep(0.1) # Wait for server to be up.
self.url = 'http://%s:%s/cgi-bin/upload.cgi' % (self.hostname, self.port)
def upload(self, name, hash, hashtype='md5', filename=None, filepath=None, mtime=None):
"""Send a request to the CGI script. Exactly one of filename and
filepath has to be provided.
:param name: name of the module
:param hash: hash of the file
:param filename: name of a file to check
:param filepath: path to a file to upload
"""
args = {
'name': name,
'%ssum' % hashtype: hash,
}
if filename:
args['filename'] = filename
if mtime:
args["mtime"] = mtime
files = None
if filepath:
files = {'file': open(filepath, 'rb')}
response = requests.post(self.url, data=args, files=files)
self._log_output()
self.assertEqual(response.status_code, 200)
return response.text
def touch(self, filename, contents=None):
"""Create a file in a given location and return its path."""
contents = contents or filename
path = os.path.join(self.topdir, filename)
try:
os.makedirs(os.path.dirname(path))
except OSError:
pass
print('Creating %s' % path)
with open(path, 'w') as f:
f.write(contents)
return path
def setup_module(self, name):
for path in [GIT_DIR + '/%s.git', CACHE_DIR + '/%s']:
self.touch(os.path.join(self.topdir, path % name, '.keep'))
def assertFileExists(self, module_name, filename, hash, mtime=None):
path = os.path.join(self.topdir, CACHE_DIR, module_name, filename, hash, filename)
self.assertTrue(os.path.exists(path), '%s should exist' % path)
if mtime:
self.assertEqual(os.stat(path).st_mtime, mtime)
@parameterized.expand(EXISTING_MODULES + OLD_FILE_MODULES)
def test_check_existing_file(self, module, ns_module):
resp = self.upload(module, hash=HASH, filename='hello.txt')
self.assertEqual(resp, 'Available\n')
@parameterized.expand(EXISTING_MODULES)
def test_check_existing_file_with_bad_hash(self, module, ns_module):
resp = self.upload(module, hash='abc', filename='hello.txt')
self.assertEqual(resp, 'Missing\n')
@parameterized.expand(EXISTING_MODULES)
def test_check_missing_file(self, module, ns_module):
resp = self.upload(module, hash='abc', filename='foo.txt')
self.assertEqual(resp, 'Missing\n')
@parameterized.expand(EXISTING_MODULES)
def test_upload_file(self, module, ns_module):
test_file = self.touch('new.txt')
resp = self.upload(module, hash=NEW_HASH, filepath=test_file)
self.assertEqual(resp, 'File new.txt size 7 MD5 %s stored OK\n' % NEW_HASH)
self.assertFileExists(ns_module, 'new.txt', NEW_HASH)
self.assertFileExists(ns_module, 'new.txt', 'md5/' + NEW_HASH)
@parameterized.expand(EXISTING_MODULES)
def test_upload_file_bad_checksum(self, module, ns_module):
test_file = self.touch('hello.txt')
resp = self.upload(module, hash='ABC', filepath=test_file)
self.assertEqual(resp, 'MD5 check failed. Received %s instead of ABC.\n' % HASH)
@parameterized.expand(NON_EXISTING_MODULES)
def test_upload_to_non_existing_module(self, module, ns_module):
test_file = self.touch('hello.txt')
resp = self.upload(module, hash=HASH, filepath=test_file)
self.assertEqual(resp, 'Module "%s" does not exist!\n' % ns_module)
@parameterized.expand(EXISTING_MODULES)
def test_rejects_unknown_hash(self, module, ns_module):
test_file = self.touch('hello.txt')
resp = self.upload(module, hash='deadbeef', hashtype='crc32', filepath=test_file)
self.assertEqual(resp, "Required checksum is not present\n")
@parameterized.expand(EXISTING_MODULES)
def test_accepts_sha_512_hash(self, module, ns_module):
test_file = self.touch('new.txt')
resp = self.upload(module, hash=NEW_SHA512, hashtype='sha512', filepath=test_file)
self.assertEqual(resp, 'File new.txt size 7 SHA512 %s stored OK\n' % NEW_SHA512)
self.assertFileExists(ns_module, 'new.txt', 'sha512/' + NEW_SHA512)
@parameterized.expand(EXISTING_MODULES)
def test_bad_sha512_hash(self, module, ns_module):
test_file = self.touch('hello.txt')
resp = self.upload(module, hash='ABC', hashtype='sha512', filepath=test_file)
self.assertEqual(resp, 'SHA512 check failed. Received %s instead of ABC.\n' % SHA512)
@parameterized.expand(EXISTING_MODULES)
def test_check_existing_sha512_correct(self, module, ns_module):
resp = self.upload(module, hash=SHA512, hashtype='sha512', filename='hello.txt')
self.assertEqual(resp, 'Available\n')
@parameterized.expand(EXISTING_MODULES)
def test_check_existing_sha512_mismatch(self, module, ns_module):
resp = self.upload(module, hash='abc', hashtype='sha512', filename='hello.txt')
self.assertEqual(resp, 'Missing\n')
@parameterized.expand(EXISTING_MODULES)
def test_upload_mtime(self, module, ns_module):
test_file = self.touch('new.txt')
resp = self.upload(module, hash=NEW_HASH, filepath=test_file, mtime="1234")
self.assertFileExists(ns_module, 'new.txt', NEW_HASH, mtime=1234)
@parameterized.expand(EXISTING_MODULES)
def test_upload_invalid_mtime(self, module, ns_module):
test_file = self.touch('new.txt')
resp = self.upload(module, hash=NEW_HASH, filepath=test_file, mtime="abc")
self.assertEqual(resp, 'Invalid value sent for mtime "abc". Aborting.\n')
def _copy_tweak(source_file, dest_file, topdir):
"""Copy the script from source_file to dest_file, and tweak constants to
point to topdir.
"""
regex = re.compile(r'''^(GITREPO|CACHE_DIR)\s*=\s*['"]([^'"]+)['"]$''')
with open(source_file) as source:
with open(dest_file, 'w') as dest:
for line in source:
if PY2 and line == "#!/usr/bin/python3\n":
line = line.replace("python3", "python2")
m = regex.match(line)
if m:
line = "%s = '%s%s'\n" % (m.group(1), topdir, m.group(2))
dest.write(line)
def _dump_config_file(topdir):
config = ConfigParser()
config["dist-git"] = {
"git_author_name": "Fedora Release Engineering",
"git_author_email": "rel-eng@lists.fedoraproject.org",
"cache_dir": CACHE_DIR,
"lookaside_dir": CACHE_DIR,
"gitroot_dir": GIT_DIR,
"gitolite": True,
"grok": True,
"default_namespace": "rpms",
}
config["upload"] = {
"fedmsgs": False,
"old_paths": True,
"nomd5": False,
"disable_group_check": True,
}
tmp = os.path.join(topdir, "dist-git-test.conf")
with open(tmp, "w") as configfile:
config.write(configfile)
return tmp