Merge pull request #67 from rohanpm/mem-usage-stream

Further reduce memory usage on large yum repos [RHELDST-20453]
This commit is contained in:
Rohan McGovern 2023-09-22 07:26:59 +10:00 committed by GitHub
commit ad1fee0f41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 110 additions and 43 deletions

View file

@ -24,6 +24,10 @@ information about the usage of `repo-autoindex`.
## Changelog
### v1.2.0 - 2023-09-22
- Support streamed fetching to reduce memory usage when fetching large files.
### v1.1.2 - 2023-09-18
- Add `py.typed` to make package PEP 561 compliant / enable downstream type-checking.

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "repo-autoindex"
version = "1.1.2"
version = "1.2.0"
description = "Generic static HTML indexes of various repository types"
authors = ["Rohan McGovern <rmcgover@redhat.com>"]
license = "GPL-3.0-or-later"

View file

@ -1,11 +1,14 @@
import gzip
import logging
from collections.abc import AsyncGenerator
from typing import Optional, Type
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Optional, Type, BinaryIO
import tempfile
import io
import aiohttp
from .base import Fetcher, GeneratedIndex, Repo, ContentError, FetcherError
from .base import Fetcher, IOFetcher, GeneratedIndex, Repo, ContentError, FetcherError
from .yum import YumRepo
from .pulp import PulpFileRepo
from .kickstart import KickstartRepo
@ -15,7 +18,9 @@ REPO_TYPES: list[Type[Repo]] = [KickstartRepo, YumRepo, PulpFileRepo]
def http_fetcher(session: aiohttp.ClientSession) -> Fetcher:
async def get_content_with_session(url: str) -> Optional[str]:
async def get_content_with_session(
url: str,
) -> Optional[BinaryIO]:
LOG.info("Fetching: %s", url)
async with session.get(url) as resp:
if resp.status == 404:
@ -26,26 +31,39 @@ def http_fetcher(session: aiohttp.ClientSession) -> Fetcher:
# Any other error status is fatal
resp.raise_for_status()
out: BinaryIO = tempfile.NamedTemporaryFile(prefix="repo-autoindex") # type: ignore
async for chunk in resp.content:
out.write(chunk)
out.flush()
out.seek(0)
# Deal with the non-ideal content negotiation
# for certain storage backends.
if url.endswith(".gz") and resp.content_type in (
"application/x-gzip",
"application/octet-stream",
):
compressed = await resp.content.read()
uncomp = gzip.decompress(compressed)
return uncomp.decode("utf-8")
out = gzip.GzipFile(fileobj=out) # type: ignore
return await resp.text()
return out
return get_content_with_session
def with_error_handling(fetcher: Fetcher) -> Fetcher:
# wraps a fetcher such that any raised exceptions are wrapped into FetcherError
async def new_fetcher(url: str) -> Optional[str]:
def wrapped_fetcher(fetcher: Fetcher) -> IOFetcher:
# wraps a fetcher as passed in by the caller into an internal
# fetcher enforcing certain behaviors:
#
# - wraps all exceptions in FetcherError
#
# - adapts 'str' outputs into io streams
#
async def new_fetcher(url: str) -> Optional[BinaryIO]:
try:
return await fetcher(url)
out = await fetcher(url)
if isinstance(out, str):
out = io.BytesIO(out.encode())
return out
except Exception as exc:
raise FetcherError from exc
@ -79,9 +97,15 @@ async def autoindex(
- if the fetcher can determine, without error, that the requested content does not
exist: it must return ``None``.
- if the fetcher can retrieve the requested content, it must return the entire
content at the given URL as a ``str``. This implies that, for example,
decompressing a compressed file is the responsibility of the fetcher.
- if the fetcher can retrieve the requested content, it must return the
content at the given URL as a file-like object.
Returning a ``str`` is also possible, but not recommended since it
requires loading an entire file into memory at once, and some
repositories contain very large files.
Note that decompressing compressed files (such as bzipped XML in
yum repositories) is the responsibility of the fetcher.
- if the fetcher encounters an exception, it may allow the exception to
propagate.
@ -124,7 +148,7 @@ async def autoindex(
while url.endswith("/"):
url = url[:-1]
fetcher = with_error_handling(fetcher)
fetcher = wrapped_fetcher(fetcher)
try:
for repo_type in REPO_TYPES:

View file

@ -1,11 +1,14 @@
from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass
from typing import Optional, Type, TypeVar
from typing import Optional, Type, TypeVar, BinaryIO, Union
T = TypeVar("T")
Fetcher = Callable[[str], Awaitable[Optional[str]]]
Fetcher = Callable[[str], Awaitable[Optional[Union[str, BinaryIO]]]]
# Like public Fetcher type above but does not allow 'str' outputs.
IOFetcher = Callable[[str], Awaitable[Optional[BinaryIO]]]
ICON_FOLDER = "📂"
@ -59,7 +62,7 @@ class Repo(ABC):
self,
base_url: str,
entry_point_content: str,
fetcher: Fetcher,
fetcher: IOFetcher,
):
self.base_url = base_url
self.entry_point_content = entry_point_content
@ -73,7 +76,7 @@ class Repo(ABC):
@classmethod
@abstractmethod
async def probe(cls: Type[T], fetcher: Fetcher, url: str) -> Optional[T]:
async def probe(cls: Type[T], fetcher: IOFetcher, url: str) -> Optional[T]:
"""Determine if a specified URL seems to point at a repository of this type.
If so, returns an initialized Repo of a concrete subtype. If not, returns None.

View file

@ -5,7 +5,7 @@ import configparser
import json
import os
from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW
from .base import GeneratedIndex, IOFetcher, IndexEntry, ICON_OPTICAL
from .template import TemplateContext
from .tree import treeify
from .yum import YumRepo
@ -20,7 +20,7 @@ class KickstartRepo(YumRepo):
repomd_xml: str,
extra_files: str,
treeinfo: str,
fetcher: Fetcher,
fetcher: IOFetcher,
):
super().__init__(base_url, repomd_xml, fetcher)
self.base_url = base_url
@ -211,12 +211,12 @@ class KickstartRepo(YumRepo):
@classmethod
async def probe(
cls: Type["KickstartRepo"], fetcher: Fetcher, url: str
cls: Type["KickstartRepo"], fetcher: IOFetcher, url: str
) -> Optional["KickstartRepo"]:
treeinfo_url = f"{url}/treeinfo"
treeinfo_content = await fetcher(treeinfo_url)
extra_files_url = f"{url}/extra_files.json"
extra_files_content = await fetcher(extra_files_url) or ""
extra_files_content = await fetcher(extra_files_url)
repomd_xml_url = f"{url}/repodata/repomd.xml"
repomd_xml = await fetcher(repomd_xml_url)
@ -232,4 +232,10 @@ class KickstartRepo(YumRepo):
if treeinfo_content is None or repomd_xml is None:
return None
return cls(url, repomd_xml, extra_files_content, treeinfo_content, fetcher)
return cls(
url,
repomd_xml.read().decode(),
extra_files_content.read().decode() if extra_files_content else "",
treeinfo_content.read().decode(),
fetcher,
)

View file

@ -2,7 +2,14 @@ from typing import Optional, Type
from collections.abc import AsyncGenerator
import logging
from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW
from .base import (
IOFetcher,
Repo,
GeneratedIndex,
IndexEntry,
ICON_OPTICAL,
ICON_QCOW,
)
from .template import TemplateContext
from .tree import treeify
@ -53,7 +60,7 @@ class PulpFileRepo(Repo):
@classmethod
async def probe(
cls: Type["PulpFileRepo"], fetcher: Fetcher, url: str
cls: Type["PulpFileRepo"], fetcher: IOFetcher, url: str
) -> Optional["PulpFileRepo"]:
manifest_url = f"{url}/PULP_MANIFEST"
manifest_content = await fetcher(manifest_url)
@ -61,4 +68,4 @@ class PulpFileRepo(Repo):
if manifest_content is None:
return None
return cls(url, manifest_content, fetcher)
return cls(url, manifest_content.read().decode(), fetcher)

View file

@ -3,14 +3,21 @@ import logging
import os
from collections.abc import AsyncGenerator, Generator, Iterable, Mapping
from dataclasses import dataclass
from typing import Optional, Type, Any, TypeVar, NoReturn, overload
from typing import BinaryIO, Optional, Type, Any
from xml.dom.minidom import Element
from xml.dom.pulldom import END_ELEMENT, START_ELEMENT
from xml.sax.handler import ContentHandler
from defusedxml import pulldom, sax # type: ignore
from .base import ICON_PACKAGE, Fetcher, GeneratedIndex, IndexEntry, Repo, ContentError
from .base import (
ICON_PACKAGE,
GeneratedIndex,
IOFetcher,
IndexEntry,
Repo,
ContentError,
)
from .template import TemplateContext
from .tree import treeify
@ -85,12 +92,12 @@ class PackagesParser(ContentHandler):
self.current_package: Optional[Package] = None
self.packages: list[Package] = []
def parse(self, xmlstr: str) -> Iterable[Package]:
def parse(self, xml: BinaryIO) -> Iterable[Package]:
self.packages = []
# Parse the XML document; this will invoke our start/end element handlers
# which in turn will populate self.packages
sax.parseString(xmlstr.encode("utf-8"), self)
sax.parse(xml, self)
return self.packages
@ -190,7 +197,6 @@ class YumRepo(Repo):
return out
async def _package_entries(self) -> list[IndexEntry]:
primary_nodes = list(
pulldom_elements(
self.entry_point_content,
@ -215,8 +221,7 @@ class YumRepo(Repo):
key=lambda e: e.text,
)
def __packages_from_primary(self, primary_xml: str) -> Iterable[Package]:
LOG.debug("primary xml: %s", primary_xml)
def __packages_from_primary(self, primary_xml: BinaryIO) -> Iterable[Package]:
return PackagesParser().parse(primary_xml)
def __render_entries(
@ -237,7 +242,7 @@ class YumRepo(Repo):
@classmethod
async def probe(
cls: Type["YumRepo"],
fetcher: Fetcher,
fetcher: IOFetcher,
url: str,
) -> Optional["YumRepo"]:
repomd_xml_url = f"{url}/repodata/repomd.xml"
@ -248,4 +253,4 @@ class YumRepo(Repo):
return None
# it is a yum repo
return cls(url, repomd_xml, fetcher)
return cls(url, repomd_xml.read().decode(), fetcher)

View file

@ -7,6 +7,17 @@ from repo_autoindex._impl.api import http_fetcher
class FakeReader:
def __init__(self, body: bytes):
self.body = body
self.iterating = False
def __aiter__(self):
self.iterating = True
return self
async def __anext__(self):
if not self.iterating:
raise StopAsyncIteration
self.iterating = False
return self.body
async def read(self):
return self.body
@ -53,4 +64,4 @@ async def test_http_fetcher_decompresses(content_type: str):
fetcher = http_fetcher(session)
response = await fetcher("/some/path.gz")
assert response == text
assert response.read().decode() == text

View file

@ -1,4 +1,5 @@
from typing import Optional
import io
from typing import BinaryIO, Optional
import textwrap
from repo_autoindex import autoindex
@ -445,7 +446,7 @@ repository = .
type = variant
uid = BaseOS"""
TREEINFO_APPSTREAM="""[general]
TREEINFO_APPSTREAM = """[general]
; WARNING.0 = This section provides compatibility with pre-productmd treeinfos.
; WARNING.1 = Read productmd documentation for details about new format.
arch = x86_64
@ -531,8 +532,14 @@ class StaticFetcher:
def __init__(self):
self.content: dict[str, str] = {}
async def __call__(self, url: str) -> Optional[str]:
return self.content.get(url)
async def __call__(self, url: str) -> Optional[BinaryIO]:
out = self.content.get(url)
if out is not None:
# Since fetchers are allowed to return either str or an io stream,
# this test wraps the canned strings into a stream (while some other
# tests do not) to ensure both cases are covered.
out = io.BytesIO(out.encode())
return out
async def test_typical_index():