import os
import sys
from os import environ
from pwd import getpwnam

import kubernetes as k8s

from base64 import b64decode
from minio import Minio

from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, tzutc
import pytimeparse

import re

from dataclasses import dataclass

from collections import OrderedDict, defaultdict, deque

from functools import cache

import toml
import json

import subprocess

import select
import time
import math
from math import floor


# Retention rules, ordered from the finest to the coarsest granularity.
# Each value maps a datetime to a hashable "period key": two datetimes that
# yield the same key belong to the same period of that rule.
TIME_PATTERNS = OrderedDict(
    [
        ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
        ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
        # Sub-hour buckets: (hour, minute rounded down to the bucket start).
        ("5m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 5) * 5)),
        ("15m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 15) * 15)),
        ("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
        # Sub-day buckets: (day, hour rounded down to the bucket start).
        ("4h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 4) * 4)),
        ("12h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 12) * 12)),
        ("daily", lambda t: t.strftime("%Y-%m-%d")),
        (
            "halfweekly",
            # %u is the weekday number 1..7, so this yields 0 for weekdays
            # 1-3 and 4 for weekdays 4-7 within the %G-%V week.
            lambda t: (t.strftime("%G-%V"), floor(int(t.strftime("%u")) / 4) * 4),
        ),
        ("weekly", lambda t: t.strftime("%G-%V")),
        ("monthly", lambda t: t.strftime("%Y-%m")),
        ("yearly", lambda t: t.strftime("%Y")),
    ]
)

# Object names look like "<digits>_<anything>_gitlab_backup.tar"; the leading
# run of digits is exposed as the named group "ts".
BACKUP_PATTERN = re.compile(r"^(?P<ts>[0-9]+)_.*_gitlab_backup\.tar$")


@dataclass(eq=True, order=True, frozen=True)
class GitlabBackup:
    """Immutable, hashable record describing one backup object.

    Field order matters: with ``order=True`` instances compare field by
    field, so they sort by ``creation`` first.
    """

    # Creation time of the backup.
    creation: datetime
    # Object name of the backup.
    filename: str
    # Version identifier of the stored object.
    version_id: str


@dataclass(eq=True, order=True, frozen=True)
class KeptBecause:
    """Immutable record of one reason for retaining a backup."""

    # Name of the rule that caused the backup to be kept.
    rule: str
    # Running counter value for that rule at the time the reason was recorded.
    ix: int
    # Period key the backup was kept for (may be unset for non-period rules).
    period: str


def convert_timedelta(secs_str):
    """Convert a human-readable duration expression into a ``timedelta``.

    Args:
        secs_str: Duration expression understood by ``pytimeparse.parse``.

    Returns:
        A ``timedelta`` of the parsed number of seconds.

    Raises:
        ValueError: If ``pytimeparse`` cannot parse the expression.
    """
    secs = pytimeparse.parse(secs_str)
    if secs is None:
        # Exceptions do not apply %-formatting to their arguments, so the
        # offending expression has to be interpolated explicitly.
        raise ValueError(f"Could not parse timedelta expression ‘{secs_str}’")
    return timedelta(seconds=secs)
+def as_borg(): + global logger, borg_pwd + + try: + os.setgid(borg_pwd.pw_gid) + os.setuid(borg_pwd.pw_uid) + except Exception: + logger.error(format_exc()) + raise + + +def _archive_name(backup, target, archive_prefix): + return f"{target}::{_archive_basename(backup, archive_prefix)}" + + +def _archive_basename(backup, archive_prefix): + creation_time = backup.creation.astimezone(timezone.utc) + return f'{archive_prefix}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' + def main(): global logger @@ -28,6 +115,38 @@ def main(): console_handler.setFormatter( logging.Formatter("%(asctime)s [%(levelname)s](%(name)s): %(message)s") ) + + burst_max = 10000 + burst = burst_max + last_use = None + inv_rate = 1e6 + + def consume_filter(record): + nonlocal burst, burst_max, inv_rate, last_use + + delay = None + while True: + now = time.monotonic_ns() + burst = ( + min(burst_max, burst + math.floor((now - last_use) / inv_rate)) + if last_use + else burst_max + ) + last_use = now + + if burst > 0: + burst -= 1 + if delay: + delay = now - delay + + return True + + if delay is None: + delay = now + time.sleep(inv_rate / 1e9) + + console_handler.addFilter(consume_filter) + logger.addHandler(console_handler) # log uncaught exceptions @@ -65,6 +184,9 @@ def main(): const=-1, help="Decrease verbosity by one step", ) + parser.add_argument( + "--dry-run", "-n", action="store_true", help="Don't actually cause any changes" + ) parser.add_argument( "--k8s-config", type=Path, @@ -81,6 +203,14 @@ def main(): default="gitlab", help="K8s namespace containing gitlab deployment(s)", ) + parser.add_argument("--target", metavar="REPO", help="Borg repository") + parser.add_argument( + "--archive-prefix", + metavar="STR", + default="gitlab", + help="Prefix for borg archive names", + ) + parser.add_argument("config", type=Path, metavar="FILE", help="Configuration file") args = parser.parse_args() LOG_LEVELS = [ @@ -97,6 +227,11 @@ def main(): log_level = min(len(LOG_LEVELS) - 1, max(log_level - 
adjustment, 0)) logger.setLevel(LOG_LEVELS[log_level]) + global borg_pwd + borg_pwd = getpwnam("borg") + + config = toml.load(args.config) + logger.debug("K8s config file: ‘%s’", args.k8s_config) with k8s.config.new_client_from_config(str(args.k8s_config)) as client: core_api = k8s.client.CoreV1Api(client) @@ -140,8 +275,263 @@ def main(): secret_key=minio_secretkey, ) + logger.info("Loading backup list from MinIO...") + backups = set() for obj in minio.list_objects("gitlab-backups"): - logger.debug("‘%s’", obj.object_name) + match = BACKUP_PATTERN.match(obj.object_name) + if not match: + logger.warn("Could not match object name ‘%s’", obj.object_name) + continue + creation_time = datetime.fromtimestamp(int(match.group("ts"))) + backup = GitlabBackup( + filename=obj.object_name, + creation=creation_time, + version_id=obj.version_id, + ) + logger.debug(backup) + backups.add(backup) + + logger.info("Loading archive list from borg...") + archives = set() + with subprocess.Popen( + ["borg", "list", "--info", "--lock-wait=600", "--json", args.target], + stdout=subprocess.PIPE, + preexec_fn=as_borg, + ) as proc: + for archive in json.load(proc.stdout)["archives"]: + # logger.debug('Archive: ‘%s’', archive["barchive"]) + archives.add(archive["barchive"]) + + kept_count = defaultdict(lambda: 0) + kept_because = OrderedDict() + + def keep_because(backup, rule, period=None): + nonlocal kept_count, kept_because + kept_count[rule] += 1 + if backup not in kept_because: + kept_because[backup] = deque() + kept_because[backup].append( + KeptBecause(rule=rule, ix=kept_count[rule], period=period) + ) + + pattern_timezone = config.get("k8s-gitlab-borg", {}).get("timezone", None) + if pattern_timezone: + pattern_timezone = gettz(pattern_timezone) + else: + pattern_timezone = tzutc + logger.debug("Rule timezone: %s", pattern_timezone) + + copy_candidates = set() + for rule, pattern in TIME_PATTERNS.items(): + desired_count = config.get("copy", {}).get(rule, {}).get("count", 0) + + 
periods = OrderedDict() + for backup in sorted(backups, reverse=True): + period = pattern(backup.creation.astimezone(pattern_timezone)) + if period not in periods: + periods[period] = deque() + periods[period].append(backup) + + to_exec = desired_count + ordered_periods = periods.items() + for period, period_backups in ordered_periods: + if to_exec == 0: + break + + for backup in period_backups: + copy_candidates.add(backup) + logger.debug( + "‘%s’ (%s) is candidate for copying", + backup.filename, + backup.creation, + ) + to_exec -= 1 + break + + if to_exec > 0: + logger.debug( + "Missing %d to fulfill copy %s=%d", to_exec, rule, desired_count + ) + already_copied = set() + for backup in copy_candidates: + archive = _archive_basename(backup, args.archive_prefix) + if archive in archives: + already_copied.add(backup) + logger.debug( + "‘%s’ (%s, ‘%s’) already copied", + backup.filename, + backup.creation, + archive, + ) + else: + logger.debug( + "‘%s’ (%s, ‘%s’) to copy", backup.filename, backup.creation, archive + ) + copy_candidates -= already_copied + + copyCount = config.get("copy", {}).get("count", 1) + copied = set() + for backup in sorted(copy_candidates, reverse=True): + if copyCount > 0 and len(copied) >= copyCount: + logger.debug("copyCount of %d reached", copyCount) + break + + logger.info("Copying ‘%s’ (%s)...", backup.filename, backup.creation) + + env = environ.copy() + import_args = [ + "borg", + "import-tar", + "--lock-wait=600", + "--compression=auto,zstd,10", + "--chunker-params=10,23,16,4095", + "--show-rc", + "--upload-buffer=100", + "--progress", + "--list", + "--filter=AMEi-x?", + "--stats" if not args.dry_run else "--dry-run", + ] + creation_time = backup.creation.astimezone(timezone.utc) + import_args += [ + f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}' + ] + archive_name = _archive_name(backup, args.target, args.archive_prefix) + import_args += [archive_name, "-"] + logger.debug("%s", {"import_args": import_args, "env": env}) 
+ + try: + download = minio.get_object( + bucket_name="gitlab-backups", + object_name=backup.filename, + version_id=backup.version_id, + ) + download_stream = download.stream(decode_content=True) + + with subprocess.Popen( + import_args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + preexec_fn=lambda: as_borg(), + text=True, + ) as proc: + proc_logger = logger.getChild("borg") + stdout_logger = proc_logger.getChild("stdout") + stderr_logger = proc_logger.getChild("stderr") + + poll = select.poll() + poll.register(proc.stdin, select.POLLOUT | select.POLLHUP) + poll.register(proc.stdout, select.POLLIN | select.POLLHUP) + poll.register(proc.stderr, select.POLLIN | select.POLLHUP) + pollc = 2 + events = poll.poll() + while pollc > 0 and len(events) > 0: + for rfd, event in events: + if event & select.POLLOUT: + if rfd == proc.stdin.fileno(): + if chunk := next(download_stream, b""): + proc.stdin.buffer.write(chunk) + else: + proc.stdin.close() + if event & select.POLLIN: + if rfd == proc.stdout.fileno(): + if line := proc.stdout.readline(): + stdout_logger.info(line[:-1]) + if rfd == proc.stderr.fileno(): + if line := proc.stderr.readline(): + stderr_logger.info(line[:-1]) + if event & select.POLLHUP: + poll.unregister(rfd) + pollc -= 1 + + if pollc > 0: + events = poll.poll() + + for handler in proc_logger.handlers: + handler.flush() + + ret = proc.wait() + if ret != 0: + raise Exception(f"borg subprocess exited with returncode {ret}") + finally: + download.close() + download.release_conn() + + copy_candidates -= copied + for candidate in copy_candidates: + keep_because(candidate, "copy-candidate") + + within = convert_timedelta(config.get("keep", {}).get("within", "0s")) + logger.debug("Keep within: %s", within) + if within > timedelta(seconds=0): + time_ref = max(backups, key=lambda backup: backup.creation, default=None) + if not time_ref: + logger.warn("Nothing to keep") + else: + logger.info( + "Using ‘%s’ (%s) as time 
reference", + time_ref.filename, + time_ref.creation, + ) + within_cutoff = time_ref.creation - within + + for backup in sorted(backups, reverse=True): + if backup.creation >= within_cutoff: + keep_because(backup, "within") + else: + logger.warn("Skipping rule ‘within’ since retention period is zero") + + for rule, pattern in TIME_PATTERNS.items(): + desired_count = config.get("keep", {}).get(rule, {}).get("count", 0) + + periods = OrderedDict() + for backup in sorted(backups, reverse=True): + period = pattern(backup.creation.astimezone(pattern_timezone)) + if period not in periods: + periods[period] = deque() + periods[period].append(backup) + + to_keep = desired_count + ordered_periods = periods.items() + for period, period_backups in ordered_periods: + if to_keep == 0: + break + + for backup in period_backups: + keep_because(backup, rule, period=period) + to_keep -= 1 + break + + if to_keep > 0: + logger.debug( + "Missing %d to fulfill keep %s=%d", to_keep, rule, desired_count + ) + + for backup, reasons in kept_because.items(): + logger.info( + "Keeping ‘%s’ (%s) because: %s", + backup.filename, + backup.creation, + ", ".join(map(str, reasons)), + ) + to_destroy = backups - {*kept_because} + if not to_destroy: + logger.info("Nothing to prune") + else: + for backup in sorted(to_destroy): + if args.dry_run: + logger.info( + "Would have pruned ‘%s’ (%s)", backup.filename, backup.creation + ) + else: + minio.remove_object( + bucket_name="gitlab-backups", + object_name=backup.filename, + version_id=backup.version_id, + ) + logger.info("Pruned ‘%s’ (%s)", backup.filename, backup.creation) if __name__ == "__main__": diff --git a/poetry.lock b/poetry.lock index 63d7bba..a9aaeec 100644 --- a/poetry.lock +++ b/poetry.lock @@ -129,6 +129,14 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" [package.dependencies] six = ">=1.5" +[[package]] +name = "pytimeparse" +version = "1.1.8" +description = "Time expression parser" +category = "main" +optional = false 
+python-versions = "*" + [[package]] name = "pyyaml" version = "6.0" @@ -202,6 +210,14 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" +[[package]] +name = "toml" +version = "0.10.2" +description = "Python Library for Tom's Obvious, Minimal Language" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + [[package]] name = "urllib3" version = "2.0.2" @@ -232,7 +248,7 @@ test = ["websockets"] [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "5526daa4e12f128b550e8565ba48674ac113368e29b7124f99b6115e7087a6cb" +content-hash = "3529888cda7d20869283cefe056c3cc7a15d9b221178877ff88db918a1bd94bb" [metadata.files] cachetools = [ @@ -352,6 +368,10 @@ python-dateutil = [ {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, ] +pytimeparse = [ + {file = "pytimeparse-1.1.8-py2.py3-none-any.whl", hash = "sha256:04b7be6cc8bd9f5647a6325444926c3ac34ee6bc7e69da4367ba282f076036bd"}, + {file = "pytimeparse-1.1.8.tar.gz", hash = "sha256:e86136477be924d7e670646a98561957e8ca7308d44841e21f5ddea757556a0a"}, +] pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"}, {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"}, @@ -414,6 +434,10 @@ six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"}, ] +toml = [ + {file = "toml-0.10.2-py2.py3-none-any.whl", hash = 
"sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, + {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, +] urllib3 = [ {file = "urllib3-2.0.2-py3-none-any.whl", hash = "sha256:d055c2f9d38dc53c808f6fdc8eab7360b6fdbbde02340ed25cfbcd817c62469e"}, {file = "urllib3-2.0.2.tar.gz", hash = "sha256:61717a1095d7e155cdb737ac7bb2f4324a858a1e2e6466f6d03ff630ca68d3cc"}, diff --git a/pyproject.toml b/pyproject.toml index 3d806a6..28da142 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,8 @@ authors = ["Gregor Kleen "] python = "^3.10" kubernetes = "^26.1.0" minio = "^7.1.14" +toml = "^0.10.2" +pytimeparse = "^1.1.8" [tool.poetry.scripts]