implement functionality

This commit is contained in:
Gregor Kleen 2023-05-07 15:34:49 +02:00
parent 28c957e423
commit 91acea56c7
3 changed files with 418 additions and 2 deletions

View File

@ -4,6 +4,7 @@
import os, sys
from os import environ
from pwd import getpwnam
import kubernetes as k8s
@ -16,6 +17,92 @@ from base64 import b64decode
from minio import Minio
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, tzutc
import pytimeparse
import re
from dataclasses import dataclass
from collections import OrderedDict, defaultdict, deque
from functools import cache
import toml
import json
import subprocess
import select
import time
import math
from math import floor
TIME_PATTERNS = OrderedDict(
[
("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
("5m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 5) * 5)),
("15m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 15) * 15)),
("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
("4h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 4) * 4)),
("12h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 12) * 12)),
("daily", lambda t: t.strftime("%Y-%m-%d")),
(
"halfweekly",
lambda t: (t.strftime("%G-%V"), floor(int(t.strftime("%u")) / 4) * 4),
),
("weekly", lambda t: t.strftime("%G-%V")),
("monthly", lambda t: t.strftime("%Y-%m")),
("yearly", lambda t: t.strftime("%Y")),
]
)
BACKUP_PATTERN = re.compile(r"^(?P<ts>[0-9]+)_.*_gitlab_backup\.tar$")
@dataclass(eq=True, order=True, frozen=True)
class GitlabBackup:
creation: datetime
filename: str
version_id: str
@dataclass(eq=True, order=True, frozen=True)
class KeptBecause:
rule: str
ix: int
period: str
def convert_timedelta(secs_str):
secs = pytimeparse.parse(secs_str)
if secs is None:
raise ValueError("Could not parse timedelta expression %s", secs_str)
return timedelta(seconds=secs)
def as_borg():
global logger, borg_pwd
try:
os.setgid(borg_pwd.pw_gid)
os.setuid(borg_pwd.pw_uid)
except Exception:
logger.error(format_exc())
raise
def _archive_name(backup, target, archive_prefix):
return f"{target}::{_archive_basename(backup, archive_prefix)}"
def _archive_basename(backup, archive_prefix):
creation_time = backup.creation.astimezone(timezone.utc)
return f'{archive_prefix}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'
def main():
global logger
@ -28,6 +115,38 @@ def main():
console_handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s](%(name)s): %(message)s")
)
burst_max = 10000
burst = burst_max
last_use = None
inv_rate = 1e6
def consume_filter(record):
nonlocal burst, burst_max, inv_rate, last_use
delay = None
while True:
now = time.monotonic_ns()
burst = (
min(burst_max, burst + math.floor((now - last_use) / inv_rate))
if last_use
else burst_max
)
last_use = now
if burst > 0:
burst -= 1
if delay:
delay = now - delay
return True
if delay is None:
delay = now
time.sleep(inv_rate / 1e9)
console_handler.addFilter(consume_filter)
logger.addHandler(console_handler)
# log uncaught exceptions
@ -65,6 +184,9 @@ def main():
const=-1,
help="Decrease verbosity by one step",
)
parser.add_argument(
"--dry-run", "-n", action="store_true", help="Don't actually cause any changes"
)
parser.add_argument(
"--k8s-config",
type=Path,
@ -81,6 +203,14 @@ def main():
default="gitlab",
help="K8s namespace containing gitlab deployment(s)",
)
parser.add_argument("--target", metavar="REPO", help="Borg repository")
parser.add_argument(
"--archive-prefix",
metavar="STR",
default="gitlab",
help="Prefix for borg archive names",
)
parser.add_argument("config", type=Path, metavar="FILE", help="Configuration file")
args = parser.parse_args()
LOG_LEVELS = [
@ -97,6 +227,11 @@ def main():
log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
logger.setLevel(LOG_LEVELS[log_level])
global borg_pwd
borg_pwd = getpwnam("borg")
config = toml.load(args.config)
logger.debug("K8s config file: %s", args.k8s_config)
with k8s.config.new_client_from_config(str(args.k8s_config)) as client:
core_api = k8s.client.CoreV1Api(client)
@ -140,8 +275,263 @@ def main():
secret_key=minio_secretkey,
)
logger.info("Loading backup list from MinIO...")
backups = set()
for obj in minio.list_objects("gitlab-backups"):
logger.debug("%s", obj.object_name)
match = BACKUP_PATTERN.match(obj.object_name)
if not match:
logger.warn("Could not match object name %s", obj.object_name)
continue
creation_time = datetime.fromtimestamp(int(match.group("ts")))
backup = GitlabBackup(
filename=obj.object_name,
creation=creation_time,
version_id=obj.version_id,
)
logger.debug(backup)
backups.add(backup)
logger.info("Loading archive list from borg...")
archives = set()
with subprocess.Popen(
["borg", "list", "--info", "--lock-wait=600", "--json", args.target],
stdout=subprocess.PIPE,
preexec_fn=as_borg,
) as proc:
for archive in json.load(proc.stdout)["archives"]:
# logger.debug('Archive: %s', archive["barchive"])
archives.add(archive["barchive"])
kept_count = defaultdict(lambda: 0)
kept_because = OrderedDict()
def keep_because(backup, rule, period=None):
nonlocal kept_count, kept_because
kept_count[rule] += 1
if backup not in kept_because:
kept_because[backup] = deque()
kept_because[backup].append(
KeptBecause(rule=rule, ix=kept_count[rule], period=period)
)
pattern_timezone = config.get("k8s-gitlab-borg", {}).get("timezone", None)
if pattern_timezone:
pattern_timezone = gettz(pattern_timezone)
else:
pattern_timezone = tzutc
logger.debug("Rule timezone: %s", pattern_timezone)
copy_candidates = set()
for rule, pattern in TIME_PATTERNS.items():
desired_count = config.get("copy", {}).get(rule, {}).get("count", 0)
periods = OrderedDict()
for backup in sorted(backups, reverse=True):
period = pattern(backup.creation.astimezone(pattern_timezone))
if period not in periods:
periods[period] = deque()
periods[period].append(backup)
to_exec = desired_count
ordered_periods = periods.items()
for period, period_backups in ordered_periods:
if to_exec == 0:
break
for backup in period_backups:
copy_candidates.add(backup)
logger.debug(
"%s (%s) is candidate for copying",
backup.filename,
backup.creation,
)
to_exec -= 1
break
if to_exec > 0:
logger.debug(
"Missing %d to fulfill copy %s=%d", to_exec, rule, desired_count
)
already_copied = set()
for backup in copy_candidates:
archive = _archive_basename(backup, args.archive_prefix)
if archive in archives:
already_copied.add(backup)
logger.debug(
"%s (%s, %s) already copied",
backup.filename,
backup.creation,
archive,
)
else:
logger.debug(
"%s (%s, %s) to copy", backup.filename, backup.creation, archive
)
copy_candidates -= already_copied
copyCount = config.get("copy", {}).get("count", 1)
copied = set()
for backup in sorted(copy_candidates, reverse=True):
if copyCount > 0 and len(copied) >= copyCount:
logger.debug("copyCount of %d reached", copyCount)
break
logger.info("Copying %s (%s)...", backup.filename, backup.creation)
env = environ.copy()
import_args = [
"borg",
"import-tar",
"--lock-wait=600",
"--compression=auto,zstd,10",
"--chunker-params=10,23,16,4095",
"--show-rc",
"--upload-buffer=100",
"--progress",
"--list",
"--filter=AMEi-x?",
"--stats" if not args.dry_run else "--dry-run",
]
creation_time = backup.creation.astimezone(timezone.utc)
import_args += [
f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'
]
archive_name = _archive_name(backup, args.target, args.archive_prefix)
import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env})
try:
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
)
download_stream = download.stream(decode_content=True)
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
poll = select.poll()
poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
pollc = 2
events = poll.poll()
while pollc > 0 and len(events) > 0:
for rfd, event in events:
if event & select.POLLOUT:
if rfd == proc.stdin.fileno():
if chunk := next(download_stream, b""):
proc.stdin.buffer.write(chunk)
else:
proc.stdin.close()
if event & select.POLLIN:
if rfd == proc.stdout.fileno():
if line := proc.stdout.readline():
stdout_logger.info(line[:-1])
if rfd == proc.stderr.fileno():
if line := proc.stderr.readline():
stderr_logger.info(line[:-1])
if event & select.POLLHUP:
poll.unregister(rfd)
pollc -= 1
if pollc > 0:
events = poll.poll()
for handler in proc_logger.handlers:
handler.flush()
ret = proc.wait()
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
finally:
download.close()
download.release_conn()
copy_candidates -= copied
for candidate in copy_candidates:
keep_because(candidate, "copy-candidate")
within = convert_timedelta(config.get("keep", {}).get("within", "0s"))
logger.debug("Keep within: %s", within)
if within > timedelta(seconds=0):
time_ref = max(backups, key=lambda backup: backup.creation, default=None)
if not time_ref:
logger.warn("Nothing to keep")
else:
logger.info(
"Using %s (%s) as time reference",
time_ref.filename,
time_ref.creation,
)
within_cutoff = time_ref.creation - within
for backup in sorted(backups, reverse=True):
if backup.creation >= within_cutoff:
keep_because(backup, "within")
else:
logger.warn("Skipping rule within since retention period is zero")
for rule, pattern in TIME_PATTERNS.items():
desired_count = config.get("keep", {}).get(rule, {}).get("count", 0)
periods = OrderedDict()
for backup in sorted(backups, reverse=True):
period = pattern(backup.creation.astimezone(pattern_timezone))
if period not in periods:
periods[period] = deque()
periods[period].append(backup)
to_keep = desired_count
ordered_periods = periods.items()
for period, period_backups in ordered_periods:
if to_keep == 0:
break
for backup in period_backups:
keep_because(backup, rule, period=period)
to_keep -= 1
break
if to_keep > 0:
logger.debug(
"Missing %d to fulfill keep %s=%d", to_keep, rule, desired_count
)
for backup, reasons in kept_because.items():
logger.info(
"Keeping %s (%s) because: %s",
backup.filename,
backup.creation,
", ".join(map(str, reasons)),
)
to_destroy = backups - {*kept_because}
if not to_destroy:
logger.info("Nothing to prune")
else:
for backup in sorted(to_destroy):
if args.dry_run:
logger.info(
"Would have pruned %s (%s)", backup.filename, backup.creation
)
else:
minio.remove_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
)
logger.info("Pruned %s (%s)", backup.filename, backup.creation)
if __name__ == "__main__":

