diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
commit | 311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 (patch) | |
tree | 0ec307299b1dada3701e42f4ca6eda57d708261e /src/backend/replication/logical/origin.c | |
parent | Initial commit. (diff) | |
download | postgresql-15-upstream.tar.xz postgresql-15-upstream.zip |
Adding upstream version 15.4.upstream/15.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/origin.c')
-rw-r--r-- | src/backend/replication/logical/origin.c | 1549 |
1 files changed, 1549 insertions, 0 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c new file mode 100644 index 0000000..cd86007 --- /dev/null +++ b/src/backend/replication/logical/origin.c @@ -0,0 +1,1549 @@ +/*------------------------------------------------------------------------- + * + * origin.c + * Logical replication progress tracking support. + * + * Copyright (c) 2013-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/origin.c + * + * NOTES + * + * This file provides the following: + * * An infrastructure to name nodes in a replication setup + * * A facility to efficiently store and persist replication progress in an + * efficient and durable manner. + * + * Replication origin consist out of a descriptive, user defined, external + * name and a short, thus space efficient, internal 2 byte one. This split + * exists because replication origin have to be stored in WAL and shared + * memory and long descriptors would be inefficient. For now only use 2 bytes + * for the internal id of a replication origin as it seems unlikely that there + * soon will be more than 65k nodes in one replication setup; and using only + * two bytes allow us to be more space efficient. + * + * Replication progress is tracked in a shared memory table + * (ReplicationState) that's dumped to disk every checkpoint. Entries + * ('slots') in this table are identified by the internal id. That's the case + * because it allows to increase replication progress during crash + * recovery. To allow doing so we store the original LSN (from the originating + * system) of a transaction in the commit record. That allows to recover the + * precise replayed state after crash recovery; without requiring synchronous + * commits. Allowing logical replication to use asynchronous commit is + * generally good for performance, but especially important as it allows a + * single threaded replay process to keep up with a source that has multiple + * backends generating changes concurrently. For efficiency and simplicity + * reasons a backend can setup one replication origin that's from then used as + * the source of changes produced by the backend, until reset again. + * + * This infrastructure is intended to be used in cooperation with logical + * decoding. When replaying from a remote system the configured origin is + * provided to output plugins, allowing prevention of replication loops and + * other filtering. + * + * There are several levels of locking at work: + * + * * To create and drop replication origins an exclusive lock on + * pg_replication_slot is required for the duration. That allows us to + * safely and conflict free assign new origins using a dirty snapshot. + * + * * When creating an in-memory replication progress slot the ReplicationOrigin + * LWLock has to be held exclusively; when iterating over the replication + * progress a shared lock has to be held, the same when advancing the + * replication progress of an individual backend that has not setup as the + * session's replication origin. + * + * * When manipulating or looking at the remote_lsn and local_lsn fields of a + * replication progress slot that slot's lwlock has to be held. That's + * primarily because we do not assume 8 byte writes (the LSN) is atomic on + * all our platforms, but it also simplifies memory ordering concerns + * between the remote and local lsn. We use a lwlock instead of a spinlock + * so it's less harmful to hold the lock over a WAL write + * (cf. AdvanceReplicationProgress). + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <unistd.h> +#include <sys/stat.h> + +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/table.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "catalog/catalog.h" +#include "catalog/indexing.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "pgstat.h" +#include "replication/logical.h" +#include "replication/origin.h" +#include "storage/condition_variable.h" +#include "storage/copydir.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/pg_lsn.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" + +/* + * Replay progress of a single remote node. + */ +typedef struct ReplicationState +{ + /* + * Local identifier for the remote node. + */ + RepOriginId roident; + + /* + * Location of the latest commit from the remote side. + */ + XLogRecPtr remote_lsn; + + /* + * Remember the local lsn of the commit record so we can XLogFlush() to it + * during a checkpoint so we know the commit record actually is safe on + * disk. + */ + XLogRecPtr local_lsn; + + /* + * PID of backend that's acquired slot, or 0 if none. + */ + int acquired_by; + + /* + * Condition variable that's signaled when acquired_by changes. + */ + ConditionVariable origin_cv; + + /* + * Lock protecting remote_lsn and local_lsn. + */ + LWLock lock; +} ReplicationState; + +/* + * On disk version of ReplicationState. + */ +typedef struct ReplicationStateOnDisk +{ + RepOriginId roident; + XLogRecPtr remote_lsn; +} ReplicationStateOnDisk; + + +typedef struct ReplicationStateCtl +{ + /* Tranche to use for per-origin LWLocks */ + int tranche_id; + /* Array of length max_replication_slots */ + ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; +} ReplicationStateCtl; + +/* external variables */ +RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ +XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; +TimestampTz replorigin_session_origin_timestamp = 0; + +/* + * Base address into a shared memory array of replication states of size + * max_replication_slots. + * + * XXX: Should we use a separate variable to size this rather than + * max_replication_slots? + */ +static ReplicationState *replication_states; + +/* + * Actual shared memory block (replication_states[] is now part of this). + */ +static ReplicationStateCtl *replication_states_ctl; + +/* + * Backend-local, cached element from ReplicationState for use in a backend + * replaying remote commits, so we don't have to search ReplicationState for + * the backends current RepOriginId. + */ +static ReplicationState *session_replication_state = NULL; + +/* Magic for on disk files. */ +#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE) + +static void +replorigin_check_prerequisites(bool check_slots, bool recoveryOK) +{ + if (check_slots && max_replication_slots == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot query or manipulate replication origin when max_replication_slots = 0"))); + + if (!recoveryOK && RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), + errmsg("cannot manipulate replication origins during recovery"))); +} + + +/* --------------------------------------------------------------------------- + * Functions for working with replication origins themselves. + * --------------------------------------------------------------------------- + */ + +/* + * Check for a persistent replication origin identified by name. + * + * Returns InvalidOid if the node isn't known yet and missing_ok is true. + */ +RepOriginId +replorigin_by_name(const char *roname, bool missing_ok) +{ + Form_pg_replication_origin ident; + Oid roident = InvalidOid; + HeapTuple tuple; + Datum roname_d; + + roname_d = CStringGetTextDatum(roname); + + tuple = SearchSysCache1(REPLORIGNAME, roname_d); + if (HeapTupleIsValid(tuple)) + { + ident = (Form_pg_replication_origin) GETSTRUCT(tuple); + roident = ident->roident; + ReleaseSysCache(tuple); + } + else if (!missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication origin \"%s\" does not exist", + roname))); + + return roident; +} + +/* + * Create a replication origin. + * + * Needs to be called in a transaction. + */ +RepOriginId +replorigin_create(const char *roname) +{ + Oid roident; + HeapTuple tuple = NULL; + Relation rel; + Datum roname_d; + SnapshotData SnapshotDirty; + SysScanDesc scan; + ScanKeyData key; + + roname_d = CStringGetTextDatum(roname); + + Assert(IsTransactionState()); + + /* + * We need the numeric replication origin to be 16bit wide, so we cannot + * rely on the normal oid allocation. Instead we simply scan + * pg_replication_origin for the first unused id. That's not particularly + * efficient, but this should be a fairly infrequent operation - we can + * easily spend a bit more code on this when it turns out it needs to be + * faster. + * + * We handle concurrency by taking an exclusive lock (allowing reads!) + * over the table for the duration of the search. Because we use a "dirty + * snapshot" we can read rows that other in-progress sessions have + * written, even though they would be invisible with normal snapshots. Due + * to the exclusive lock there's no danger that new rows can appear while + * we're checking. + */ + InitDirtySnapshot(SnapshotDirty); + + rel = table_open(ReplicationOriginRelationId, ExclusiveLock); + + for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++) + { + bool nulls[Natts_pg_replication_origin]; + Datum values[Natts_pg_replication_origin]; + bool collides; + + CHECK_FOR_INTERRUPTS(); + + ScanKeyInit(&key, + Anum_pg_replication_origin_roident, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(roident)); + + scan = systable_beginscan(rel, ReplicationOriginIdentIndex, + true /* indexOK */ , + &SnapshotDirty, + 1, &key); + + collides = HeapTupleIsValid(systable_getnext(scan)); + + systable_endscan(scan); + + if (!collides) + { + /* + * Ok, found an unused roident, insert the new row and do a CCI, + * so our callers can look it up if they want to. + */ + memset(&nulls, 0, sizeof(nulls)); + + values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident); + values[Anum_pg_replication_origin_roname - 1] = roname_d; + + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + CatalogTupleInsert(rel, tuple); + CommandCounterIncrement(); + break; + } + } + + /* now release lock again, */ + table_close(rel, ExclusiveLock); + + if (tuple == NULL) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("could not find free replication origin ID"))); + + heap_freetuple(tuple); + return roident; +} + +/* + * Helper function to drop a replication origin. + */ +static void +replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait) +{ + HeapTuple tuple; + int i; + + /* + * First, clean up the slot state info, if there is any matching slot. + */ +restart: + tuple = NULL; + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state = &replication_states[i]; + + if (state->roident == roident) + { + /* found our slot, is it busy? */ + if (state->acquired_by != 0) + { + ConditionVariable *cv; + + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not drop replication origin with ID %d, in use by PID %d", + state->roident, + state->acquired_by))); + + /* + * We must wait and then retry. Since we don't know which CV + * to wait on until here, we can't readily use + * ConditionVariablePrepareToSleep (calling it here would be + * wrong, since we could miss the signal if we did so); just + * use ConditionVariableSleep directly. + */ + cv = &state->origin_cv; + + LWLockRelease(ReplicationOriginLock); + + ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); + goto restart; + } + + /* first make a WAL log entry */ + { + xl_replorigin_drop xlrec; + + xlrec.node_id = roident; + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); + XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP); + } + + /* then clear the in-memory slot */ + state->roident = InvalidRepOriginId; + state->remote_lsn = InvalidXLogRecPtr; + state->local_lsn = InvalidXLogRecPtr; + break; + } + } + LWLockRelease(ReplicationOriginLock); + ConditionVariableCancelSleep(); + + /* + * Now, we can delete the catalog entry. + */ + tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for replication origin with ID %d", + roident); + + CatalogTupleDelete(rel, &tuple->t_self); + ReleaseSysCache(tuple); + + CommandCounterIncrement(); +} + +/* + * Drop replication origin (by name). + * + * Needs to be called in a transaction. + */ +void +replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait) +{ + RepOriginId roident; + Relation rel; + + Assert(IsTransactionState()); + + /* + * To interlock against concurrent drops, we hold ExclusiveLock on + * pg_replication_origin till xact commit. + * + * XXX We can optimize this by acquiring the lock on a specific origin by + * using LockSharedObject if required. However, for that, we first to + * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock + * the specific origin and then re-check if the origin still exists. + */ + rel = table_open(ReplicationOriginRelationId, ExclusiveLock); + + roident = replorigin_by_name(name, missing_ok); + + if (OidIsValid(roident)) + replorigin_drop_guts(rel, roident, nowait); + + /* We keep the lock on pg_replication_origin until commit */ + table_close(rel, NoLock); +} + +/* + * Lookup replication origin via its oid and return the name. + * + * The external name is palloc'd in the calling context. + * + * Returns true if the origin is known, false otherwise. + */ +bool +replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname) +{ + HeapTuple tuple; + Form_pg_replication_origin ric; + + Assert(OidIsValid((Oid) roident)); + Assert(roident != InvalidRepOriginId); + Assert(roident != DoNotReplicateId); + + tuple = SearchSysCache1(REPLORIGIDENT, + ObjectIdGetDatum((Oid) roident)); + + if (HeapTupleIsValid(tuple)) + { + ric = (Form_pg_replication_origin) GETSTRUCT(tuple); + *roname = text_to_cstring(&ric->roname); + ReleaseSysCache(tuple); + + return true; + } + else + { + *roname = NULL; + + if (!missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication origin with ID %d does not exist", + roident))); + + return false; + } +} + + +/* --------------------------------------------------------------------------- + * Functions for handling replication progress. + * --------------------------------------------------------------------------- + */ + +Size +ReplicationOriginShmemSize(void) +{ + Size size = 0; + + /* + * XXX: max_replication_slots is arguably the wrong thing to use, as here + * we keep the replay state of *remote* transactions. But for now it seems + * sufficient to reuse it, rather than introduce a separate GUC. + */ + if (max_replication_slots == 0) + return size; + + size = add_size(size, offsetof(ReplicationStateCtl, states)); + + size = add_size(size, + mul_size(max_replication_slots, sizeof(ReplicationState))); + return size; +} + +void +ReplicationOriginShmemInit(void) +{ + bool found; + + if (max_replication_slots == 0) + return; + + replication_states_ctl = (ReplicationStateCtl *) + ShmemInitStruct("ReplicationOriginState", + ReplicationOriginShmemSize(), + &found); + replication_states = replication_states_ctl->states; + + if (!found) + { + int i; + + MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize()); + + replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE; + + for (i = 0; i < max_replication_slots; i++) + { + LWLockInitialize(&replication_states[i].lock, + replication_states_ctl->tranche_id); + ConditionVariableInit(&replication_states[i].origin_cv); + } + } +} + +/* --------------------------------------------------------------------------- + * Perform a checkpoint of each replication origin's progress with respect to + * the replayed remote_lsn. Make sure that all transactions we refer to in the + * checkpoint (local_lsn) are actually on-disk. This might not yet be the case + * if the transactions were originally committed asynchronously. + * + * We store checkpoints in the following format: + * +-------+------------------------+------------------+-----+--------+ + * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF + * +-------+------------------------+------------------+-----+--------+ + * + * So its just the magic, followed by the statically sized + * ReplicationStateOnDisk structs. Note that the maximum number of + * ReplicationState is determined by max_replication_slots. + * --------------------------------------------------------------------------- + */ +void +CheckPointReplicationOrigin(void) +{ + const char *tmppath = "pg_logical/replorigin_checkpoint.tmp"; + const char *path = "pg_logical/replorigin_checkpoint"; + int tmpfd; + int i; + uint32 magic = REPLICATION_STATE_MAGIC; + pg_crc32c crc; + + if (max_replication_slots == 0) + return; + + INIT_CRC32C(crc); + + /* make sure no old temp file is remaining */ + if (unlink(tmppath) < 0 && errno != ENOENT) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", + tmppath))); + + /* + * no other backend can perform this at the same time; only one checkpoint + * can happen at a time. + */ + tmpfd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY); + if (tmpfd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + tmppath))); + + /* write magic */ + errno = 0; + if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic)) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + tmppath))); + } + COMP_CRC32C(crc, &magic, sizeof(magic)); + + /* prevent concurrent creations/drops */ + LWLockAcquire(ReplicationOriginLock, LW_SHARED); + + /* write actual data */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationStateOnDisk disk_state; + ReplicationState *curstate = &replication_states[i]; + XLogRecPtr local_lsn; + + if (curstate->roident == InvalidRepOriginId) + continue; + + /* zero, to avoid uninitialized padding bytes */ + memset(&disk_state, 0, sizeof(disk_state)); + + LWLockAcquire(&curstate->lock, LW_SHARED); + + disk_state.roident = curstate->roident; + + disk_state.remote_lsn = curstate->remote_lsn; + local_lsn = curstate->local_lsn; + + LWLockRelease(&curstate->lock); + + /* make sure we only write out a commit that's persistent */ + XLogFlush(local_lsn); + + errno = 0; + if ((write(tmpfd, &disk_state, sizeof(disk_state))) != + sizeof(disk_state)) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + tmppath))); + } + + COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); + } + + LWLockRelease(ReplicationOriginLock); + + /* write out the CRC */ + FIN_CRC32C(crc); + errno = 0; + if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc)) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + tmppath))); + } + + if (CloseTransientFile(tmpfd) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + tmppath))); + + /* fsync, rename to permanent file, fsync file and directory */ + durable_rename(tmppath, path, PANIC); +} + +/* + * Recover replication replay status from checkpoint data saved earlier by + * CheckPointReplicationOrigin. + * + * This only needs to be called at startup and *not* during every checkpoint + * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All + * state thereafter can be recovered by looking at commit records. + */ +void +StartupReplicationOrigin(void) +{ + const char *path = "pg_logical/replorigin_checkpoint"; + int fd; + int readBytes; + uint32 magic = REPLICATION_STATE_MAGIC; + int last_state = 0; + pg_crc32c file_crc; + pg_crc32c crc; + + /* don't want to overwrite already existing state */ +#ifdef USE_ASSERT_CHECKING + static bool already_started = false; + + Assert(!already_started); + already_started = true; +#endif + + if (max_replication_slots == 0) + return; + + INIT_CRC32C(crc); + + elog(DEBUG2, "starting up replication origin progress state"); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + + /* + * might have had max_replication_slots == 0 last run, or we just brought + * up a standby. + */ + if (fd < 0 && errno == ENOENT) + return; + else if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + + /* verify magic, that is written even if nothing was active */ + readBytes = read(fd, &magic, sizeof(magic)); + if (readBytes != sizeof(magic)) + { + if (readBytes < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + else + ereport(PANIC, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(magic)))); + } + COMP_CRC32C(crc, &magic, sizeof(magic)); + + if (magic != REPLICATION_STATE_MAGIC) + ereport(PANIC, + (errmsg("replication checkpoint has wrong magic %u instead of %u", + magic, REPLICATION_STATE_MAGIC))); + + /* we can skip locking here, no other access is possible */ + + /* recover individual states, until there are no more to be found */ + while (true) + { + ReplicationStateOnDisk disk_state; + + readBytes = read(fd, &disk_state, sizeof(disk_state)); + + /* no further data */ + if (readBytes == sizeof(crc)) + { + /* not pretty, but simple ... */ + file_crc = *(pg_crc32c *) &disk_state; + break; + } + + if (readBytes < 0) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + } + + if (readBytes != sizeof(disk_state)) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, sizeof(disk_state)))); + } + + COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); + + if (last_state == max_replication_slots) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("could not find free replication state, increase max_replication_slots"))); + + /* copy data to shared memory */ + replication_states[last_state].roident = disk_state.roident; + replication_states[last_state].remote_lsn = disk_state.remote_lsn; + last_state++; + + ereport(LOG, + (errmsg("recovered replication state of node %d to %X/%X", + disk_state.roident, + LSN_FORMAT_ARGS(disk_state.remote_lsn)))); + } + + /* now check checksum */ + FIN_CRC32C(crc); + if (file_crc != crc) + ereport(PANIC, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("replication slot checkpoint has wrong checksum %u, expected %u", + crc, file_crc))); + + if (CloseTransientFile(fd) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", + path))); +} + +void +replorigin_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_REPLORIGIN_SET: + { + xl_replorigin_set *xlrec = + (xl_replorigin_set *) XLogRecGetData(record); + + replorigin_advance(xlrec->node_id, + xlrec->remote_lsn, record->EndRecPtr, + xlrec->force /* backward */ , + false /* WAL log */ ); + break; + } + case XLOG_REPLORIGIN_DROP: + { + xl_replorigin_drop *xlrec; + int i; + + xlrec = (xl_replorigin_drop *) XLogRecGetData(record); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state = &replication_states[i]; + + /* found our slot */ + if (state->roident == xlrec->node_id) + { + /* reset entry */ + state->roident = InvalidRepOriginId; + state->remote_lsn = InvalidXLogRecPtr; + state->local_lsn = InvalidXLogRecPtr; + break; + } + } + break; + } + default: + elog(PANIC, "replorigin_redo: unknown op code %u", info); + } +} + + +/* + * Tell the replication origin progress machinery that a commit from 'node' + * that originated at the LSN remote_commit on the remote node was replayed + * successfully and that we don't need to do so again. In combination with + * setting up replorigin_session_origin_lsn and replorigin_session_origin + * that ensures we won't lose knowledge about that after a crash if the + * transaction had a persistent effect (think of asynchronous commits). + * + * local_commit needs to be a local LSN of the commit so that we can make sure + * upon a checkpoint that enough WAL has been persisted to disk. + * + * Needs to be called with a RowExclusiveLock on pg_replication_origin, + * unless running in recovery. + */ +void +replorigin_advance(RepOriginId node, + XLogRecPtr remote_commit, XLogRecPtr local_commit, + bool go_backward, bool wal_log) +{ + int i; + ReplicationState *replication_state = NULL; + ReplicationState *free_state = NULL; + + Assert(node != InvalidRepOriginId); + + /* we don't track DoNotReplicateId */ + if (node == DoNotReplicateId) + return; + + /* + * XXX: For the case where this is called by WAL replay, it'd be more + * efficient to restore into a backend local hashtable and only dump into + * shmem after recovery is finished. Let's wait with implementing that + * till it's shown to be a measurable expense + */ + + /* Lock exclusively, as we may have to create a new table entry. */ + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + /* + * Search for either an existing slot for the origin, or a free one we can + * use. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *curstate = &replication_states[i]; + + /* remember where to insert if necessary */ + if (curstate->roident == InvalidRepOriginId && + free_state == NULL) + { + free_state = curstate; + continue; + } + + /* not our slot */ + if (curstate->roident != node) + { + continue; + } + + /* ok, found slot */ + replication_state = curstate; + + LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE); + + /* Make sure it's not used by somebody else */ + if (replication_state->acquired_by != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is already active for PID %d", + replication_state->roident, + replication_state->acquired_by))); + } + + break; + } + + if (replication_state == NULL && free_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("could not find free replication state slot for replication origin with ID %d", + node), + errhint("Increase max_replication_slots and try again."))); + + if (replication_state == NULL) + { + /* initialize new slot */ + LWLockAcquire(&free_state->lock, LW_EXCLUSIVE); + replication_state = free_state; + Assert(replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(replication_state->local_lsn == InvalidXLogRecPtr); + replication_state->roident = node; + } + + Assert(replication_state->roident != InvalidRepOriginId); + + /* + * If somebody "forcefully" sets this slot, WAL log it, so it's durable + * and the standby gets the message. Primarily this will be called during + * WAL replay (of commit records) where no WAL logging is necessary. + */ + if (wal_log) + { + xl_replorigin_set xlrec; + + xlrec.remote_lsn = remote_commit; + xlrec.node_id = node; + xlrec.force = go_backward; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), sizeof(xlrec)); + + XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET); + } + + /* + * Due to - harmless - race conditions during a checkpoint we could see + * values here that are older than the ones we already have in memory. We + * could also see older values for prepared transactions when the prepare + * is sent at a later point of time along with commit prepared and there + * are other transactions commits between prepare and commit prepared. See + * ReorderBufferFinishPrepared. Don't overwrite those. + */ + if (go_backward || replication_state->remote_lsn < remote_commit) + replication_state->remote_lsn = remote_commit; + if (local_commit != InvalidXLogRecPtr && + (go_backward || replication_state->local_lsn < local_commit)) + replication_state->local_lsn = local_commit; + LWLockRelease(&replication_state->lock); + + /* + * Release *after* changing the LSNs, slot isn't acquired and thus could + * otherwise be dropped anytime. + */ + LWLockRelease(ReplicationOriginLock); +} + + +XLogRecPtr +replorigin_get_progress(RepOriginId node, bool flush) +{ + int i; + XLogRecPtr local_lsn = InvalidXLogRecPtr; + XLogRecPtr remote_lsn = InvalidXLogRecPtr; + + /* prevent slots from being concurrently dropped */ + LWLockAcquire(ReplicationOriginLock, LW_SHARED); + + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state; + + state = &replication_states[i]; + + if (state->roident == node) + { + LWLockAcquire(&state->lock, LW_SHARED); + + remote_lsn = state->remote_lsn; + local_lsn = state->local_lsn; + + LWLockRelease(&state->lock); + + break; + } + } + + LWLockRelease(ReplicationOriginLock); + + if (flush && local_lsn != InvalidXLogRecPtr) + XLogFlush(local_lsn); + + return remote_lsn; +} + +/* + * Tear down a (possibly) configured session replication origin during process + * exit. + */ +static void +ReplicationOriginExitCleanup(int code, Datum arg) +{ + ConditionVariable *cv = NULL; + + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + if (session_replication_state != NULL && + session_replication_state->acquired_by == MyProcPid) + { + cv = &session_replication_state->origin_cv; + + session_replication_state->acquired_by = 0; + session_replication_state = NULL; + } + + LWLockRelease(ReplicationOriginLock); + + if (cv) + ConditionVariableBroadcast(cv); +} + +/* + * Setup a replication origin in the shared memory struct if it doesn't + * already exist and cache access to the specific ReplicationSlot so the + * array doesn't have to be searched when calling + * replorigin_session_advance(). + * + * Obviously only one such cached origin can exist per process and the current + * cached value can only be set again after the previous value is torn down + * with replorigin_session_reset(). + */ +void +replorigin_session_setup(RepOriginId node) +{ + static bool registered_cleanup; + int i; + int free_slot = -1; + + if (!registered_cleanup) + { + on_shmem_exit(ReplicationOriginExitCleanup, 0); + registered_cleanup = true; + } + + Assert(max_replication_slots > 0); + + if (session_replication_state != NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot setup replication origin when one is already setup"))); + + /* Lock exclusively, as we may have to create a new table entry. */ + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + /* + * Search for either an existing slot for the origin, or a free one we can + * use. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *curstate = &replication_states[i]; + + /* remember where to insert if necessary */ + if (curstate->roident == InvalidRepOriginId && + free_slot == -1) + { + free_slot = i; + continue; + } + + /* not our slot */ + if (curstate->roident != node) + continue; + + else if (curstate->acquired_by != 0) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication origin with ID %d is already active for PID %d", + curstate->roident, curstate->acquired_by))); + } + + /* ok, found slot */ + session_replication_state = curstate; + } + + + if (session_replication_state == NULL && free_slot == -1) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("could not find free replication state slot for replication origin with ID %d", + node), + errhint("Increase max_replication_slots and try again."))); + else if (session_replication_state == NULL) + { + /* initialize new slot */ + session_replication_state = &replication_states[free_slot]; + Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); + session_replication_state->roident = node; + } + + + Assert(session_replication_state->roident != InvalidRepOriginId); + + session_replication_state->acquired_by = MyProcPid; + + LWLockRelease(ReplicationOriginLock); + + /* probably this one is pointless */ + ConditionVariableBroadcast(&session_replication_state->origin_cv); +} + +/* + * Reset replay state previously setup in this session. + * + * This function may only be called if an origin was setup with + * replorigin_session_setup(). + */ +void +replorigin_session_reset(void) +{ + ConditionVariable *cv; + + Assert(max_replication_slots != 0); + + if (session_replication_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no replication origin is configured"))); + + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); + + session_replication_state->acquired_by = 0; + cv = &session_replication_state->origin_cv; + session_replication_state = NULL; + + LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); +} + +/* + * Do the same work replorigin_advance() does, just on the session's + * configured origin. + * + * This is noticeably cheaper than using replorigin_advance(). + */ +void +replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit) +{ + Assert(session_replication_state != NULL); + Assert(session_replication_state->roident != InvalidRepOriginId); + + LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE); + if (session_replication_state->local_lsn < local_commit) + session_replication_state->local_lsn = local_commit; + if (session_replication_state->remote_lsn < remote_commit) + session_replication_state->remote_lsn = remote_commit; + LWLockRelease(&session_replication_state->lock); +} + +/* + * Ask the machinery about the point up to which we successfully replayed + * changes from an already setup replication origin. + */ +XLogRecPtr +replorigin_session_get_progress(bool flush) +{ + XLogRecPtr remote_lsn; + XLogRecPtr local_lsn; + + Assert(session_replication_state != NULL); + + LWLockAcquire(&session_replication_state->lock, LW_SHARED); + remote_lsn = session_replication_state->remote_lsn; + local_lsn = session_replication_state->local_lsn; + LWLockRelease(&session_replication_state->lock); + + if (flush && local_lsn != InvalidXLogRecPtr) + XLogFlush(local_lsn); + + return remote_lsn; +} + + + +/* --------------------------------------------------------------------------- + * SQL functions for working with replication origin. + * + * These mostly should be fairly short wrappers around more generic functions. + * --------------------------------------------------------------------------- + */ + +/* + * Create replication origin for the passed in name, and return the assigned + * oid. + */ +Datum +pg_replication_origin_create(PG_FUNCTION_ARGS) +{ + char *name; + RepOriginId roident; + + replorigin_check_prerequisites(false, false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + + /* Replication origins "pg_xxx" are reserved for internal use */ + if (IsReservedName(name)) + ereport(ERROR, + (errcode(ERRCODE_RESERVED_NAME), + errmsg("replication origin name \"%s\" is reserved", + name), + errdetail("Origin names starting with \"pg_\" are reserved."))); + + /* + * If built with appropriate switch, whine when regression-testing + * conventions for replication origin names are violated. + */ +#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS + if (strncmp(name, "regress_", 8) != 0) + elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\""); +#endif + + roident = replorigin_create(name); + + pfree(name); + + PG_RETURN_OID(roident); +} + +/* + * Drop replication origin. + */ +Datum +pg_replication_origin_drop(PG_FUNCTION_ARGS) +{ + char *name; + + replorigin_check_prerequisites(false, false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + + replorigin_drop_by_name(name, false, true); + + pfree(name); + + PG_RETURN_VOID(); +} + +/* + * Return oid of a replication origin. + */ +Datum +pg_replication_origin_oid(PG_FUNCTION_ARGS) +{ + char *name; + RepOriginId roident; + + replorigin_check_prerequisites(false, false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + roident = replorigin_by_name(name, true); + + pfree(name); + + if (OidIsValid(roident)) + PG_RETURN_OID(roident); + PG_RETURN_NULL(); +} + +/* + * Setup a replication origin for this session. + */ +Datum +pg_replication_origin_session_setup(PG_FUNCTION_ARGS) +{ + char *name; + RepOriginId origin; + + replorigin_check_prerequisites(true, false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + origin = replorigin_by_name(name, false); + replorigin_session_setup(origin); + + replorigin_session_origin = origin; + + pfree(name); + + PG_RETURN_VOID(); +} + +/* + * Reset previously setup origin in this session + */ +Datum +pg_replication_origin_session_reset(PG_FUNCTION_ARGS) +{ + replorigin_check_prerequisites(true, false); + + replorigin_session_reset(); + + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + PG_RETURN_VOID(); +} + +/* + * Has a replication origin been setup for this session. + */ +Datum +pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) +{ + replorigin_check_prerequisites(false, false); + + PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId); +} + + +/* + * Return the replication progress for origin setup in the current session. + * + * If 'flush' is set to true it is ensured that the returned value corresponds + * to a local transaction that has been flushed. This is useful if asynchronous + * commits are used when replaying replicated transactions. + */ +Datum +pg_replication_origin_session_progress(PG_FUNCTION_ARGS) +{ + XLogRecPtr remote_lsn = InvalidXLogRecPtr; + bool flush = PG_GETARG_BOOL(0); + + replorigin_check_prerequisites(true, false); + + if (session_replication_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no replication origin is configured"))); + + remote_lsn = replorigin_session_get_progress(flush); + + if (remote_lsn == InvalidXLogRecPtr) + PG_RETURN_NULL(); + + PG_RETURN_LSN(remote_lsn); +} + +Datum +pg_replication_origin_xact_setup(PG_FUNCTION_ARGS) +{ + XLogRecPtr location = PG_GETARG_LSN(0); + + replorigin_check_prerequisites(true, false); + + if (session_replication_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no replication origin is configured"))); + + replorigin_session_origin_lsn = location; + replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); + + PG_RETURN_VOID(); +} + +Datum +pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) +{ + replorigin_check_prerequisites(true, false); + + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + + PG_RETURN_VOID(); +} + + +Datum +pg_replication_origin_advance(PG_FUNCTION_ARGS) +{ + text *name = PG_GETARG_TEXT_PP(0); + XLogRecPtr remote_commit = PG_GETARG_LSN(1); + RepOriginId node; + + replorigin_check_prerequisites(true, false); + + /* lock to prevent the replication origin from vanishing */ + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + node = replorigin_by_name(text_to_cstring(name), false); + + /* + * Can't sensibly pass a local commit to be flushed at checkpoint - this + * xact hasn't committed yet. This is why this function should be used to + * set up the initial replication state, but not for replay. + */ + replorigin_advance(node, remote_commit, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + + PG_RETURN_VOID(); +} + + +/* + * Return the replication progress for an individual replication origin. + * + * If 'flush' is set to true it is ensured that the returned value corresponds + * to a local transaction that has been flushed. This is useful if asynchronous + * commits are used when replaying replicated transactions. + */ +Datum +pg_replication_origin_progress(PG_FUNCTION_ARGS) +{ + char *name; + bool flush; + RepOriginId roident; + XLogRecPtr remote_lsn = InvalidXLogRecPtr; + + replorigin_check_prerequisites(true, true); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + flush = PG_GETARG_BOOL(1); + + roident = replorigin_by_name(name, false); + Assert(OidIsValid(roident)); + + remote_lsn = replorigin_get_progress(roident, flush); + + if (remote_lsn == InvalidXLogRecPtr) + PG_RETURN_NULL(); + + PG_RETURN_LSN(remote_lsn); +} + + +Datum +pg_show_replication_origin_status(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + int i; +#define REPLICATION_ORIGIN_PROGRESS_COLS 4 + + /* we want to return 0 rows if slot is set to zero */ + replorigin_check_prerequisites(false, true); + + InitMaterializedSRF(fcinfo, 0); + + /* prevent slots from being concurrently dropped */ + LWLockAcquire(ReplicationOriginLock, LW_SHARED); + + /* + * Iterate through all possible replication_states, display if they are + * filled. Note that we do not take any locks, so slightly corrupted/out + * of date values are a possibility. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state; + Datum values[REPLICATION_ORIGIN_PROGRESS_COLS]; + bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS]; + char *roname; + + state = &replication_states[i]; + + /* unused slot, nothing to display */ + if (state->roident == InvalidRepOriginId) + continue; + + memset(values, 0, sizeof(values)); + memset(nulls, 1, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(state->roident); + nulls[0] = false; + + /* + * We're not preventing the origin to be dropped concurrently, so + * silently accept that it might be gone. + */ + if (replorigin_by_oid(state->roident, true, + &roname)) + { + values[1] = CStringGetTextDatum(roname); + nulls[1] = false; + } + + LWLockAcquire(&state->lock, LW_SHARED); + + values[2] = LSNGetDatum(state->remote_lsn); + nulls[2] = false; + + values[3] = LSNGetDatum(state->local_lsn); + nulls[3] = false; + + LWLockRelease(&state->lock); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, + values, nulls); + } + + LWLockRelease(ReplicationOriginLock); + +#undef REPLICATION_ORIGIN_PROGRESS_COLS + + return (Datum) 0; +} |