k8s-gitlab-borg/k8s_gitlab_borg/__main__.py

# SPDX-FileCopyrightText: 2023 Gregor Kleen
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os, sys
from os import environ
from pwd import getpwnam
import kubernetes as k8s
from pathlib import Path
import argparse
import logging
from base64 import b64decode
from minio import Minio
from http.client import IncompleteRead
from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC
import pytimeparse
import re
from dataclasses import dataclass
from collections import OrderedDict, defaultdict, deque
from functools import cache
import toml
import json
import subprocess
from select import PIPE_BUF
import select
import time
import math
from math import floor
from traceback import format_exc
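
# Retention rule table: each entry maps a rule name to a function that
# buckets a datetime into a period key; backups whose creation times land
# in the same bucket compete for the same retention slot.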
TIME_PATTERNS = OrderedDict(
    [
        ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
        ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
        ("5m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 5) * 5)),
        ("15m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 15) * 15)),
        ("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
        ("4h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 4) * 4)),
        ("12h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 12) * 12)),
        ("daily", lambda t: t.strftime("%Y-%m-%d")),
        (
            "halfweekly",
            lambda t: (t.strftime("%G-%V"), floor(int(t.strftime("%u")) / 4) * 4),
        ),
        ("weekly", lambda t: t.strftime("%G-%V")),
        ("monthly", lambda t: t.strftime("%Y-%m")),
        ("yearly", lambda t: t.strftime("%Y")),
    ]
)
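
# GitLab names backup tarballs "<unix-timestamp>_..._gitlab_backup.tar";
# capture the leading timestamp to recover the creation time.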
BACKUP_PATTERN = re.compile(r"^(?P<ts>[0-9]+)_.*_gitlab_backup\.tar$")


@dataclass(eq=True, order=True, frozen=True)
class GitlabBackup:
    creation: datetime
    filename: str
    version_id: str


@dataclass(eq=True, order=True, frozen=True)
class KeptBecause:
    rule: str
    ix: int
    period: str


def convert_timedelta(secs_str):
    secs = pytimeparse.parse(secs_str)
    if secs is None:
        # f-string instead of printf-style args, which ValueError never interpolates
        raise ValueError(f"Could not parse timedelta expression {secs_str!r}")
    return timedelta(seconds=secs)
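

# Used as preexec_fn for borg subprocesses: drop root privileges to the
# "borg" account before exec (gid first, since setuid forfeits the right
# to change groups).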
def as_borg():
    global logger, borg_pwd

    try:
        os.setgid(borg_pwd.pw_gid)
        os.setuid(borg_pwd.pw_uid)
    except Exception:
        logger.error(format_exc())
        raise


def _archive_name(backup, target, archive_prefix):
    return f"{target}::{_archive_basename(backup, archive_prefix)}"


def _archive_basename(backup, archive_prefix):
    creation_time = backup.creation.astimezone(timezone.utc)
    return f'{archive_prefix}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'


def main():
    global logger

    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter("[%(levelname)s](%(name)s): %(message)s")
    )
    if sys.stderr.isatty():
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s](%(name)s): %(message)s")
        )

    burst_max = 10000
    burst = burst_max
    last_use = None
    inv_rate = 1e6
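
    # Token-bucket throttle for log records: the bucket holds at most
    # `burst_max` tokens and refills at one token per `inv_rate` ns
    # (i.e. 1000 records/s sustained); when empty the filter sleeps
    # rather than dropping records.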
    def consume_filter(record):
        nonlocal burst, burst_max, inv_rate, last_use

        delay = None
        while True:
            now = time.monotonic_ns()
            burst = (
                min(burst_max, burst + math.floor((now - last_use) / inv_rate))
                if last_use
                else burst_max
            )
            last_use = now

            if burst > 0:
                burst -= 1
                if delay:
                    delay = now - delay
                return True

            if delay is None:
                delay = now

            time.sleep(inv_rate / 1e9)

    console_handler.addFilter(consume_filter)
    logger.addHandler(console_handler)

    # log uncaught exceptions
    def log_exceptions(type, value, tb):
        global logger

        logger.error(value)
        sys.__excepthook__(type, value, tb)  # calls default excepthook

    sys.excepthook = log_exceptions

    parser = argparse.ArgumentParser(
        prog="k8s-gitlab-borg", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "--verbosity",
        dest="log_level",
        action="append",
        type=int,
        help="Set verbosity to specific level",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        dest="log_level",
        action="append_const",
        const=1,
        help="Increase verbosity by one step",
    )
    parser.add_argument(
        "--quiet",
        "-q",
        dest="log_level",
        action="append_const",
        const=-1,
        help="Decrease verbosity by one step",
    )
    parser.add_argument(
        "--dry-run", "-n", action="store_true", help="Don't actually cause any changes"
    )
    parser.add_argument(
        "--k8s-config",
        type=Path,
        metavar="FILE",
        default=Path(environ["CREDENTIALS_DIRECTORY"]) / "k8s.yaml"
        if "CREDENTIALS_DIRECTORY" in environ
        else k8s.config.KUBE_CONFIG_DEFAULT_LOCATION,
        help="Path to YAML file encoding k8s credentials",
    )
    parser.add_argument(
        "--namespace",
        type=str,
        metavar="STR",
        default="gitlab",
        help="K8s namespace containing gitlab deployment(s)",
    )
    parser.add_argument("--target", metavar="REPO", help="Borg repository")
    parser.add_argument(
        "--archive-prefix",
        metavar="STR",
        default="gitlab",
        help="Prefix for borg archive names",
    )
    parser.add_argument("config", type=Path, metavar="FILE", help="Configuration file")
    args = parser.parse_args()

    LOG_LEVELS = [
        logging.DEBUG,
        logging.INFO,
        logging.WARNING,
        logging.ERROR,
        logging.CRITICAL,
    ]
    DEFAULT_LOG_LEVEL = logging.ERROR
    log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
    for adjustment in args.log_level or ():
        log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
    logger.setLevel(LOG_LEVELS[log_level])

    global borg_pwd
    borg_pwd = getpwnam("borg")

    config = toml.load(args.config)

    logger.debug("K8s config file: %s", args.k8s_config)
    with k8s.config.new_client_from_config(str(args.k8s_config)) as client:
        core_api = k8s.client.CoreV1Api(client)

        services = core_api.list_namespaced_service(namespace=args.namespace).items

        def is_minio(item):
            selector = item.spec.selector
            if not selector:  # services without selectors can't match
                return False
            if "app" not in selector:
                return False
            if selector["app"] != "minio":
                return False
            if "release" not in selector:
                return False
            if selector["release"] != "gitlab":
                return False
            return True

        [minio_svc] = list(filter(is_minio, services))

        secrets = core_api.list_namespaced_secret(namespace=args.namespace).items

        def is_minio(item):
            return item.metadata.name == "gitlab-minio-secret"

        [minio_secret] = list(filter(is_minio, secrets))

        minio_host = minio_svc.spec.cluster_ip
        [minio_port] = minio_svc.spec.ports
        minio_port = minio_port.port
        logger.debug("Determined minio host: %s:%d", minio_host, minio_port)
        minio_accesskey = b64decode(minio_secret.data["accesskey"]).decode("utf-8")
        minio_secretkey = b64decode(minio_secret.data["secretkey"]).decode("utf-8")
        minio = Minio(
            f"{minio_host}:{minio_port:d}",
            secure=False,
            access_key=minio_accesskey,
            secret_key=minio_secretkey,
        )
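
        # Each backup's MinIO version id is recorded so that pruning later
        # removes exactly the object version examined here.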
logger.info("Loading backup list from MinIO...")
backups = set()
for obj in minio.list_objects("gitlab-backups"):
match = BACKUP_PATTERN.match(obj.object_name)
if not match:
logger.warn("Could not match object name %s", obj.object_name)
continue
creation_time = datetime.fromtimestamp(int(match.group("ts")))
backup = GitlabBackup(
filename=obj.object_name,
creation=creation_time,
version_id=obj.version_id,
)
logger.debug(backup)
backups.add(backup)
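
        # Ask borg for the existing archive names (raw "barchive" values) so
        # that already-imported backups can be skipped below.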
logger.info("Loading archive list from borg...")
archives = set()
with subprocess.Popen(
["borg", "list", "--info", "--lock-wait=600", "--json", args.target],
stdout=subprocess.PIPE,
preexec_fn=as_borg,
) as proc:
for archive in json.load(proc.stdout)["archives"]:
# logger.debug('Archive: %s', archive["barchive"])
archives.add(archive["barchive"])
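
        # Book-keeping for retention: every reason to keep a backup is
        # recorded; backups that accumulate no reason at all are pruned.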
        kept_count = defaultdict(lambda: 0)
        kept_because = OrderedDict()

        def keep_because(backup, rule, period=None):
            nonlocal kept_count, kept_because

            kept_count[rule] += 1
            if backup not in kept_because:
                kept_because[backup] = deque()
            kept_because[backup].append(
                KeptBecause(rule=rule, ix=kept_count[rule], period=period)
            )

        pattern_timezone = config.get("k8s-gitlab-borg", {}).get("timezone", None)
        if pattern_timezone:
            pattern_timezone = gettz(pattern_timezone)
        else:
            pattern_timezone = UTC
        logger.debug("Rule timezone: %s", pattern_timezone)
        copy_candidates = set()
        for rule, pattern in TIME_PATTERNS.items():
            desired_count = config.get("copy", {}).get(rule, {}).get("count", 0)

            periods = OrderedDict()
            for backup in sorted(backups):
                period = pattern(backup.creation.astimezone(pattern_timezone))
                if period not in periods:
                    periods[period] = deque()
                periods[period].append(backup)

            to_exec = desired_count
            ordered_periods = reversed(periods.items())
            for period, period_backups in ordered_periods:
                if to_exec == 0:
                    break

                for backup in period_backups:
                    copy_candidates.add(backup)
                    logger.debug(
                        "%s (%s) is candidate for copying",
                        backup.filename,
                        backup.creation,
                    )
                    to_exec -= 1
                    break

            if to_exec > 0:
                logger.debug(
                    "Missing %d to fulfill copy %s=%d", to_exec, rule, desired_count
                )

        already_copied = set()
        for backup in copy_candidates:
            archive = _archive_basename(backup, args.archive_prefix)
            if archive in archives:
                already_copied.add(backup)
                logger.debug(
                    "%s (%s, %s) already copied",
                    backup.filename,
                    backup.creation,
                    archive,
                )
            else:
                logger.debug(
                    "%s (%s, %s) to copy", backup.filename, backup.creation, archive
                )
        copy_candidates -= already_copied
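
        # copy.count caps how many archives are imported per invocation
        # (a value <= 0 disables the cap); newest candidates go first.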
        copyCount = config.get("copy", {}).get("count", 1)
        copied = set()
        for backup in sorted(copy_candidates, reverse=True):
            if copyCount > 0 and len(copied) >= copyCount:
                logger.debug("copyCount of %d reached", copyCount)
                break

            logger.info("Copying %s (%s)...", backup.filename, backup.creation)
            env = environ.copy()
            import_args = [
                "borg",
                "import-tar",
                "--lock-wait=600",
                "--compression=auto,zstd,10",
                "--chunker-params=10,23,16,4095",
                "--show-rc",
                "--upload-buffer=100",
                "--progress",
                "--list",
                "--filter=AMEi-x?",
                "--stats" if not args.dry_run else "--dry-run",
            ]
            creation_time = backup.creation.astimezone(timezone.utc)
            import_args += [
                f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'
            ]
            archive_name = _archive_name(backup, args.target, args.archive_prefix)
            import_args += [archive_name, "-"]
            logger.debug("%s", {"import_args": import_args, "env": env})
            def download_iter():
                try:
                    offset = 0
                    retries = 10
                    download = None
                    while True:
                        logger.info(
                            "Downloading %s (%s, %s)...",
                            backup.filename,
                            backup.creation,
                            backup.version_id,
                        )
                        if download:
                            download.close()
                            download.release_conn()
                        download = minio.get_object(
                            bucket_name="gitlab-backups",
                            object_name=backup.filename,
                            version_id=backup.version_id,
                            offset=offset,
                        )
                        download_stream = download.stream(amt=1024 * 1024)
                        try:
                            while chunk := next(download_stream, b""):
                                # logger.debug("Read chunk of length %d", len(chunk))
                                offset += len(chunk)
                                retries = 10
                                yield chunk
                            else:
                                break
                        except IncompleteRead as e:
                            logger.warning(
                                "IncompleteRead, retries=%d, offset=%d",
                                retries,
                                offset,
                                exc_info=True,
                            )
                            if retries <= 0:
                                logger.error("Max retries exceeded")
                                raise e
                            retries -= 1
                            sleep(uniform(0, 10))
                finally:
                    if download:  # may be None if get_object never succeeded
                        download.close()
                        download.release_conn()

            download_stream = download_iter()
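
            # Pump data with a non-blocking poll() loop: feed stdin from the
            # download stream in PIPE_BUF-sized slices while draining stdout
            # and stderr into loggers, so neither side can deadlock on a
            # full pipe.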
            with subprocess.Popen(
                import_args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                preexec_fn=lambda: as_borg(),
                text=True,
            ) as proc:
                proc_logger = logger.getChild("borg")
                stdout_logger = proc_logger.getChild("stdout")
                stderr_logger = proc_logger.getChild("stderr")

                stdin_fd = proc.stdin.fileno()
                stdout_fd = proc.stdout.fileno()
                stderr_fd = proc.stderr.fileno()
                os.set_blocking(stdin_fd, False)
                os.set_blocking(stdout_fd, False)
                os.set_blocking(stderr_fd, False)

                poll = select.poll()
                poll.register(
                    proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
                )
                poll.register(
                    proc.stdout,
                    select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
                )
                poll.register(
                    proc.stderr,
                    select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
                )
                pollc = 2

                # logger.debug("First poll...")
                events = poll.poll()
                # logger.debug("Done, %d event(s)", len(events))
                pipe_buffer = b""
                stdout_line_buffer = bytearray(b"")
                stderr_line_buffer = bytearray(b"")
                newline_trans = bytearray.maketrans(b"\r", b"\n")
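
                # borg --progress redraws its status line with "\r"; mapping
                # CR to LF turns each redraw into a complete log line.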
                while pollc > 0 and len(events) > 0:
                    for rfd, event in events:
                        # logger.debug("rfd=%d, event=%x", rfd, event)
                        if event & select.POLLOUT:
                            if rfd == stdin_fd:
                                if chunk := pipe_buffer[:PIPE_BUF]:
                                    # logger.debug(
                                    #     "Writing chunk of length %d...", len(chunk)
                                    # )
                                    proc.stdin.buffer.write(chunk)
                                    # logger.debug("Done")
                                    pipe_buffer = pipe_buffer[PIPE_BUF:]
                                elif pipe_buffer := next(download_stream, b""):
                                    # logger.debug(
                                    #     "Accepted chunk of length %d",
                                    #     len(pipe_buffer),
                                    # )
                                    pass
                                else:
                                    proc.stdin.close()
                        if event & select.POLLIN or event & select.POLLPRI:
                            if rfd == stdout_fd:
                                # logger.debug("Reading from stdout...")
                                if chunk := proc.stdout.buffer.read(PIPE_BUF):
                                    # logger.debug("Done, length %d", len(chunk))
                                    stdout_line_buffer.extend(
                                        chunk.translate(newline_trans)
                                    )
                                    # logger.debug(
                                    #     "Buffer at length %d", len(stdout_line_buffer)
                                    # )
                                    while True:
                                        line, sep, rest = stdout_line_buffer.partition(
                                            b"\n"
                                        )
                                        # logger.debug("Split: %r, %r, %r", line, sep, rest)
                                        if sep:
                                            stdout_logger.info(
                                                line.decode(errors="replace").rstrip()
                                            )
                                            stdout_line_buffer = rest
                                        else:
                                            break
                            if rfd == stderr_fd:
                                # logger.debug("Reading from stderr...")
                                if chunk := proc.stderr.buffer.read(PIPE_BUF):
                                    # logger.debug("Done, length %d", len(chunk))
                                    stderr_line_buffer.extend(
                                        chunk.translate(newline_trans)
                                    )
                                    # logger.debug(
                                    #     "Buffer at length %d", len(stderr_line_buffer)
                                    # )
                                    while True:
                                        line, sep, rest = stderr_line_buffer.partition(
                                            b"\n"
                                        )
                                        # logger.debug("Split: %r, %r, %r", line, sep, rest)
                                        if sep:
                                            stderr_logger.info(
                                                line.decode(errors="replace").rstrip()
                                            )
                                            stderr_line_buffer = rest
                                        else:
                                            break
                        if event & select.POLLERR:
                            if rfd == stdin_fd:
                                logger.error("STDIN error")
                            if rfd == stdout_fd:
                                logger.error("STDOUT error")
                            if rfd == stderr_fd:
                                logger.error("STDERR error")
                        if event & select.POLLHUP:
                            if rfd == stdin_fd:
                                logger.debug("STDIN closed")
                            if rfd == stdout_fd:
                                logger.debug("STDOUT closed")
                                pollc -= 1
                            if rfd == stderr_fd:
                                logger.debug("STDERR closed")
                                pollc -= 1
                            poll.unregister(rfd)

                    if pollc > 0:
                        # logger.debug("Poll...")
                        events = poll.poll()
                        # logger.debug("Done, %d event(s)", len(events))
                    else:
                        # logger.debug("Nothing left to poll")
                        pass

                # Flush any partial trailing lines, decoded like the rest.
                if stdout_line_buffer:
                    for line in stdout_line_buffer.split(b"\n"):
                        stdout_logger.info(line.decode(errors="replace"))
                if stderr_line_buffer:
                    for line in stderr_line_buffer.split(b"\n"):
                        stderr_logger.info(line.decode(errors="replace"))
                for handler in proc_logger.handlers:
                    handler.flush()

                # logger.debug("Waiting on subprocess...")
                ret = proc.wait()
                # logger.debug("Done")
                if ret != 0:
                    raise Exception(f"borg subprocess exited with returncode {ret}")

            copied.add(backup)

        copy_candidates -= copied
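
        # Candidates that were nominated but not copied this run are shielded
        # from pruning so they can still be copied next time.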
        for candidate in copy_candidates:
            keep_because(candidate, "copy-candidate")
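
        # "keep.within" is measured relative to the newest backup present,
        # not the wall clock.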
        within = convert_timedelta(config.get("keep", {}).get("within", "0s"))
        logger.debug("Keep within: %s", within)
        if within > timedelta(seconds=0):
            time_ref = max(backups, key=lambda backup: backup.creation, default=None)
            if not time_ref:
                logger.warning("Nothing to keep")
            else:
                logger.info(
                    "Using %s (%s) as time reference",
                    time_ref.filename,
                    time_ref.creation,
                )

                within_cutoff = time_ref.creation - within
                for backup in sorted(backups, reverse=True):
                    if backup.creation >= within_cutoff:
                        keep_because(backup, "within")
        else:
            logger.warning("Skipping rule within since retention period is zero")
        for rule, pattern in TIME_PATTERNS.items():
            desired_count = config.get("keep", {}).get(rule, {}).get("count", 0)

            periods = OrderedDict()
            for backup in sorted(backups):
                period = pattern(backup.creation.astimezone(pattern_timezone))
                if period not in periods:
                    periods[period] = deque()
                periods[period].append(backup)

            to_keep = desired_count
            ordered_periods = reversed(periods.items())
            for period, period_backups in ordered_periods:
                if to_keep == 0:
                    break

                for backup in period_backups:
                    keep_because(backup, rule, period=period)
                    to_keep -= 1
                    break

            if to_keep > 0:
                logger.debug(
                    "Missing %d to fulfill keep %s=%d", to_keep, rule, desired_count
                )

        for backup, reasons in kept_because.items():
            logger.info(
                "Keeping %s (%s) because: %s",
                backup.filename,
                backup.creation,
                ", ".join(map(str, reasons)),
            )
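
        # Anything that accumulated no keep reason is pruned; deletion names
        # the exact object version recorded when the backup was listed.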
        to_destroy = backups - {*kept_because}
        if not to_destroy:
            logger.info("Nothing to prune")
        else:
            for backup in sorted(to_destroy):
                if args.dry_run:
                    logger.info(
                        "Would have pruned %s (%s)", backup.filename, backup.creation
                    )
                else:
                    minio.remove_object(
                        bucket_name="gitlab-backups",
                        object_name=backup.filename,
                        version_id=backup.version_id,
                    )
                    logger.info("Pruned %s (%s)", backup.filename, backup.creation)


if __name__ == "__main__":
    sys.exit(main())