From 491a8a5480e07606f9e3b27c2772330c9ee9d642 Mon Sep 17 00:00:00 2001 From: ghreprimand Date: Mon, 1 Jun 2026 15:55:03 -0500 Subject: [PATCH] Harden backup restore tar extraction Co-authored-by: ghreprimand <203024559+ghreprimand@users.noreply.github.com> --- scripts/odysseus-backup | 57 ++++++++++---- tests/test_backup_cli_security.py | 120 ++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 14 deletions(-) create mode 100644 tests/test_backup_cli_security.py diff --git a/scripts/odysseus-backup b/scripts/odysseus-backup index b71d08a..28f187f 100755 --- a/scripts/odysseus-backup +++ b/scripts/odysseus-backup @@ -24,9 +24,9 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "_lib")) from cli import quiet_logs, emit, fail, common_parser, run, REPO_ROOT as _REPO_ROOT quiet_logs() -import argparse, json, logging, os, sqlite3, subprocess, sys, tarfile, tempfile +import argparse, json, logging, os, shutil, sqlite3, subprocess, sys, tarfile, tempfile from datetime import datetime -from pathlib import Path +from pathlib import Path, PurePosixPath _DATA_DIR = _REPO_ROOT / "data" _BACKUP_DIR = _REPO_ROOT / "backups" @@ -70,7 +70,7 @@ def cmd_snapshot(args): ) out_path.parent.mkdir(parents=True, exist_ok=True) - sqlite_dbs = [p for p in _DATA_DIR.rglob("*.db") if p.is_file()] + sqlite_dbs = [p for p in _DATA_DIR.rglob("*.db") if p.is_file() and not p.is_symlink()] files_added = 0 total_bytes = 0 @@ -87,7 +87,7 @@ def cmd_snapshot(args): with tarfile.open(out_path, "w:gz") as tar: for p in sorted(_DATA_DIR.rglob("*")): - if not p.is_file(): + if not p.is_file() or p.is_symlink(): continue rel = p.relative_to(_DATA_DIR.parent) # Skip user-asked-to-skip categories @@ -143,6 +143,7 @@ def cmd_verify(args): try: with tarfile.open(path, "r:gz") as tar: members = tar.getmembers() + _validate_restore_members(members) except (tarfile.TarError, OSError) as e: fail(f"tarball is corrupt: {e}") emit({ @@ -154,6 +155,35 @@ def cmd_verify(args): }, args) +def _validate_restore_members(members): + """Reject archive entries that can escape data/ during restore.""" + for m in members: + rel = PurePosixPath(m.name) + if rel.is_absolute() or ".." in rel.parts: + fail(f"refusing tarball with absolute/parent path: {m.name!r}") + if not rel.parts or rel.parts[0] != "data": + fail(f"refusing tarball with entry outside data/: {m.name!r}") + if m.issym() or m.islnk(): + fail(f"refusing tarball with link entry: {m.name!r}") + if not (m.isdir() or m.isfile()): + fail(f"refusing tarball with special file entry: {m.name!r}") + + +def _extract_restore_members(tar, members, root: Path) -> None: + """Extract only regular files/directories after validation.""" + for m in members: + target = root.joinpath(*PurePosixPath(m.name).parts) + if m.isdir(): + target.mkdir(parents=True, exist_ok=True) + continue + target.parent.mkdir(parents=True, exist_ok=True) + src = tar.extractfile(m) + if src is None: + fail(f"extract failed: could not read {m.name!r}") + with src, open(target, "wb") as dst: + shutil.copyfileobj(src, dst) + + def cmd_restore(args): """Overwrite `data/` from a tarball. Destructive; requires --yes.""" path = Path(args.path) @@ -161,26 +191,25 @@ def cmd_restore(args): fail(f"no file at {path}") if not args.yes: fail("restore is destructive — pass --yes to confirm overwriting data/") - # Sanity check: tarball entries must all be under `data/`. If anyone - # crafted a malicious tarball with `../etc/passwd`, refuse. + # Sanity check: tarball entries must all be safe, regular files/dirs under + # `data/`. Avoid extractall() so symlink/hardlink entries can't redirect a + # later write outside the repo. + stash = None with tarfile.open(path, "r:gz") as tar: - for m in tar.getmembers(): - if m.name.startswith("/") or ".." in Path(m.name).parts: - fail(f"refusing tarball with absolute/parent path: {m.name!r}") - if not m.name.startswith("data/") and m.name != "data": - fail(f"refusing tarball with entry outside data/: {m.name!r}") + members = tar.getmembers() + _validate_restore_members(members) # Save a safety copy of current data/ before extracting. - if _DATA_DIR.exists(): + if _DATA_DIR.exists() or _DATA_DIR.is_symlink(): stash = _REPO_ROOT / f"data.before-restore-{datetime.now().strftime('%Y%m%d-%H%M%S')}" os.rename(_DATA_DIR, stash) try: - tar.extractall(path=_REPO_ROOT) + _extract_restore_members(tar, members, _REPO_ROOT) except Exception as e: fail(f"extract failed: {e}") emit({ "ok": True, "restored_from": str(path), - "previous_data_stashed_at": str(stash) if _DATA_DIR.exists() else None, + "previous_data_stashed_at": str(stash) if stash else None, }, args) diff --git a/tests/test_backup_cli_security.py b/tests/test_backup_cli_security.py new file mode 100644 index 0000000..b10aee3 --- /dev/null +++ b/tests/test_backup_cli_security.py @@ -0,0 +1,120 @@ +import importlib.machinery +import importlib.util +import io +import tarfile +from pathlib import Path +from types import SimpleNamespace + +import pytest + + +def _load_backup_cli(): + path = Path(__file__).resolve().parent.parent / "scripts" / "odysseus-backup" + loader = importlib.machinery.SourceFileLoader("odysseus_backup_under_test", str(path)) + spec = importlib.util.spec_from_loader(loader.name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + return module + + +def _patch_repo(module, monkeypatch, root: Path): + monkeypatch.setattr(module, "_REPO_ROOT", root) + monkeypatch.setattr(module, "_DATA_DIR", root / "data") + + +def _restore_args(path: Path): + return SimpleNamespace(path=str(path), yes=True, pretty=False) + + +def _verify_args(path: Path): + return SimpleNamespace(path=str(path), pretty=False) + + +def test_restore_rejects_symlink_escape(tmp_path, monkeypatch): + backup = _load_backup_cli() + repo = tmp_path / "repo" + data = repo / "data" + outside = tmp_path / "outside" + data.mkdir(parents=True) + outside.mkdir() + (data / "keep.txt").write_text("still here", encoding="utf-8") + _patch_repo(backup, monkeypatch, repo) + + tar_path = tmp_path / "malicious.tar.gz" + with tarfile.open(tar_path, "w:gz") as tar: + data_dir = tarfile.TarInfo("data") + data_dir.type = tarfile.DIRTYPE + tar.addfile(data_dir) + + link = tarfile.TarInfo("data/link") + link.type = tarfile.SYMTYPE + link.linkname = str(outside) + tar.addfile(link) + + payload = b"escaped" + escaped = tarfile.TarInfo("data/link/pwned.txt") + escaped.size = len(payload) + tar.addfile(escaped, io.BytesIO(payload)) + + with pytest.raises(SystemExit): + backup.cmd_restore(_restore_args(tar_path)) + + assert not (outside / "pwned.txt").exists() + assert (data / "keep.txt").read_text(encoding="utf-8") == "still here" + + +def test_verify_rejects_symlink_escape(tmp_path): + backup = _load_backup_cli() + + tar_path = tmp_path / "malicious.tar.gz" + with tarfile.open(tar_path, "w:gz") as tar: + link = tarfile.TarInfo("data/link") + link.type = tarfile.SYMTYPE + link.linkname = "/tmp" + tar.addfile(link) + + with pytest.raises(SystemExit): + backup.cmd_verify(_verify_args(tar_path)) + + +def test_restore_rejects_hardlink_entries(tmp_path, monkeypatch): + backup = _load_backup_cli() + repo = tmp_path / "repo" + (repo / "data").mkdir(parents=True) + _patch_repo(backup, monkeypatch, repo) + + tar_path = tmp_path / "hardlink.tar.gz" + with tarfile.open(tar_path, "w:gz") as tar: + link = tarfile.TarInfo("data/hardlink") + link.type = tarfile.LNKTYPE + link.linkname = "../outside.txt" + tar.addfile(link) + + with pytest.raises(SystemExit): + backup.cmd_restore(_restore_args(tar_path)) + + +def test_restore_extracts_regular_files_without_extractall(tmp_path, monkeypatch): + backup = _load_backup_cli() + repo = tmp_path / "repo" + data = repo / "data" + data.mkdir(parents=True) + (data / "old.txt").write_text("old", encoding="utf-8") + _patch_repo(backup, monkeypatch, repo) + + tar_path = tmp_path / "valid.tar.gz" + with tarfile.open(tar_path, "w:gz") as tar: + folder = tarfile.TarInfo("data/nested") + folder.type = tarfile.DIRTYPE + tar.addfile(folder) + + payload = b"new" + item = tarfile.TarInfo("data/nested/new.txt") + item.size = len(payload) + tar.addfile(item, io.BytesIO(payload)) + + backup.cmd_restore(_restore_args(tar_path)) + + assert (repo / "data" / "nested" / "new.txt").read_text(encoding="utf-8") == "new" + assert not (repo / "data" / "old.txt").exists() + assert list(repo.glob("data.before-restore-*"))