26
poetry.lock generated
View File

@ -129,6 +129,14 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
[package.dependencies]
six = ">=1.5"
[[package]]
name = "pytimeparse"
version = "1.1.8"
description = "Time expression parser"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pyyaml"
version = "6.0"
@ -202,6 +210,14 @@ category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "toml"
version = "0.10.2"
description = "Python Library for Tom's Obvious, Minimal Language"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]]
name = "urllib3"
version = "2.0.2"
@ -232,7 +248,7 @@ test = ["websockets"]
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "5526daa4e12f128b550e8565ba48674ac113368e29b7124f99b6115e7087a6cb"
content-hash = "3529888cda7d20869283cefe056c3cc7a15d9b221178877ff88db918a1bd94bb"
[metadata.files]
cachetools = [
@ -352,6 +368,10 @@ python-dateutil = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
{file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
]
pytimeparse = [
{file = "pytimeparse-1.1.8-py2.py3-none-any.whl", hash = "sha256:04b7be6cc8bd9f5647a6325444926c3ac34ee6bc7e69da4367ba282f076036bd"},
{file = "pytimeparse-1.1.8.tar.gz", hash = "sha256:e86136477be924d7e670646a98561957e8ca7308d44841e21f5ddea757556a0a"},
]
pyyaml = [
{file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},
{file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"},
@ -414,6 +434,10 @@ six = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
]
urllib3 = [
{file = "urllib3-2.0.2-py3-none-any.whl", hash = "sha256:d055c2f9d38dc53c808f6fdc8eab7360b6fdbbde02340ed25cfbcd817c62469e"},
{file = "urllib3-2.0.2.tar.gz", hash = "sha256:61717a1095d7e155cdb737ac7bb2f4324a858a1e2e6466f6d03ff630ca68d3cc"},

View File

@ -12,6 +12,8 @@ authors = ["Gregor Kleen <gregor@kleen.consulting>"]
python = "^3.10"
kubernetes = "^26.1.0"
minio = "^7.1.14"
toml = "^0.10.2"
pytimeparse = "^1.1.8"
[tool.poetry.scripts]