From eac74ec1e464305ba99539e886277acbf8182e07 Mon Sep 17 00:00:00 2001 From: Rohan McGovern Date: Thu, 21 Sep 2023 10:24:13 +1000 Subject: [PATCH] Further reduce memory usage on large yum repos [RHELDST-20453] The Fetcher type was designed to return a 'str'. That wasn't a good idea because it implies that every fetched file must be loaded into memory completely. On certain large yum repos, decompressed primary XML can be hundreds of MB, and it's not appropriate to require loading that all into memory at once. Make it support a file-like object (stream of bytes). Since the SAX XML parser supports reading from a stream, this makes it possible to avoid loading everything into memory at once. A test of repo-autoindex CLI against /content/dist/rhel/server/7/7Server/x86_64/os showed major improvement: - before: ~1200MiB - after: ~80MiB Note that achieving the full improvement requires any downstream users of the library (e.g. exodus-gw) to update their Fetcher implementation as well, to stop returning a 'str'. --- README.md | 4 ++ pyproject.toml | 2 +- repo_autoindex/_impl/api.py | 56 ++++++++++++++++++-------- repo_autoindex/_impl/base.py | 11 +++-- repo_autoindex/_impl/kickstart.py | 16 +++++--- repo_autoindex/_impl/pulp.py | 13 ++++-- repo_autoindex/_impl/yum.py | 23 ++++++----- tests/test_http_fetcher.py | 13 +++++- tests/test_kickstart_render_typical.py | 15 +++++-- 9 files changed, 110 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 07d1c9d..f1003ff 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,10 @@ information about the usage of `repo-autoindex`. ## Changelog +### v1.2.0 - 2023-09-22 + +- Support streamed fetching to reduce memory usage when fetching large files. + ### v1.1.2 - 2023-09-18 - Add `py.typed` to make package PEP 561 compliant / enable downstream type-checking. diff --git a/pyproject.toml b/pyproject.toml index 8fc46cf..acd66be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "repo-autoindex" -version = "1.1.2" +version = "1.2.0" description = "Generic static HTML indexes of various repository types" authors = ["Rohan McGovern "] license = "GPL-3.0-or-later" diff --git a/repo_autoindex/_impl/api.py b/repo_autoindex/_impl/api.py index 4954e54..c7ee2c3 100644 --- a/repo_autoindex/_impl/api.py +++ b/repo_autoindex/_impl/api.py @@ -1,11 +1,14 @@ import gzip import logging -from collections.abc import AsyncGenerator -from typing import Optional, Type +from collections.abc import AsyncGenerator, Awaitable, Callable +from typing import Optional, Type, BinaryIO +import tempfile +import io import aiohttp -from .base import Fetcher, GeneratedIndex, Repo, ContentError, FetcherError + +from .base import Fetcher, IOFetcher, GeneratedIndex, Repo, ContentError, FetcherError from .yum import YumRepo from .pulp import PulpFileRepo from .kickstart import KickstartRepo @@ -15,7 +18,9 @@ REPO_TYPES: list[Type[Repo]] = [KickstartRepo, YumRepo, PulpFileRepo] def http_fetcher(session: aiohttp.ClientSession) -> Fetcher: - async def get_content_with_session(url: str) -> Optional[str]: + async def get_content_with_session( + url: str, + ) -> Optional[BinaryIO]: LOG.info("Fetching: %s", url) async with session.get(url) as resp: if resp.status == 404: @@ -26,26 +31,39 @@ def http_fetcher(session: aiohttp.ClientSession) -> Fetcher: # Any other error status is fatal resp.raise_for_status() + out: BinaryIO = tempfile.NamedTemporaryFile(prefix="repo-autoindex") # type: ignore + async for chunk in resp.content: + out.write(chunk) + out.flush() + out.seek(0) + # Deal with the non-ideal content negotiation # for certain storage backends. if url.endswith(".gz") and resp.content_type in ( "application/x-gzip", "application/octet-stream", ): - compressed = await resp.content.read() - uncomp = gzip.decompress(compressed) - return uncomp.decode("utf-8") + out = gzip.GzipFile(fileobj=out) # type: ignore - return await resp.text() + return out return get_content_with_session -def with_error_handling(fetcher: Fetcher) -> Fetcher: - # wraps a fetcher such that any raised exceptions are wrapped into FetcherError - async def new_fetcher(url: str) -> Optional[str]: +def wrapped_fetcher(fetcher: Fetcher) -> IOFetcher: + # wraps a fetcher as passed in by the caller into an internal + # fetcher enforcing certain behaviors: + # + # - wraps all exceptions in FetcherError + # + # - adapts 'str' outputs into io streams + # + async def new_fetcher(url: str) -> Optional[BinaryIO]: try: - return await fetcher(url) + out = await fetcher(url) + if isinstance(out, str): + out = io.BytesIO(out.encode()) + return out except Exception as exc: raise FetcherError from exc @@ -79,9 +97,15 @@ async def autoindex( - if the fetcher can determine, without error, that the requested content does not exist: it must return ``None``. - - if the fetcher can retrieve the requested content, it must return the entire - content at the given URL as a ``str``. This implies that, for example, - decompressing a compressed file is the responsibility of the fetcher. + - if the fetcher can retrieve the requested content, it must return the + content at the given URL as a file-like object. + + Returning a ``str`` is also possible, but not recommended since it + requires loading an entire file into memory at once, and some + repositories contain very large files. + + Note that decompressing compressed files (such as bzipped XML in + yum repositories) is the responsibility of the fetcher. - if the fetcher encounters an exception, it may allow the exception to propagate. @@ -124,7 +148,7 @@ async def autoindex( while url.endswith("/"): url = url[:-1] - fetcher = with_error_handling(fetcher) + fetcher = wrapped_fetcher(fetcher) try: for repo_type in REPO_TYPES: diff --git a/repo_autoindex/_impl/base.py b/repo_autoindex/_impl/base.py index e9954a7..55a8121 100644 --- a/repo_autoindex/_impl/base.py +++ b/repo_autoindex/_impl/base.py @@ -1,11 +1,14 @@ from abc import ABC, abstractmethod from collections.abc import AsyncGenerator, Awaitable, Callable from dataclasses import dataclass -from typing import Optional, Type, TypeVar +from typing import Optional, Type, TypeVar, BinaryIO, Union T = TypeVar("T") -Fetcher = Callable[[str], Awaitable[Optional[str]]] +Fetcher = Callable[[str], Awaitable[Optional[Union[str, BinaryIO]]]] + +# Like public Fetcher type above but does not allow 'str' outputs. +IOFetcher = Callable[[str], Awaitable[Optional[BinaryIO]]] ICON_FOLDER = "📂" @@ -59,7 +62,7 @@ class Repo(ABC): self, base_url: str, entry_point_content: str, - fetcher: Fetcher, + fetcher: IOFetcher, ): self.base_url = base_url self.entry_point_content = entry_point_content @@ -73,7 +76,7 @@ class Repo(ABC): @classmethod @abstractmethod - async def probe(cls: Type[T], fetcher: Fetcher, url: str) -> Optional[T]: + async def probe(cls: Type[T], fetcher: IOFetcher, url: str) -> Optional[T]: """Determine if a specified URL seems to point at a repository of this type. If so, returns an initialized Repo of a concrete subtype. If not, returns None. diff --git a/repo_autoindex/_impl/kickstart.py b/repo_autoindex/_impl/kickstart.py index ef21675..c25e50f 100644 --- a/repo_autoindex/_impl/kickstart.py +++ b/repo_autoindex/_impl/kickstart.py @@ -5,7 +5,7 @@ import configparser import json import os -from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW +from .base import GeneratedIndex, IOFetcher, IndexEntry, ICON_OPTICAL from .template import TemplateContext from .tree import treeify from .yum import YumRepo @@ -20,7 +20,7 @@ class KickstartRepo(YumRepo): repomd_xml: str, extra_files: str, treeinfo: str, - fetcher: Fetcher, + fetcher: IOFetcher, ): super().__init__(base_url, repomd_xml, fetcher) self.base_url = base_url @@ -211,12 +211,12 @@ class KickstartRepo(YumRepo): @classmethod async def probe( - cls: Type["KickstartRepo"], fetcher: Fetcher, url: str + cls: Type["KickstartRepo"], fetcher: IOFetcher, url: str ) -> Optional["KickstartRepo"]: treeinfo_url = f"{url}/treeinfo" treeinfo_content = await fetcher(treeinfo_url) extra_files_url = f"{url}/extra_files.json" - extra_files_content = await fetcher(extra_files_url) or "" + extra_files_content = await fetcher(extra_files_url) repomd_xml_url = f"{url}/repodata/repomd.xml" repomd_xml = await fetcher(repomd_xml_url) @@ -232,4 +232,10 @@ class KickstartRepo(YumRepo): if treeinfo_content is None or repomd_xml is None: return None - return cls(url, repomd_xml, extra_files_content, treeinfo_content, fetcher) + return cls( + url, + repomd_xml.read().decode(), + extra_files_content.read().decode() if extra_files_content else "", + treeinfo_content.read().decode(), + fetcher, + ) diff --git a/repo_autoindex/_impl/pulp.py b/repo_autoindex/_impl/pulp.py index 40eed82..2580f4c 100644 --- a/repo_autoindex/_impl/pulp.py +++ b/repo_autoindex/_impl/pulp.py @@ -2,7 +2,14 @@ from typing import Optional, Type from collections.abc import AsyncGenerator import logging -from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW +from .base import ( + IOFetcher, + Repo, + GeneratedIndex, + IndexEntry, + ICON_OPTICAL, + ICON_QCOW, +) from .template import TemplateContext from .tree import treeify @@ -53,7 +60,7 @@ class PulpFileRepo(Repo): @classmethod async def probe( - cls: Type["PulpFileRepo"], fetcher: Fetcher, url: str + cls: Type["PulpFileRepo"], fetcher: IOFetcher, url: str ) -> Optional["PulpFileRepo"]: manifest_url = f"{url}/PULP_MANIFEST" manifest_content = await fetcher(manifest_url) @@ -61,4 +68,4 @@ class PulpFileRepo(Repo): if manifest_content is None: return None - return cls(url, manifest_content, fetcher) + return cls(url, manifest_content.read().decode(), fetcher) diff --git a/repo_autoindex/_impl/yum.py b/repo_autoindex/_impl/yum.py index 96dc935..960281d 100644 --- a/repo_autoindex/_impl/yum.py +++ b/repo_autoindex/_impl/yum.py @@ -3,14 +3,21 @@ import logging import os from collections.abc import AsyncGenerator, Generator, Iterable, Mapping from dataclasses import dataclass -from typing import Optional, Type, Any, TypeVar, NoReturn, overload +from typing import BinaryIO, Optional, Type, Any from xml.dom.minidom import Element from xml.dom.pulldom import END_ELEMENT, START_ELEMENT from xml.sax.handler import ContentHandler from defusedxml import pulldom, sax # type: ignore -from .base import ICON_PACKAGE, Fetcher, GeneratedIndex, IndexEntry, Repo, ContentError +from .base import ( + ICON_PACKAGE, + GeneratedIndex, + IOFetcher, + IndexEntry, + Repo, + ContentError, +) from .template import TemplateContext from .tree import treeify @@ -85,12 +92,12 @@ class PackagesParser(ContentHandler): self.current_package: Optional[Package] = None self.packages: list[Package] = [] - def parse(self, xmlstr: str) -> Iterable[Package]: + def parse(self, xml: BinaryIO) -> Iterable[Package]: self.packages = [] # Parse the XML document; this will invoke our start/end element handlers # which in turn will populate self.packages - sax.parseString(xmlstr.encode("utf-8"), self) + sax.parse(xml, self) return self.packages @@ -190,7 +197,6 @@ class YumRepo(Repo): return out async def _package_entries(self) -> list[IndexEntry]: - primary_nodes = list( pulldom_elements( self.entry_point_content, @@ -215,8 +221,7 @@ class YumRepo(Repo): key=lambda e: e.text, ) - def __packages_from_primary(self, primary_xml: str) -> Iterable[Package]: - LOG.debug("primary xml: %s", primary_xml) + def __packages_from_primary(self, primary_xml: BinaryIO) -> Iterable[Package]: return PackagesParser().parse(primary_xml) def __render_entries( @@ -237,7 +242,7 @@ class YumRepo(Repo): @classmethod async def probe( cls: Type["YumRepo"], - fetcher: Fetcher, + fetcher: IOFetcher, url: str, ) -> Optional["YumRepo"]: repomd_xml_url = f"{url}/repodata/repomd.xml" @@ -248,4 +253,4 @@ class YumRepo(Repo): return None # it is a yum repo - return cls(url, repomd_xml, fetcher) + return cls(url, repomd_xml.read().decode(), fetcher) diff --git a/tests/test_http_fetcher.py b/tests/test_http_fetcher.py index 9019cae..8e4ef5f 100644 --- a/tests/test_http_fetcher.py +++ b/tests/test_http_fetcher.py @@ -7,6 +7,17 @@ from repo_autoindex._impl.api import http_fetcher class FakeReader: def __init__(self, body: bytes): self.body = body + self.iterating = False + + def __aiter__(self): + self.iterating = True + return self + + async def __anext__(self): + if not self.iterating: + raise StopAsyncIteration + self.iterating = False + return self.body async def read(self): return self.body @@ -53,4 +64,4 @@ async def test_http_fetcher_decompresses(content_type: str): fetcher = http_fetcher(session) response = await fetcher("/some/path.gz") - assert response == text + assert response.read().decode() == text diff --git a/tests/test_kickstart_render_typical.py b/tests/test_kickstart_render_typical.py index 509c156..6dcac1c 100644 --- a/tests/test_kickstart_render_typical.py +++ b/tests/test_kickstart_render_typical.py @@ -1,4 +1,5 @@ -from typing import Optional +import io +from typing import BinaryIO, Optional import textwrap from repo_autoindex import autoindex @@ -445,7 +446,7 @@ repository = . type = variant uid = BaseOS""" -TREEINFO_APPSTREAM="""[general] +TREEINFO_APPSTREAM = """[general] ; WARNING.0 = This section provides compatibility with pre-productmd treeinfos. ; WARNING.1 = Read productmd documentation for details about new format. arch = x86_64 @@ -531,8 +532,14 @@ class StaticFetcher: def __init__(self): self.content: dict[str, str] = {} - async def __call__(self, url: str) -> Optional[str]: - return self.content.get(url) + async def __call__(self, url: str) -> Optional[BinaryIO]: + out = self.content.get(url) + if out is not None: + # Since fetchers are allowed to return either str or an io stream, + # this test wraps the canned strings into a stream (while some other + # tests do not) to ensure both cases are covered. + out = io.BytesIO(out.encode()) + return out async def test_typical_index():