summaryrefslogtreecommitdiffstats
path: root/src/bin/pg_rewind/local_source.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:46:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-16 19:46:48 +0000
commit311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 (patch)
tree0ec307299b1dada3701e42f4ca6eda57d708261e /src/bin/pg_rewind/local_source.c
parentInitial commit. (diff)
downloadpostgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.tar.xz
postgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.zip
Adding upstream version 15.4.upstream/15.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/bin/pg_rewind/local_source.c')
-rw-r--r--src/bin/pg_rewind/local_source.c187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c
new file mode 100644
index 0000000..2e50485
--- /dev/null
+++ b/src/bin/pg_rewind/local_source.c
@@ -0,0 +1,187 @@
+/*-------------------------------------------------------------------------
+ *
+ * local_source.c
+ * Functions for using a local data directory as the source.
+ *
+ * Portions Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "datapagemap.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "pg_rewind.h"
+#include "rewind_source.h"
+
+typedef struct
+{
+ rewind_source common; /* common interface functions */
+
+ const char *datadir; /* path to the source data directory */
+} local_source;
+
+static void local_traverse_files(rewind_source *source,
+ process_file_callback_t callback);
+static char *local_fetch_file(rewind_source *source, const char *path,
+ size_t *filesize);
+static void local_queue_fetch_file(rewind_source *source, const char *path,
+ size_t len);
+static void local_queue_fetch_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
+static void local_finish_fetch(rewind_source *source);
+static void local_destroy(rewind_source *source);
+
+rewind_source *
+init_local_source(const char *datadir)
+{
+ local_source *src;
+
+ src = pg_malloc0(sizeof(local_source));
+
+ src->common.traverse_files = local_traverse_files;
+ src->common.fetch_file = local_fetch_file;
+ src->common.queue_fetch_file = local_queue_fetch_file;
+ src->common.queue_fetch_range = local_queue_fetch_range;
+ src->common.finish_fetch = local_finish_fetch;
+ src->common.get_current_wal_insert_lsn = NULL;
+ src->common.destroy = local_destroy;
+
+ src->datadir = datadir;
+
+ return &src->common;
+}
+
+static void
+local_traverse_files(rewind_source *source, process_file_callback_t callback)
+{
+ traverse_datadir(((local_source *) source)->datadir, &process_source_file);
+}
+
+static char *
+local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
+{
+ return slurpFile(((local_source *) source)->datadir, path, filesize);
+}
+
+/*
+ * Copy a file from source to target.
+ *
+ * 'len' is the expected length of the file.
+ */
+static void
+local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
+{
+ const char *datadir = ((local_source *) source)->datadir;
+ PGAlignedBlock buf;
+ char srcpath[MAXPGPATH];
+ int srcfd;
+ size_t written_len;
+
+ snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+ /* Open source file for reading */
+ srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ pg_fatal("could not open source file \"%s\": %m",
+ srcpath);
+
+ /* Truncate and open the target file for writing */
+ open_target_file(path, true);
+
+ written_len = 0;
+ for (;;)
+ {
+ ssize_t read_len;
+
+ read_len = read(srcfd, buf.data, sizeof(buf));
+
+ if (read_len < 0)
+ pg_fatal("could not read file \"%s\": %m", srcpath);
+ else if (read_len == 0)
+ break; /* EOF reached */
+
+ write_target_range(buf.data, written_len, read_len);
+ written_len += read_len;
+ }
+
+ /*
+ * A local source is not expected to change while we're rewinding, so
+ * check that the size of the file matches our earlier expectation.
+ */
+ if (written_len != len)
+ pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
+ srcpath, (int) len, (int) written_len);
+
+ if (close(srcfd) != 0)
+ pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
+/*
+ * Copy a file from source to target, starting at 'off', for 'len' bytes.
+ */
+static void
+local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
+{
+ const char *datadir = ((local_source *) source)->datadir;
+ PGAlignedBlock buf;
+ char srcpath[MAXPGPATH];
+ int srcfd;
+ off_t begin = off;
+ off_t end = off + len;
+
+ snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+ srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ pg_fatal("could not open source file \"%s\": %m",
+ srcpath);
+
+ if (lseek(srcfd, begin, SEEK_SET) == -1)
+ pg_fatal("could not seek in source file: %m");
+
+ open_target_file(path, false);
+
+ while (end - begin > 0)
+ {
+ ssize_t readlen;
+ size_t thislen;
+
+ if (end - begin > sizeof(buf))
+ thislen = sizeof(buf);
+ else
+ thislen = end - begin;
+
+ readlen = read(srcfd, buf.data, thislen);
+
+ if (readlen < 0)
+ pg_fatal("could not read file \"%s\": %m", srcpath);
+ else if (readlen == 0)
+ pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
+
+ write_target_range(buf.data, begin, readlen);
+ begin += readlen;
+ }
+
+ if (close(srcfd) != 0)
+ pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
+static void
+local_finish_fetch(rewind_source *source)
+{
+ /*
+ * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
+ */
+}
+
+static void
+local_destroy(rewind_source *source)
+{
+ pfree(source);
+}