Compare commits

...

20 Commits
v3.1.0 ... main

View File

@ -18,6 +18,7 @@ from base64 import b64decode
from minio import Minio from minio import Minio
from http.client import IncompleteRead from http.client import IncompleteRead
from time import sleep from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC from dateutil.tz import gettz, UTC
@ -36,6 +37,7 @@ import json
import subprocess import subprocess
from select import PIPE_BUF
import select import select
import time import time
import math import math
@ -402,84 +404,200 @@ def main():
import_args += [archive_name, "-"] import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env}) logger.debug("%s", {"import_args": import_args, "env": env})
try: def download_iter():
try:
def download_iter():
offset = 0 offset = 0
retries = 10 retries = 10
download = None
while True: while True:
logger.info(
"Downloading %s (%s, %s)...",
backup.filename,
backup.creation,
backup.version_id,
)
if download:
download.close()
download.release_conn()
download = minio.get_object( download = minio.get_object(
bucket_name="gitlab-backups", bucket_name="gitlab-backups",
object_name=backup.filename, object_name=backup.filename,
version_id=backup.version_id, version_id=backup.version_id,
offset=offset, offset=offset,
) )
download_stream = download.stream(decode_content=True) download_stream = download.stream(amt=1024 * 1024)
try: try:
while chunk := next(download_stream, b""): while chunk := next(download_stream, b""):
# logger.debug("Read chunk of length %d", len(chunk))
offset += len(chunk) offset += len(chunk)
retries = 10 retries = 10
yield chunk yield chunk
else: else:
break break
except IncompleteRead as e: except IncompleteRead as e:
logger.warn(
"IncompleteRead, retries=%d, offset=%d",
retries,
offset,
exc_info=True,
)
if retries <= 0: if retries <= 0:
logger.error("Max retries exceeded")
raise e raise e
retries -= 1 retries -= 1
sleep(10) sleep(uniform(0, 10))
finally:
download.close()
download.release_conn()
download_stream = download_iter() download_stream = download_iter()
with subprocess.Popen( with subprocess.Popen(
import_args, import_args,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
env=env, env=env,
preexec_fn=lambda: as_borg(), preexec_fn=lambda: as_borg(),
text=True, text=True,
) as proc: ) as proc:
proc_logger = logger.getChild("borg") proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout") stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr") stderr_logger = proc_logger.getChild("stderr")
poll = select.poll() stdin_fd = proc.stdin.fileno()
poll.register(proc.stdin, select.POLLOUT | select.POLLHUP) stdout_fd = proc.stdout.fileno()
poll.register(proc.stdout, select.POLLIN | select.POLLHUP) stderr_fd = proc.stderr.fileno()
poll.register(proc.stderr, select.POLLIN | select.POLLHUP) os.set_blocking(stdin_fd, False)
pollc = 2 os.set_blocking(stdout_fd, False)
events = poll.poll() os.set_blocking(stderr_fd, False)
while pollc > 0 and len(events) > 0:
for rfd, event in events: poll = select.poll()
if event & select.POLLOUT: poll.register(
if rfd == proc.stdin.fileno(): proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
if chunk := next(download_stream, b""): )
proc.stdin.buffer.write(chunk) poll.register(
proc.stdout,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
poll.register(
proc.stderr,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
pollc = 2
# logger.debug("First poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
pipe_buffer = b""
stdout_line_buffer = bytearray(b"")
stderr_line_buffer = bytearray(b"")
newline_trans = bytearray.maketrans(b"\r", b"\n")
while pollc > 0 and len(events) > 0:
for rfd, event in events:
# logger.debug("rfd=%d, event=%x", rfd, event)
if event & select.POLLOUT:
if rfd == stdin_fd:
if chunk := pipe_buffer[:PIPE_BUF]:
# logger.debug(
# "Writing chunk of length %d...", len(chunk)
# )
proc.stdin.buffer.write(chunk)
# logger.debug("Done")
pipe_buffer = pipe_buffer[PIPE_BUF:]
elif pipe_buffer := next(download_stream, b""):
# logger.debug(
# "Accepted chunk of length %d",
# len(pipe_buffer),
# )
pass
else:
proc.stdin.close()
if event & select.POLLIN or event & select.POLLPRI:
if rfd == stdout_fd:
# logger.debug("Reading from stdout...")
if chunk := proc.stdout.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stdout_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stdout_line_buffer)
# )
while True:
line, sep, rest = stdout_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stdout_logger.info(
line.decode(errors="replace").rstrip()
)
stdout_line_buffer = rest
else: else:
proc.stdin.close() break
if event & select.POLLIN: if rfd == stderr_fd:
if rfd == proc.stdout.fileno(): # logger.debug("Reading from stderr...")
if line := proc.stdout.readline(): if chunk := proc.stderr.buffer.read(PIPE_BUF):
stdout_logger.info(line[:-1]) # logger.debug("Done, length %d", len(chunk))
if rfd == proc.stderr.fileno(): stderr_line_buffer.extend(
if line := proc.stderr.readline(): chunk.translate(newline_trans)
stderr_logger.info(line[:-1]) )
if event & select.POLLHUP: # logger.debug(
poll.unregister(rfd) # "Buffer at length %d", len(stderr_line_buffer)
# )
while True:
line, sep, rest = stderr_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stderr_logger.info(
line.decode(errors="replace").rstrip()
)
stderr_line_buffer = rest
else:
break
if event & select.POLLERR:
if rfd == stdin_fd:
logger.error("STDIN error")
if rfd == stdout_fd:
logger.error("STDOUT error")
if rfd == stderr_fd:
logger.error("STDERR error")
if event & select.POLLHUP:
if rfd == stdin_fd:
logger.debug("STDIN closed")
if rfd == stdout_fd:
logger.debug("STDOUT closed")
pollc -= 1 pollc -= 1
if rfd == stderr_fd:
logger.debug("STDERR closed")
pollc -= 1
poll.unregister(rfd)
if pollc > 0: if pollc > 0:
events = poll.poll() # logger.debug("Poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
else:
# logger.debug("Nothing left to poll")
pass
for handler in proc_logger.handlers: if stdout_line_buffer:
handler.flush() for line in stdout_line_buffer.split(b"\n"):
stdout_logger.info(line)
if stderr_line_buffer:
for line in stderr_line_buffer.split(b"\n"):
stderr_logger.info(line)
ret = proc.wait() for handler in proc_logger.handlers:
if ret != 0: handler.flush()
raise Exception(f"borg subprocess exited with returncode {ret}")
finally: # logger.debug("Waiting on subprocess...")
download.close() ret = proc.wait()
download.release_conn() # logger.debug("Done")
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
copied.add(backup) copied.add(backup)