Further reduce memory usage on large yum repos [RHELDST-20453]

The Fetcher type was designed to return a 'str'.
That wasn't a good idea because it implies that every fetched file must
be loaded into memory completely. On certain large yum repos,
decompressed primary XML can be hundreds of MB, and it's not appropriate
to require loading that all into memory at once.

Make it support a file-like object (stream of bytes). Since the SAX
XML parser supports reading from a stream, this makes it possible to
avoid loading everything into memory at once.

A test of repo-autoindex CLI against
/content/dist/rhel/server/7/7Server/x86_64/os showed major
improvement:

- before: ~1200MiB
- after:    ~80MiB

Note that achieving the full improvement requires any downstream users
of the library (e.g. exodus-gw) to update their Fetcher implementation
as well, to stop returning a 'str'.
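
For downstream implementers, here is a minimal sketch of a stream-returning
Fetcher, modeled on the http_fetcher change in this commit. The helper name
and temp-file handling are illustrative only, not part of the library API:

    import tempfile
    from typing import BinaryIO, Optional

    import aiohttp


    def streaming_fetcher(session: aiohttp.ClientSession):
        # Hypothetical downstream fetcher: spool the response body to a
        # temporary file in chunks rather than returning one large str.
        async def fetch(url: str) -> Optional[BinaryIO]:
            async with session.get(url) as resp:
                if resp.status == 404:
                    return None
                resp.raise_for_status()
                out: BinaryIO = tempfile.NamedTemporaryFile()  # type: ignore
                async for chunk in resp.content:
                    out.write(chunk)
                out.seek(0)
                return out

        return fetch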
Rohan McGovern 2023-09-21 10:24:13 +10:00
parent c557a61876
commit eac74ec1e4
9 changed files with 110 additions and 43 deletions


@@ -24,6 +24,10 @@ information about the usage of `repo-autoindex`.
 ## Changelog
+### v1.2.0 - 2023-09-22
+- Support streamed fetching to reduce memory usage when fetching large files.
 ### v1.1.2 - 2023-09-18
 - Add `py.typed` to make package PEP 561 compliant / enable downstream type-checking.


@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "repo-autoindex"
-version = "1.1.2"
+version = "1.2.0"
 description = "Generic static HTML indexes of various repository types"
 authors = ["Rohan McGovern <rmcgover@redhat.com>"]
 license = "GPL-3.0-or-later"


@@ -1,11 +1,14 @@
 import gzip
 import logging
-from collections.abc import AsyncGenerator
-from typing import Optional, Type
+from collections.abc import AsyncGenerator, Awaitable, Callable
+from typing import Optional, Type, BinaryIO
+import tempfile
+import io
 import aiohttp
-from .base import Fetcher, GeneratedIndex, Repo, ContentError, FetcherError
+from .base import Fetcher, IOFetcher, GeneratedIndex, Repo, ContentError, FetcherError
 from .yum import YumRepo
 from .pulp import PulpFileRepo
 from .kickstart import KickstartRepo
@@ -15,7 +18,9 @@ REPO_TYPES: list[Type[Repo]] = [KickstartRepo, YumRepo, PulpFileRepo]
 def http_fetcher(session: aiohttp.ClientSession) -> Fetcher:
-    async def get_content_with_session(url: str) -> Optional[str]:
+    async def get_content_with_session(
+        url: str,
+    ) -> Optional[BinaryIO]:
         LOG.info("Fetching: %s", url)
         async with session.get(url) as resp:
             if resp.status == 404:
@@ -26,26 +31,39 @@ def http_fetcher(session: aiohttp.ClientSession) -> Fetcher:
             # Any other error status is fatal
             resp.raise_for_status()
+            out: BinaryIO = tempfile.NamedTemporaryFile(prefix="repo-autoindex")  # type: ignore
+            async for chunk in resp.content:
+                out.write(chunk)
+            out.flush()
+            out.seek(0)
             # Deal with the non-ideal content negotiation
             # for certain storage backends.
             if url.endswith(".gz") and resp.content_type in (
                 "application/x-gzip",
                 "application/octet-stream",
             ):
-                compressed = await resp.content.read()
-                uncomp = gzip.decompress(compressed)
-                return uncomp.decode("utf-8")
-            return await resp.text()
+                out = gzip.GzipFile(fileobj=out)  # type: ignore
+            return out
     return get_content_with_session
-def with_error_handling(fetcher: Fetcher) -> Fetcher:
-    # wraps a fetcher such that any raised exceptions are wrapped into FetcherError
-    async def new_fetcher(url: str) -> Optional[str]:
+def wrapped_fetcher(fetcher: Fetcher) -> IOFetcher:
+    # wraps a fetcher as passed in by the caller into an internal
+    # fetcher enforcing certain behaviors:
+    #
+    # - wraps all exceptions in FetcherError
+    #
+    # - adapts 'str' outputs into io streams
+    #
+    async def new_fetcher(url: str) -> Optional[BinaryIO]:
         try:
-            return await fetcher(url)
+            out = await fetcher(url)
+            if isinstance(out, str):
+                out = io.BytesIO(out.encode())
+            return out
         except Exception as exc:
             raise FetcherError from exc
@@ -79,9 +97,15 @@ async def autoindex(
    - if the fetcher can determine, without error, that the requested content does not
      exist: it must return ``None``.
-   - if the fetcher can retrieve the requested content, it must return the entire
-     content at the given URL as a ``str``. This implies that, for example,
-     decompressing a compressed file is the responsibility of the fetcher.
+   - if the fetcher can retrieve the requested content, it must return the
+     content at the given URL as a file-like object.
+     Returning a ``str`` is also possible, but not recommended since it
+     requires loading an entire file into memory at once, and some
+     repositories contain very large files.
+     Note that decompressing compressed files (such as bzipped XML in
+     yum repositories) is the responsibility of the fetcher.
    - if the fetcher encounters an exception, it may allow the exception to
      propagate.
@@ -124,7 +148,7 @@ async def autoindex(
     while url.endswith("/"):
         url = url[:-1]
-    fetcher = with_error_handling(fetcher)
+    fetcher = wrapped_fetcher(fetcher)
     try:
         for repo_type in REPO_TYPES:

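The gzip.GzipFile(fileobj=...) wrapper above is what keeps decompression
lazy: bytes are only inflated as the parser reads them. A standalone sketch
of that behavior, using toy data rather than any library code:

    import gzip
    import io

    # Wrapping a file object in GzipFile decompresses on demand: each read()
    # inflates only as much data as requested, so the full decompressed
    # document never needs to exist in memory at once.
    compressed = io.BytesIO(gzip.compress(b"<metadata>" + b"x" * 100 + b"</metadata>"))
    stream = gzip.GzipFile(fileobj=compressed)

    assert stream.read(10) == b"<metadata>"  # only 10 decompressed bytes so far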

@ -1,11 +1,14 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator, Awaitable, Callable from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Type, TypeVar from typing import Optional, Type, TypeVar, BinaryIO, Union
T = TypeVar("T") T = TypeVar("T")
Fetcher = Callable[[str], Awaitable[Optional[str]]] Fetcher = Callable[[str], Awaitable[Optional[Union[str, BinaryIO]]]]
# Like public Fetcher type above but does not allow 'str' outputs.
IOFetcher = Callable[[str], Awaitable[Optional[BinaryIO]]]
ICON_FOLDER = "📂" ICON_FOLDER = "📂"
@ -59,7 +62,7 @@ class Repo(ABC):
self, self,
base_url: str, base_url: str,
entry_point_content: str, entry_point_content: str,
fetcher: Fetcher, fetcher: IOFetcher,
): ):
self.base_url = base_url self.base_url = base_url
self.entry_point_content = entry_point_content self.entry_point_content = entry_point_content
@ -73,7 +76,7 @@ class Repo(ABC):
@classmethod @classmethod
@abstractmethod @abstractmethod
async def probe(cls: Type[T], fetcher: Fetcher, url: str) -> Optional[T]: async def probe(cls: Type[T], fetcher: IOFetcher, url: str) -> Optional[T]:
"""Determine if a specified URL seems to point at a repository of this type. """Determine if a specified URL seems to point at a repository of this type.
If so, returns an initialized Repo of a concrete subtype. If not, returns None. If so, returns an initialized Repo of a concrete subtype. If not, returns None.

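Both shapes remain valid against the widened Fetcher alias above. These two
toy fetchers (hypothetical, for illustration only) would each type-check,
but only the second avoids buffering whole files:

    import io
    from typing import BinaryIO, Optional


    async def str_fetcher(url: str) -> Optional[str]:
        # Legacy shape: still accepted, but holds the whole file as a str.
        return "fake repomd.xml content"


    async def stream_fetcher(url: str) -> Optional[BinaryIO]:
        # Preferred shape: any binary file-like object works.
        return io.BytesIO(b"fake repomd.xml content")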

@@ -5,7 +5,7 @@ import configparser
 import json
 import os
-from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW
+from .base import GeneratedIndex, IOFetcher, IndexEntry, ICON_OPTICAL
 from .template import TemplateContext
 from .tree import treeify
 from .yum import YumRepo
@@ -20,7 +20,7 @@ class KickstartRepo(YumRepo):
         repomd_xml: str,
         extra_files: str,
         treeinfo: str,
-        fetcher: Fetcher,
+        fetcher: IOFetcher,
     ):
         super().__init__(base_url, repomd_xml, fetcher)
         self.base_url = base_url
@@ -211,12 +211,12 @@ class KickstartRepo(YumRepo):
     @classmethod
     async def probe(
-        cls: Type["KickstartRepo"], fetcher: Fetcher, url: str
+        cls: Type["KickstartRepo"], fetcher: IOFetcher, url: str
     ) -> Optional["KickstartRepo"]:
         treeinfo_url = f"{url}/treeinfo"
         treeinfo_content = await fetcher(treeinfo_url)
         extra_files_url = f"{url}/extra_files.json"
-        extra_files_content = await fetcher(extra_files_url) or ""
+        extra_files_content = await fetcher(extra_files_url)
         repomd_xml_url = f"{url}/repodata/repomd.xml"
         repomd_xml = await fetcher(repomd_xml_url)
@@ -232,4 +232,10 @@ class KickstartRepo(YumRepo):
         if treeinfo_content is None or repomd_xml is None:
             return None
-        return cls(url, repomd_xml, extra_files_content, treeinfo_content, fetcher)
+        return cls(
+            url,
+            repomd_xml.read().decode(),
+            extra_files_content.read().decode() if extra_files_content else "",
+            treeinfo_content.read().decode(),
+            fetcher,
+        )


@@ -2,7 +2,14 @@ from typing import Optional, Type
 from collections.abc import AsyncGenerator
 import logging
-from .base import Repo, GeneratedIndex, Fetcher, IndexEntry, ICON_OPTICAL, ICON_QCOW
+from .base import (
+    IOFetcher,
+    Repo,
+    GeneratedIndex,
+    IndexEntry,
+    ICON_OPTICAL,
+    ICON_QCOW,
+)
 from .template import TemplateContext
 from .tree import treeify
@@ -53,7 +60,7 @@ class PulpFileRepo(Repo):
     @classmethod
     async def probe(
-        cls: Type["PulpFileRepo"], fetcher: Fetcher, url: str
+        cls: Type["PulpFileRepo"], fetcher: IOFetcher, url: str
     ) -> Optional["PulpFileRepo"]:
         manifest_url = f"{url}/PULP_MANIFEST"
         manifest_content = await fetcher(manifest_url)
@@ -61,4 +68,4 @@ class PulpFileRepo(Repo):
         if manifest_content is None:
             return None
-        return cls(url, manifest_content, fetcher)
+        return cls(url, manifest_content.read().decode(), fetcher)


@@ -3,14 +3,21 @@ import logging
 import os
 from collections.abc import AsyncGenerator, Generator, Iterable, Mapping
 from dataclasses import dataclass
-from typing import Optional, Type, Any, TypeVar, NoReturn, overload
+from typing import BinaryIO, Optional, Type, Any
 from xml.dom.minidom import Element
 from xml.dom.pulldom import END_ELEMENT, START_ELEMENT
 from xml.sax.handler import ContentHandler
 from defusedxml import pulldom, sax  # type: ignore
-from .base import ICON_PACKAGE, Fetcher, GeneratedIndex, IndexEntry, Repo, ContentError
+from .base import (
+    ICON_PACKAGE,
+    GeneratedIndex,
+    IOFetcher,
+    IndexEntry,
+    Repo,
+    ContentError,
+)
 from .template import TemplateContext
 from .tree import treeify
@@ -85,12 +92,12 @@ class PackagesParser(ContentHandler):
         self.current_package: Optional[Package] = None
         self.packages: list[Package] = []
-    def parse(self, xmlstr: str) -> Iterable[Package]:
+    def parse(self, xml: BinaryIO) -> Iterable[Package]:
         self.packages = []
         # Parse the XML document; this will invoke our start/end element handlers
         # which in turn will populate self.packages
-        sax.parseString(xmlstr.encode("utf-8"), self)
+        sax.parse(xml, self)
         return self.packages
@@ -190,7 +197,6 @@
         return out
-
     async def _package_entries(self) -> list[IndexEntry]:
         primary_nodes = list(
             pulldom_elements(
                 self.entry_point_content,
@@ -215,8 +221,7 @@
             key=lambda e: e.text,
         )
-    def __packages_from_primary(self, primary_xml: str) -> Iterable[Package]:
-        LOG.debug("primary xml: %s", primary_xml)
+    def __packages_from_primary(self, primary_xml: BinaryIO) -> Iterable[Package]:
         return PackagesParser().parse(primary_xml)
     def __render_entries(
@@ -237,7 +242,7 @@
     @classmethod
     async def probe(
         cls: Type["YumRepo"],
-        fetcher: Fetcher,
+        fetcher: IOFetcher,
         url: str,
     ) -> Optional["YumRepo"]:
         repomd_xml_url = f"{url}/repodata/repomd.xml"
@@ -248,4 +253,4 @@
             return None
         # it is a yum repo
-        return cls(url, repomd_xml, fetcher)
+        return cls(url, repomd_xml.read().decode(), fetcher)
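
The switch from sax.parseString to sax.parse above is what lets the primary
XML stay on disk: xml.sax (and defusedxml's wrapper around it) accept any
file-like object and feed the handler incrementally. A minimal sketch with a
hypothetical handler, unrelated to the real PackagesParser:

    import io
    from xml.sax.handler import ContentHandler

    from defusedxml import sax  # type: ignore


    class PackageCounter(ContentHandler):
        # Hypothetical handler: counts <package> elements while the parser
        # streams through the document, never holding all of it in memory.
        def __init__(self) -> None:
            super().__init__()
            self.count = 0

        def startElement(self, name, attrs):
            if name == "package":
                self.count += 1


    handler = PackageCounter()
    # Any binary file-like object works here, e.g. the temporary file or
    # GzipFile stream produced by the fetcher.
    sax.parse(io.BytesIO(b"<metadata><package/><package/></metadata>"), handler)
    assert handler.count == 2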


@@ -7,6 +7,17 @@ from repo_autoindex._impl.api import http_fetcher
 class FakeReader:
     def __init__(self, body: bytes):
         self.body = body
+        self.iterating = False
+
+    def __aiter__(self):
+        self.iterating = True
+        return self
+
+    async def __anext__(self):
+        if not self.iterating:
+            raise StopAsyncIteration
+        self.iterating = False
+        return self.body
     async def read(self):
         return self.body
@@ -53,4 +64,4 @@ async def test_http_fetcher_decompresses(content_type: str):
     fetcher = http_fetcher(session)
     response = await fetcher("/some/path.gz")
-    assert response == text
+    assert response.read().decode() == text


@ -1,4 +1,5 @@
from typing import Optional import io
from typing import BinaryIO, Optional
import textwrap import textwrap
from repo_autoindex import autoindex from repo_autoindex import autoindex
@ -445,7 +446,7 @@ repository = .
type = variant type = variant
uid = BaseOS""" uid = BaseOS"""
TREEINFO_APPSTREAM="""[general] TREEINFO_APPSTREAM = """[general]
; WARNING.0 = This section provides compatibility with pre-productmd treeinfos. ; WARNING.0 = This section provides compatibility with pre-productmd treeinfos.
; WARNING.1 = Read productmd documentation for details about new format. ; WARNING.1 = Read productmd documentation for details about new format.
arch = x86_64 arch = x86_64
@ -531,8 +532,14 @@ class StaticFetcher:
def __init__(self): def __init__(self):
self.content: dict[str, str] = {} self.content: dict[str, str] = {}
async def __call__(self, url: str) -> Optional[str]: async def __call__(self, url: str) -> Optional[BinaryIO]:
return self.content.get(url) out = self.content.get(url)
if out is not None:
# Since fetchers are allowed to return either str or an io stream,
# this test wraps the canned strings into a stream (while some other
# tests do not) to ensure both cases are covered.
out = io.BytesIO(out.encode())
return out
async def test_typical_index(): async def test_typical_index():