summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/logical/worker.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c3254
1 files changed, 3254 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
new file mode 100644
index 0000000..8c9a4b5
--- /dev/null
+++ b/src/backend/replication/logical/worker.c
@@ -0,0 +1,3254 @@
+/*-------------------------------------------------------------------------
+ * worker.c
+ * PostgreSQL logical replication worker (apply)
+ *
+ * Copyright (c) 2016-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/worker.c
+ *
+ * NOTES
+ * This file contains the worker which applies logical changes as they come
+ * from remote logical replication stream.
+ *
+ * The main worker (apply) is started by logical replication worker
+ * launcher for every enabled subscription in a database. It uses
+ * walsender protocol to communicate with publisher.
+ *
+ * This module includes server facing code and shares libpqwalreceiver
+ * module with walreceiver for providing the libpq specific functionality.
+ *
+ *
+ * STREAMED TRANSACTIONS
+ * ---------------------
+ * Streamed transactions (large transactions exceeding a memory limit on the
+ * upstream) are not applied immediately, but instead, the data is written
+ * to temporary files and then applied at once when the final commit arrives.
+ *
+ * Unlike the regular (non-streamed) case, handling streamed transactions has
+ * to handle aborts of both the toplevel transaction and subtransactions. This
+ * is achieved by tracking offsets for subtransactions, which is then used
+ * to truncate the file with serialized changes.
+ *
+ * The files are placed in tmp file directory by default, and the filenames
+ * include both the XID of the toplevel transaction and OID of the
+ * subscription. This is necessary so that different workers processing a
+ * remote transaction with the same XID doesn't interfere.
+ *
+ * We use BufFiles instead of using normal temporary files because (a) the
+ * BufFile infrastructure supports temporary files that exceed the OS file size
+ * limit, (b) provides a way for automatic clean up on the error and (c) provides
+ * a way to survive these files across local transactions and allow to open and
+ * close at stream start and close. We decided to use SharedFileSet
+ * infrastructure as without that it deletes the files on the closure of the
+ * file and if we decide to keep stream files open across the start/stop stream
+ * then it will consume a lot of memory (more than 8K for each BufFile and
+ * there could be multiple such BufFiles as the subscriber could receive
+ * multiple start/stop streams for different transactions before getting the
+ * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * a new way to pass filenames to BufFile APIs so that we are allowed to open
+ * the file we desired across multiple stream-open calls for the same
+ * transaction.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/xact.h"
+#include "access/xlog_internal.h"
+#include "catalog/catalog.h"
+#include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
+#include "catalog/pg_subscription.h"
+#include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "commands/tablespace.h"
+#include "commands/trigger.h"
+#include "executor/executor.h"
+#include "executor/execPartition.h"
+#include "executor/nodeModifyTable.h"
+#include "funcapi.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqsignal.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/postmaster.h"
+#include "postmaster/walwriter.h"
+#include "replication/decode.h"
+#include "replication/logical.h"
+#include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
+#include "replication/origin.h"
+#include "replication/reorderbuffer.h"
+#include "replication/snapbuild.h"
+#include "replication/walreceiver.h"
+#include "replication/worker_internal.h"
+#include "rewrite/rewriteHandler.h"
+#include "storage/buffile.h"
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/catcache.h"
+#include "utils/dynahash.h"
+#include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/timeout.h"
+
+#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
+
+typedef struct FlushPosition
+{
+ dlist_node node;
+ XLogRecPtr local_end;
+ XLogRecPtr remote_end;
+} FlushPosition;
+
+static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
+
+typedef struct SlotErrCallbackArg
+{
+ LogicalRepRelMapEntry *rel;
+ int remote_attnum;
+} SlotErrCallbackArg;
+
+typedef struct ApplyExecutionData
+{
+ EState *estate; /* executor state, used to track resources */
+
+ LogicalRepRelMapEntry *targetRel; /* replication target rel */
+ ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
+
+ /* These fields are used when the target relation is partitioned: */
+ ModifyTableState *mtstate; /* dummy ModifyTable state */
+ PartitionTupleRouting *proute; /* partition routing info */
+} ApplyExecutionData;
+
+/*
+ * Stream xid hash entry. Whenever we see a new xid we create this entry in the
+ * xidhash and along with it create the streaming file and store the fileset handle.
+ * The subxact file is created iff there is any subxact info under this xid. This
+ * entry is used on the subsequent streams for the xid to get the corresponding
+ * fileset handles, so storing them in hash makes the search faster.
+ */
+typedef struct StreamXidHash
+{
+ TransactionId xid; /* xid is the hash key and must be first */
+ SharedFileSet *stream_fileset; /* shared file set for stream data */
+ SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+} StreamXidHash;
+
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
+
+/* per stream context for streaming transactions */
+static MemoryContext LogicalStreamingContext = NULL;
+
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
+
+Subscription *MySubscription = NULL;
+bool MySubscriptionValid = false;
+
+bool in_remote_transaction = false;
+static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+
+/* fields valid only when processing streamed transaction */
+static bool in_streamed_transaction = false;
+
+static TransactionId stream_xid = InvalidTransactionId;
+
+/*
+ * Hash table for storing the streaming xid information along with shared file
+ * set for streaming and subxact files.
+ */
+static HTAB *xidhash = NULL;
+
+/* BufFile handle of the current streaming file */
+static BufFile *stream_fd = NULL;
+
+typedef struct SubXactInfo
+{
+ TransactionId xid; /* XID of the subxact */
+ int fileno; /* file number in the buffile */
+ off_t offset; /* offset in the file */
+} SubXactInfo;
+
+/* Sub-transaction data for the current streaming transaction */
+typedef struct ApplySubXactData
+{
+ uint32 nsubxacts; /* number of sub-transactions */
+ uint32 nsubxacts_max; /* current capacity of subxacts */
+ TransactionId subxact_last; /* xid of the last sub-transaction */
+ SubXactInfo *subxacts; /* sub-xact offset in changes file */
+} ApplySubXactData;
+
+static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
+
+static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
+static inline void changes_filename(char *path, Oid subid, TransactionId xid);
+
+/*
+ * Information about subtransactions of a given toplevel transaction.
+ */
+static void subxact_info_write(Oid subid, TransactionId xid);
+static void subxact_info_read(Oid subid, TransactionId xid);
+static void subxact_info_add(TransactionId xid);
+static inline void cleanup_subxact_info(void);
+
+/*
+ * Serialize and deserialize changes for a toplevel transaction.
+ */
+static void stream_cleanup_files(Oid subid, TransactionId xid);
+static void stream_open_file(Oid subid, TransactionId xid, bool first);
+static void stream_write_change(char action, StringInfo s);
+static void stream_close_file(void);
+
+static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+
+static void store_flush_position(XLogRecPtr remote_lsn);
+
+static void maybe_reread_subscription(void);
+
+/* prototype needed because of stream_commit */
+static void apply_dispatch(StringInfo s);
+
+static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
+static void apply_handle_insert_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot);
+static void apply_handle_update_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup);
+static void apply_handle_delete_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot);
+static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
+ LogicalRepRelation *remoterel,
+ TupleTableSlot *remoteslot,
+ TupleTableSlot **localslot);
+static void apply_handle_tuple_routing(ApplyExecutionData *edata,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ CmdType operation);
+
+/*
+ * Should this worker apply changes for given relation.
+ *
+ * This is mainly needed for initial relation data sync as that runs in
+ * separate worker process running in parallel and we need some way to skip
+ * changes coming to the main apply worker during the sync of a table.
+ *
+ * Note we need to do smaller or equals comparison for SYNCDONE state because
+ * it might hold position of end of initial slot consistent point WAL
+ * record + 1 (ie start of next record) and next record can be COMMIT of
+ * transaction we are now processing (which is what we set remote_final_lsn
+ * to in apply_handle_begin).
+ */
+static bool
+should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
+{
+ if (am_tablesync_worker())
+ return MyLogicalRepWorker->relid == rel->localreloid;
+ else
+ return (rel->state == SUBREL_STATE_READY ||
+ (rel->state == SUBREL_STATE_SYNCDONE &&
+ rel->statelsn <= remote_final_lsn));
+}
+
+/*
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
+ *
+ * Start a transaction, if this is the first step (else we keep using the
+ * existing transaction).
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
+ */
+static void
+begin_replication_step(void)
+{
+ SetCurrentStatementStartTimestamp();
+
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+
+ MemoryContextSwitchTo(ApplyMessageContext);
+}
+
+/*
+ * Finish up one step of a replication transaction.
+ * Callers of begin_replication_step() must also call this.
+ *
+ * We don't close out the transaction here, but we should increment
+ * the command counter to make the effects of this step visible.
+ */
+static void
+end_replication_step(void)
+{
+ PopActiveSnapshot();
+
+ CommandCounterIncrement();
+}
+
+/*
+ * Handle streamed transactions.
+ *
+ * If in streaming mode (receiving a block of streamed transaction), we
+ * simply redirect it to a file for the proper toplevel transaction.
+ *
+ * Returns true for streamed transactions, false otherwise (regular mode).
+ */
+static bool
+handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
+{
+ TransactionId xid;
+
+ /* not in streaming mode */
+ if (!in_streamed_transaction)
+ return false;
+
+ Assert(stream_fd != NULL);
+ Assert(TransactionIdIsValid(stream_xid));
+
+ /*
+ * We should have received XID of the subxact as the first part of the
+ * message, so extract it.
+ */
+ xid = pq_getmsgint(s, 4);
+
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
+
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(xid);
+
+ /* write the change to the current file */
+ stream_write_change(action, s);
+
+ return true;
+}
+
+/*
+ * Executor state preparation for evaluation of constraint expressions,
+ * indexes and triggers for the specified relation.
+ *
+ * Note that the caller must open and close any indexes to be updated.
+ */
+static ApplyExecutionData *
+create_edata_for_relation(LogicalRepRelMapEntry *rel)
+{
+ ApplyExecutionData *edata;
+ EState *estate;
+ RangeTblEntry *rte;
+ ResultRelInfo *resultRelInfo;
+
+ edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
+ edata->targetRel = rel;
+
+ edata->estate = estate = CreateExecutorState();
+
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = RelationGetRelid(rel->localrel);
+ rte->relkind = rel->localrel->rd_rel->relkind;
+ rte->rellockmode = AccessShareLock;
+ ExecInitRangeTable(estate, list_make1(rte));
+
+ edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
+
+ /*
+ * Use Relation opened by logicalrep_rel_open() instead of opening it
+ * again.
+ */
+ InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
+
+ /*
+ * We put the ResultRelInfo in the es_opened_result_relations list, even
+ * though we don't populate the es_result_relations array. That's a bit
+ * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
+ *
+ * ExecOpenIndices() is not called here either, each execution path doing
+ * an apply operation being responsible for that.
+ */
+ estate->es_opened_result_relations =
+ lappend(estate->es_opened_result_relations, resultRelInfo);
+
+ estate->es_output_cid = GetCurrentCommandId(true);
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /* other fields of edata remain NULL for now */
+
+ return edata;
+}
+
+/*
+ * Finish any operations related to the executor state created by
+ * create_edata_for_relation().
+ */
+static void
+finish_edata(ApplyExecutionData *edata)
+{
+ EState *estate = edata->estate;
+
+ /* Handle any queued AFTER triggers. */
+ AfterTriggerEndQuery(estate);
+
+ /* Shut down tuple routing, if any was done. */
+ if (edata->proute)
+ ExecCleanupTupleRouting(edata->mtstate, edata->proute);
+
+ /*
+ * Cleanup. It might seem that we should call ExecCloseResultRelations()
+ * here, but we intentionally don't. It would close the rel we added to
+ * es_opened_result_relations above, which is wrong because we took no
+ * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
+ * any other relations opened during execution.
+ */
+ ExecResetTupleTable(estate->es_tupleTable, false);
+ FreeExecutorState(estate);
+ pfree(edata);
+}
+
+/*
+ * Executes default values for columns for which we can't map to remote
+ * relation columns.
+ *
+ * This allows us to support tables which have more columns on the downstream
+ * than on the upstream.
+ */
+static void
+slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
+ TupleTableSlot *slot)
+{
+ TupleDesc desc = RelationGetDescr(rel->localrel);
+ int num_phys_attrs = desc->natts;
+ int i;
+ int attnum,
+ num_defaults = 0;
+ int *defmap;
+ ExprState **defexprs;
+ ExprContext *econtext;
+
+ econtext = GetPerTupleExprContext(estate);
+
+ /* We got all the data via replication, no need to evaluate anything. */
+ if (num_phys_attrs == rel->remoterel.natts)
+ return;
+
+ defmap = (int *) palloc(num_phys_attrs * sizeof(int));
+ defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
+
+ Assert(rel->attrmap->maplen == num_phys_attrs);
+ for (attnum = 0; attnum < num_phys_attrs; attnum++)
+ {
+ Expr *defexpr;
+
+ if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
+ continue;
+
+ if (rel->attrmap->attnums[attnum] >= 0)
+ continue;
+
+ defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
+
+ if (defexpr != NULL)
+ {
+ /* Run the expression through planner */
+ defexpr = expression_planner(defexpr);
+
+ /* Initialize executable expression in copycontext */
+ defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
+ defmap[num_defaults] = attnum;
+ num_defaults++;
+ }
+
+ }
+
+ for (i = 0; i < num_defaults; i++)
+ slot->tts_values[defmap[i]] =
+ ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
+}
+
+/*
+ * Error callback to give more context info about data conversion failures
+ * while reading data from the remote server.
+ */
+static void
+slot_store_error_callback(void *arg)
+{
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ LogicalRepRelMapEntry *rel;
+
+ /* Nothing to do if remote attribute number is not set */
+ if (errarg->remote_attnum < 0)
+ return;
+
+ rel = errarg->rel;
+ errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname,
+ rel->remoterel.attnames[errarg->remote_attnum]);
+}
+
+/*
+ * Store tuple data into slot.
+ *
+ * Incoming data can be either text or binary format.
+ */
+static void
+slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+ LogicalRepTupleData *tupleData)
+{
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
+
+ ExecClearTuple(slot);
+
+ /* Push callback + info on the error context stack */
+ errarg.rel = rel;
+ errarg.remote_attnum = -1;
+ errcallback.callback = slot_store_error_callback;
+ errcallback.arg = (void *) &errarg;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Call the "in" function for each non-dropped, non-null attribute */
+ Assert(natts == rel->attrmap->maplen);
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
+ int remoteattnum = rel->attrmap->attnums[i];
+
+ if (!att->attisdropped && remoteattnum >= 0)
+ {
+ StringInfo colvalue = &tupleData->colvalues[remoteattnum];
+
+ Assert(remoteattnum < tupleData->ncols);
+
+ errarg.remote_attnum = remoteattnum;
+
+ if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, colvalue->data,
+ typioparam, att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+ {
+ Oid typreceive;
+ Oid typioparam;
+
+ /*
+ * In some code paths we may be asked to re-parse the same
+ * tuple data. Reset the StringInfo's cursor so that works.
+ */
+ colvalue->cursor = 0;
+
+ getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+ slot->tts_values[i] =
+ OidReceiveFunctionCall(typreceive, colvalue,
+ typioparam, att->atttypmod);
+
+ /* Trouble if it didn't eat the whole buffer */
+ if (colvalue->cursor != colvalue->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("incorrect binary data format in logical replication column %d",
+ remoteattnum + 1)));
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ /*
+ * NULL value from remote. (We don't expect to see
+ * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
+ * NULL.)
+ */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+
+ errarg.remote_attnum = -1;
+ }
+ else
+ {
+ /*
+ * We assign NULL to dropped attributes and missing values
+ * (missing values should be later filled using
+ * slot_fill_defaults).
+ */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+ }
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Replace updated columns with data from the LogicalRepTupleData struct.
+ * This is somewhat similar to heap_modify_tuple but also calls the type
+ * input functions on the user data.
+ *
+ * "slot" is filled with a copy of the tuple in "srcslot", replacing
+ * columns provided in "tupleData" and leaving others as-is.
+ *
+ * Caution: unreplaced pass-by-ref columns in "slot" will point into the
+ * storage for "srcslot". This is OK for current usage, but someday we may
+ * need to materialize "slot" at the end to make it independent of "srcslot".
+ */
+static void
+slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
+ LogicalRepRelMapEntry *rel,
+ LogicalRepTupleData *tupleData)
+{
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
+
+ /* We'll fill "slot" with a virtual tuple, so we must start with ... */
+ ExecClearTuple(slot);
+
+ /*
+ * Copy all the column data from srcslot, so that we'll have valid values
+ * for unreplaced columns.
+ */
+ Assert(natts == srcslot->tts_tupleDescriptor->natts);
+ slot_getallattrs(srcslot);
+ memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
+ memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
+
+ /* For error reporting, push callback + info on the error context stack */
+ errarg.rel = rel;
+ errarg.remote_attnum = -1;
+ errcallback.callback = slot_store_error_callback;
+ errcallback.arg = (void *) &errarg;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Call the "in" function for each replaced attribute */
+ Assert(natts == rel->attrmap->maplen);
+ for (i = 0; i < natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
+ int remoteattnum = rel->attrmap->attnums[i];
+
+ if (remoteattnum < 0)
+ continue;
+
+ Assert(remoteattnum < tupleData->ncols);
+
+ if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
+ {
+ StringInfo colvalue = &tupleData->colvalues[remoteattnum];
+
+ errarg.remote_attnum = remoteattnum;
+
+ if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
+ {
+ Oid typinput;
+ Oid typioparam;
+
+ getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+ slot->tts_values[i] =
+ OidInputFunctionCall(typinput, colvalue->data,
+ typioparam, att->atttypmod);
+ slot->tts_isnull[i] = false;
+ }
+ else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
+ {
+ Oid typreceive;
+ Oid typioparam;
+
+ /*
+ * In some code paths we may be asked to re-parse the same
+ * tuple data. Reset the StringInfo's cursor so that works.
+ */
+ colvalue->cursor = 0;
+
+ getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+ slot->tts_values[i] =
+ OidReceiveFunctionCall(typreceive, colvalue,
+ typioparam, att->atttypmod);
+
+ /* Trouble if it didn't eat the whole buffer */
+ if (colvalue->cursor != colvalue->len)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("incorrect binary data format in logical replication column %d",
+ remoteattnum + 1)));
+ slot->tts_isnull[i] = false;
+ }
+ else
+ {
+ /* must be LOGICALREP_COLUMN_NULL */
+ slot->tts_values[i] = (Datum) 0;
+ slot->tts_isnull[i] = true;
+ }
+
+ errarg.remote_attnum = -1;
+ }
+ }
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ /* And finally, declare that "slot" contains a valid virtual tuple */
+ ExecStoreVirtualTuple(slot);
+}
+
+/*
+ * Handle BEGIN message.
+ */
+static void
+apply_handle_begin(StringInfo s)
+{
+ LogicalRepBeginData begin_data;
+
+ logicalrep_read_begin(s, &begin_data);
+
+ remote_final_lsn = begin_data.final_lsn;
+
+ in_remote_transaction = true;
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle COMMIT message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_commit(StringInfo s)
+{
+ LogicalRepCommitData commit_data;
+
+ logicalrep_read_commit(s, &commit_data);
+
+ if (commit_data.commit_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ LSN_FORMAT_ARGS(commit_data.commit_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
+
+ apply_handle_commit_internal(&commit_data);
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle ORIGIN message.
+ *
+ * TODO, support tracking of multiple origins
+ */
+static void
+apply_handle_origin(StringInfo s)
+{
+ /*
+ * ORIGIN message can only come inside streaming transaction or inside
+ * remote transaction and before any actual writes.
+ */
+ if (!in_streamed_transaction &&
+ (!in_remote_transaction ||
+ (IsTransactionState() && !am_tablesync_worker())))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("ORIGIN message sent out of order")));
+}
+
+/*
+ * Handle STREAM START message.
+ */
+static void
+apply_handle_stream_start(StringInfo s)
+{
+ bool first_segment;
+ HASHCTL hash_ctl;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("duplicate STREAM START message")));
+
+ /*
+ * Start a transaction on stream start, this transaction will be committed
+ * on the stream stop unless it is a tablesync worker in which case it
+ * will be committed after processing all the messages. We need the
+ * transaction for handling the buffile, used for serializing the
+ * streaming data and subxact info.
+ */
+ begin_replication_step();
+
+ /* notify handle methods we're processing a remote transaction */
+ in_streamed_transaction = true;
+
+ /* extract XID of the top-level transaction */
+ stream_xid = logicalrep_read_stream_start(s, &first_segment);
+
+ if (!TransactionIdIsValid(stream_xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
+
+ /*
+ * Initialize the xidhash table if we haven't yet. This will be used for
+ * the entire duration of the apply worker so create it in permanent
+ * context.
+ */
+ if (xidhash == NULL)
+ {
+ hash_ctl.keysize = sizeof(TransactionId);
+ hash_ctl.entrysize = sizeof(StreamXidHash);
+ hash_ctl.hcxt = ApplyContext;
+ xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ /* open the spool file for this transaction */
+ stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+
+ /* if this is not the first segment, open existing subxact file */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+
+ end_replication_step();
+}
+
+/*
+ * Handle STREAM STOP message.
+ */
+static void
+apply_handle_stream_stop(StringInfo s)
+{
+ if (!in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM STOP message without STREAM START")));
+
+ /*
+ * Close the file with serialized changes, and serialize information about
+ * subxacts for the toplevel transaction.
+ */
+ subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+ stream_close_file();
+
+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());
+
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
+
+ in_streamed_transaction = false;
+
+ /* Reset per-stream context */
+ MemoryContextReset(LogicalStreamingContext);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle STREAM abort message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM ABORT message without STREAM STOP")));
+
+ logicalrep_read_stream_abort(s, &xid, &subxid);
+
+ /*
+ * If the two XIDs are the same, it's in fact abort of toplevel xact, so
+ * just delete the files with serialized info.
+ */
+ if (xid == subxid)
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ else
+ {
+ /*
+ * OK, so it's a subxact. We need to read the subxact file for the
+ * toplevel transaction, determine the offset tracked for the subxact,
+ * and truncate the file with changes. We also remove the subxacts
+ * with higher offsets (or rather higher XIDs).
+ *
+ * We intentionally scan the array from the tail, because we're likely
+ * aborting a change for the most recent subtransactions.
+ *
+ * We can't use the binary search here as subxact XIDs won't
+ * necessarily arrive in sorted order, consider the case where we have
+ * released the savepoint for multiple subtransactions and then
+ * performed rollback to savepoint for one of the earlier
+ * sub-transaction.
+ */
+ int64 i;
+ int64 subidx;
+ BufFile *fd;
+ bool found = false;
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ subidx = -1;
+ begin_replication_step();
+ subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ if (subxact_data.subxacts[i - 1].xid == subxid)
+ {
+ subidx = (i - 1);
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * If it's an empty sub-transaction then we will not find the subxid
+ * here so just cleanup the subxact info and return.
+ */
+ if (!found)
+ {
+ /* Cleanup the subxact info */
+ cleanup_subxact_info();
+ end_replication_step();
+ CommitTransactionCommand();
+ return;
+ }
+
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
+ /* open the changes file */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+
+ /* OK, truncate the file at the right offset */
+ BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
+ subxact_data.subxacts[subidx].offset);
+ BufFileClose(fd);
+
+ /* discard the subxacts added later */
+ subxact_data.nsubxacts = subidx;
+
+ /* write the updated subxact list */
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
+
+ end_replication_step();
+ CommitTransactionCommand();
+ }
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ StringInfoData s2;
+ int nchanges;
+ char path[MAXPGPATH];
+ char *buffer = NULL;
+ LogicalRepCommitData commit_data;
+ StreamXidHash *ent;
+ MemoryContext oldcxt;
+ BufFile *fd;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ /* Make sure we have an open transaction */
+ begin_replication_step();
+
+ /*
+ * Allocate file handle and memory required to process all the messages in
+ * TopTransactionContext to avoid them getting reset after each message is
+ * processed.
+ */
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* open the spool file for the committed transaction */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+
+ buffer = palloc(BLCKSZ);
+ initStringInfo(&s2);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ remote_final_lsn = commit_data.commit_lsn;
+
+ /*
+ * Make sure the handle apply_dispatch methods are aware we're in a remote
+ * transaction.
+ */
+ in_remote_transaction = true;
+ pgstat_report_activity(STATE_RUNNING, NULL);
+
+ end_replication_step();
+
+ /*
+ * Read the entries one by one and pass them through the same logic as in
+ * apply_dispatch.
+ */
+ nchanges = 0;
+ while (true)
+ {
+ int nbytes;
+ int len;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* read length of the on-disk record */
+ nbytes = BufFileRead(fd, &len, sizeof(len));
+
+ /* have we reached end of the file? */
+ if (nbytes == 0)
+ break;
+
+ /* do we have a correct length? */
+ if (nbytes != sizeof(len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's changes file \"%s\": %m",
+ path)));
+
+ if (len <= 0)
+ elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+ len, path);
+
+ /* make sure we have sufficiently large buffer */
+ buffer = repalloc(buffer, len);
+
+ /* and finally read the data into the buffer */
+ if (BufFileRead(fd, buffer, len) != len)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's changes file \"%s\": %m",
+ path)));
+
+ /* copy the buffer to the stringinfo and call apply_dispatch */
+ resetStringInfo(&s2);
+ appendBinaryStringInfo(&s2, buffer, len);
+
+ /* Ensure we are reading the data into our memory context. */
+ oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+
+ apply_dispatch(&s2);
+
+ MemoryContextReset(ApplyMessageContext);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ nchanges++;
+
+ if (nchanges % 1000 == 0)
+ elog(DEBUG1, "replayed %d changes from file \"%s\"",
+ nchanges, path);
+ }
+
+ BufFileClose(fd);
+
+ pfree(buffer);
+ pfree(s2.data);
+
+ elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
+ nchanges, path);
+
+ apply_handle_commit_internal(&commit_data);
+
+ /* unlink the files with serialized changes and subxact info */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Helper function for apply_handle_commit and apply_handle_stream_commit.
+ */
+static void
+apply_handle_commit_internal(LogicalRepCommitData *commit_data)
+{
+ if (IsTransactionState())
+ {
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data->end_lsn;
+ replorigin_session_origin_timestamp = commit_data->committime;
+
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data->end_lsn);
+ }
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
+
+ in_remote_transaction = false;
+}
+
+/*
+ * Handle RELATION message.
+ *
+ * Note we don't do validation against local schema here. The validation
+ * against local schema is postponed until first change for given relation
+ * comes as we only care about it when applying changes for it anyway and we
+ * do less locking this way.
+ */
+static void
+apply_handle_relation(StringInfo s)
+{
+ LogicalRepRelation *rel;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
+ return;
+
+ rel = logicalrep_read_rel(s);
+ logicalrep_relmap_update(rel);
+
+ /* Also reset all entries in the partition map that refer to remoterel. */
+ logicalrep_partmap_reset_relmap(rel);
+}
+
+/*
+ * Handle TYPE message.
+ *
+ * This implementation pays no attention to TYPE messages; we expect the user
+ * to have set things up so that the incoming data is acceptable to the input
+ * functions for the locally subscribed tables. Hence, we just read and
+ * discard the message.
+ */
+static void
+apply_handle_type(StringInfo s)
+{
+ LogicalRepTyp typ;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
+ return;
+
+ logicalrep_read_typ(s, &typ);
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+ Oid idxoid;
+
+ idxoid = RelationGetReplicaIndex(rel);
+
+ if (!OidIsValid(idxoid))
+ idxoid = RelationGetPrimaryKeyIndex(rel);
+
+ return idxoid;
+}
+
+/*
+ * Handle INSERT message.
+ */
+
+static void
+apply_handle_insert(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepTupleData newtup;
+ LogicalRepRelId relid;
+ ApplyExecutionData *edata;
+ EState *estate;
+ TupleTableSlot *remoteslot;
+ MemoryContext oldctx;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+ return;
+
+ begin_replication_step();
+
+ relid = logicalrep_read_insert(s, &newtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
+ return;
+ }
+
+ /* Initialize the executor state. */
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
+ remoteslot = ExecInitExtraTupleSlot(estate,
+ RelationGetDescr(rel->localrel),
+ &TTSOpsVirtual);
+
+ /* Process and store remote tuple in the slot */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_data(remoteslot, rel, &newtup);
+ slot_fill_defaults(rel, estate, remoteslot);
+ MemoryContextSwitchTo(oldctx);
+
+ /* For a partitioned table, insert the tuple into a partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_INSERT);
+ else
+ apply_handle_insert_internal(edata, edata->targetRelInfo,
+ remoteslot);
+
+ finish_edata(edata);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ end_replication_step();
+}
+
+/*
+ * Workhorse for apply_handle_insert()
+ * relinfo is for the relation we're actually inserting into
+ * (could be a child partition of edata->targetRelInfo)
+ */
+static void
+apply_handle_insert_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot)
+{
+ EState *estate = edata->estate;
+
+ /* We must open indexes here. */
+ ExecOpenIndices(relinfo, false);
+
+ /* Do the insert. */
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+
+ /* Cleanup. */
+ ExecCloseIndices(relinfo);
+}
+
+/*
+ * Check if the logical replication relation is updatable and throw
+ * appropriate error if it isn't.
+ */
+static void
+check_relation_updatable(LogicalRepRelMapEntry *rel)
+{
+ /*
+ * For partitioned tables, we only need to care if the target partition is
+ * updatable (aka has PK or RI defined for it).
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return;
+
+ /* Updatable, no error. */
+ if (rel->updatable)
+ return;
+
+ /*
+ * We are in error mode so it's fine this is somewhat slow. It's better to
+ * give user correct error.
+ */
+ if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publisher did not send replica identity column "
+ "expected by the logical replication target relation \"%s.%s\"",
+ rel->remoterel.nspname, rel->remoterel.relname)));
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" has "
+ "neither REPLICA IDENTITY index nor PRIMARY "
+ "KEY and published relation does not have "
+ "REPLICA IDENTITY FULL",
+ rel->remoterel.nspname, rel->remoterel.relname)));
+}
+
+/*
+ * Handle UPDATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_update(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepRelId relid;
+ ApplyExecutionData *edata;
+ EState *estate;
+ LogicalRepTupleData oldtup;
+ LogicalRepTupleData newtup;
+ bool has_oldtup;
+ TupleTableSlot *remoteslot;
+ RangeTblEntry *target_rte;
+ MemoryContext oldctx;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
+ return;
+
+ begin_replication_step();
+
+ relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
+ &newtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
+ return;
+ }
+
+ /* Check if we can do the update. */
+ check_relation_updatable(rel);
+
+ /* Initialize the executor state. */
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
+ remoteslot = ExecInitExtraTupleSlot(estate,
+ RelationGetDescr(rel->localrel),
+ &TTSOpsVirtual);
+
+ /*
+ * Populate updatedCols so that per-column triggers can fire, and so
+ * executor can correctly pass down indexUnchanged hint. This could
+ * include more columns than were actually changed on the publisher
+ * because the logical replication protocol doesn't contain that
+ * information. But it would for example exclude columns that only exist
+ * on the subscriber, since we are not touching those.
+ */
+ target_rte = list_nth(estate->es_range_table, 0);
+ for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
+ int remoteattnum = rel->attrmap->attnums[i];
+
+ if (!att->attisdropped && remoteattnum >= 0)
+ {
+ Assert(remoteattnum < newtup.ncols);
+ if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
+ target_rte->updatedCols =
+ bms_add_member(target_rte->updatedCols,
+ i + 1 - FirstLowInvalidHeapAttributeNumber);
+ }
+ }
+
+ /* Also populate extraUpdatedCols, in case we have generated columns */
+ fill_extraUpdatedCols(target_rte, rel->localrel);
+
+ /* Build the search tuple. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_data(remoteslot, rel,
+ has_oldtup ? &oldtup : &newtup);
+ MemoryContextSwitchTo(oldctx);
+
+ /* For a partitioned table, apply update to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, &newtup, CMD_UPDATE);
+ else
+ apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup);
+
+ finish_edata(edata);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ end_replication_step();
+}
+
+/*
+ * Workhorse for apply_handle_update()
+ * relinfo is for the relation we're actually updating in
+ * (could be a child partition of edata->targetRelInfo)
+ */
+static void
+apply_handle_update_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup)
+{
+ EState *estate = edata->estate;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ Relation localrel = relinfo->ri_RelationDesc;
+ EPQState epqstate;
+ TupleTableSlot *localslot;
+ bool found;
+ MemoryContext oldctx;
+
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+ ExecOpenIndices(relinfo, false);
+
+ found = FindReplTupleInLocalRel(estate, localrel,
+ &relmapentry->remoterel,
+ remoteslot, &localslot);
+ ExecClearTuple(remoteslot);
+
+ /*
+ * Tuple found.
+ *
+ * Note this will fail if there are other conflicting unique indexes.
+ */
+ if (found)
+ {
+ /* Process and store remote tuple in the slot */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_modify_data(remoteslot, localslot, relmapentry, newtup);
+ MemoryContextSwitchTo(oldctx);
+
+ EvalPlanQualSetSlot(&epqstate, remoteslot);
+
+ /* Do the actual update. */
+ ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
+ remoteslot);
+ }
+ else
+ {
+ /*
+ * The tuple to be updated could not be found. Do nothing except for
+ * emitting a log message.
+ *
+ * XXX should this be promoted to ereport(LOG) perhaps?
+ */
+ elog(DEBUG1,
+ "logical replication did not find row to be updated "
+ "in replication target relation \"%s\"",
+ RelationGetRelationName(localrel));
+ }
+
+ /* Cleanup. */
+ ExecCloseIndices(relinfo);
+ EvalPlanQualEnd(&epqstate);
+}
+
+/*
+ * Handle DELETE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_delete(StringInfo s)
+{
+ LogicalRepRelMapEntry *rel;
+ LogicalRepTupleData oldtup;
+ LogicalRepRelId relid;
+ ApplyExecutionData *edata;
+ EState *estate;
+ TupleTableSlot *remoteslot;
+ MemoryContext oldctx;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
+ return;
+
+ begin_replication_step();
+
+ relid = logicalrep_read_delete(s, &oldtup);
+ rel = logicalrep_rel_open(relid, RowExclusiveLock);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, RowExclusiveLock);
+ end_replication_step();
+ return;
+ }
+
+ /* Check if we can do the delete. */
+ check_relation_updatable(rel);
+
+ /* Initialize the executor state. */
+ edata = create_edata_for_relation(rel);
+ estate = edata->estate;
+ remoteslot = ExecInitExtraTupleSlot(estate,
+ RelationGetDescr(rel->localrel),
+ &TTSOpsVirtual);
+
+ /* Build the search tuple. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_store_data(remoteslot, rel, &oldtup);
+ MemoryContextSwitchTo(oldctx);
+
+ /* For a partitioned table, apply delete to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_DELETE);
+ else
+ apply_handle_delete_internal(edata, edata->targetRelInfo,
+ remoteslot);
+
+ finish_edata(edata);
+
+ logicalrep_rel_close(rel, NoLock);
+
+ end_replication_step();
+}
+
+/*
+ * Workhorse for apply_handle_delete()
+ * relinfo is for the relation we're actually deleting from
+ * (could be a child partition of edata->targetRelInfo)
+ */
+static void
+apply_handle_delete_internal(ApplyExecutionData *edata,
+ ResultRelInfo *relinfo,
+ TupleTableSlot *remoteslot)
+{
+ EState *estate = edata->estate;
+ Relation localrel = relinfo->ri_RelationDesc;
+ LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
+ EPQState epqstate;
+ TupleTableSlot *localslot;
+ bool found;
+
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+ ExecOpenIndices(relinfo, false);
+
+ found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+ remoteslot, &localslot);
+
+ /* If found delete it. */
+ if (found)
+ {
+ EvalPlanQualSetSlot(&epqstate, localslot);
+
+ /* Do the actual delete. */
+ ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
+ }
+ else
+ {
+ /*
+ * The tuple to be deleted could not be found. Do nothing except for
+ * emitting a log message.
+ *
+ * XXX should this be promoted to ereport(LOG) perhaps?
+ */
+ elog(DEBUG1,
+ "logical replication did not find row to be deleted "
+ "in replication target relation \"%s\"",
+ RelationGetRelationName(localrel));
+ }
+
+ /* Cleanup. */
+ ExecCloseIndices(relinfo);
+ EvalPlanQualEnd(&epqstate);
+}
+
+/*
+ * Try to find a tuple received from the publication side (in 'remoteslot') in
+ * the corresponding local relation using either replica identity index,
+ * primary key or if needed, sequential scan.
+ *
+ * Local tuple, if found, is returned in '*localslot'.
+ */
+static bool
+FindReplTupleInLocalRel(EState *estate, Relation localrel,
+ LogicalRepRelation *remoterel,
+ TupleTableSlot *remoteslot,
+ TupleTableSlot **localslot)
+{
+ Oid idxoid;
+ bool found;
+
+ *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+
+ idxoid = GetRelationIdentityOrPK(localrel);
+ Assert(OidIsValid(idxoid) ||
+ (remoterel->replident == REPLICA_IDENTITY_FULL));
+
+ if (OidIsValid(idxoid))
+ found = RelationFindReplTupleByIndex(localrel, idxoid,
+ LockTupleExclusive,
+ remoteslot, *localslot);
+ else
+ found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ remoteslot, *localslot);
+
+ return found;
+}
+
+/*
+ * This handles insert, update, delete on a partitioned table.
+ */
+static void
+apply_handle_tuple_routing(ApplyExecutionData *edata,
+ TupleTableSlot *remoteslot,
+ LogicalRepTupleData *newtup,
+ CmdType operation)
+{
+ EState *estate = edata->estate;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ ResultRelInfo *relinfo = edata->targetRelInfo;
+ Relation parentrel = relinfo->ri_RelationDesc;
+ ModifyTableState *mtstate;
+ PartitionTupleRouting *proute;
+ ResultRelInfo *partrelinfo;
+ Relation partrel;
+ TupleTableSlot *remoteslot_part;
+ TupleConversionMap *map;
+ MemoryContext oldctx;
+ LogicalRepRelMapEntry *part_entry = NULL;
+ AttrMap *attrmap = NULL;
+
+ /* ModifyTableState is needed for ExecFindPartition(). */
+ edata->mtstate = mtstate = makeNode(ModifyTableState);
+ mtstate->ps.plan = NULL;
+ mtstate->ps.state = estate;
+ mtstate->operation = operation;
+ mtstate->resultRelInfo = relinfo;
+
+ /* ... as is PartitionTupleRouting. */
+ edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
+
+ /*
+ * Find the partition to which the "search tuple" belongs.
+ */
+ Assert(remoteslot != NULL);
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+ remoteslot, estate);
+ Assert(partrelinfo != NULL);
+ partrel = partrelinfo->ri_RelationDesc;
+
+ /*
+ * To perform any of the operations below, the tuple must match the
+ * partition's rowtype. Convert if needed or just copy, using a dedicated
+ * slot to store the tuple in any case.
+ */
+ remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
+ if (remoteslot_part == NULL)
+ remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
+ map = partrelinfo->ri_RootToPartitionMap;
+ if (map != NULL)
+ {
+ attrmap = map->attrMap;
+ remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
+ remoteslot_part);
+ }
+ else
+ {
+ remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
+ slot_getallattrs(remoteslot_part);
+ }
+ MemoryContextSwitchTo(oldctx);
+
+ /* Check if we can do the update or delete on the leaf partition. */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ part_entry = logicalrep_partition_open(relmapentry, partrel,
+ attrmap);
+ check_relation_updatable(part_entry);
+ }
+
+ switch (operation)
+ {
+ case CMD_INSERT:
+ apply_handle_insert_internal(edata, partrelinfo,
+ remoteslot_part);
+ break;
+
+ case CMD_DELETE:
+ apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part);
+ break;
+
+ case CMD_UPDATE:
+
+ /*
+ * For UPDATE, depending on whether or not the updated tuple
+ * satisfies the partition's constraint, perform a simple UPDATE
+ * of the partition or move the updated tuple into a different
+ * suitable partition.
+ */
+ {
+ TupleTableSlot *localslot;
+ ResultRelInfo *partrelinfo_new;
+ bool found;
+
+ /* Get the matching local tuple from the partition. */
+ found = FindReplTupleInLocalRel(estate, partrel,
+ &part_entry->remoterel,
+ remoteslot_part, &localslot);
+ if (!found)
+ {
+ /*
+ * The tuple to be updated could not be found. Do nothing
+ * except for emitting a log message.
+ *
+ * XXX should this be promoted to ereport(LOG) perhaps?
+ */
+ elog(DEBUG1,
+ "logical replication did not find row to be updated "
+ "in replication target relation's partition \"%s\"",
+ RelationGetRelationName(partrel));
+ return;
+ }
+
+ /*
+ * Apply the update to the local tuple, putting the result in
+ * remoteslot_part.
+ */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ slot_modify_data(remoteslot_part, localslot, part_entry,
+ newtup);
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Does the updated tuple still satisfy the current
+ * partition's constraint?
+ */
+ if (!partrel->rd_rel->relispartition ||
+ ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
+ false))
+ {
+ /*
+ * Yes, so simply UPDATE the partition. We don't call
+ * apply_handle_update_internal() here, which would
+ * normally do the following work, to avoid repeating some
+ * work already done above to find the local tuple in the
+ * partition.
+ */
+ EPQState epqstate;
+
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+ ExecOpenIndices(partrelinfo, false);
+
+ EvalPlanQualSetSlot(&epqstate, remoteslot_part);
+ ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
+ localslot, remoteslot_part);
+ ExecCloseIndices(partrelinfo);
+ EvalPlanQualEnd(&epqstate);
+ }
+ else
+ {
+ /* Move the tuple into the new partition. */
+
+ /*
+ * New partition will be found using tuple routing, which
+ * can only occur via the parent table. We might need to
+ * convert the tuple to the parent's rowtype. Note that
+ * this is the tuple found in the partition, not the
+ * original search tuple received by this function.
+ */
+ if (map)
+ {
+ TupleConversionMap *PartitionToRootMap =
+ convert_tuples_by_name(RelationGetDescr(partrel),
+ RelationGetDescr(parentrel));
+
+ remoteslot =
+ execute_attr_map_slot(PartitionToRootMap->attrMap,
+ remoteslot_part, remoteslot);
+ }
+ else
+ {
+ remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
+ slot_getallattrs(remoteslot);
+ }
+
+
+ /* Find the new partition. */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrelinfo_new = ExecFindPartition(mtstate, relinfo,
+ proute, remoteslot,
+ estate);
+ MemoryContextSwitchTo(oldctx);
+ Assert(partrelinfo_new != partrelinfo);
+
+ /* DELETE old tuple found in the old partition. */
+ apply_handle_delete_internal(edata, partrelinfo,
+ localslot);
+
+ /* INSERT new tuple into the new partition. */
+
+ /*
+ * Convert the replacement tuple to match the destination
+ * partition rowtype.
+ */
+ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+ partrel = partrelinfo_new->ri_RelationDesc;
+ remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
+ if (remoteslot_part == NULL)
+ remoteslot_part = table_slot_create(partrel,
+ &estate->es_tupleTable);
+ map = partrelinfo_new->ri_RootToPartitionMap;
+ if (map != NULL)
+ {
+ remoteslot_part = execute_attr_map_slot(map->attrMap,
+ remoteslot,
+ remoteslot_part);
+ }
+ else
+ {
+ remoteslot_part = ExecCopySlot(remoteslot_part,
+ remoteslot);
+ slot_getallattrs(remoteslot);
+ }
+ MemoryContextSwitchTo(oldctx);
+ apply_handle_insert_internal(edata, partrelinfo_new,
+ remoteslot_part);
+ }
+ }
+ break;
+
+ default:
+ elog(ERROR, "unrecognized CmdType: %d", (int) operation);
+ break;
+ }
+}
+
+/*
+ * Handle TRUNCATE message.
+ *
+ * TODO: FDW support
+ */
+static void
+apply_handle_truncate(StringInfo s)
+{
+ bool cascade = false;
+ bool restart_seqs = false;
+ List *remote_relids = NIL;
+ List *remote_rels = NIL;
+ List *rels = NIL;
+ List *part_rels = NIL;
+ List *relids = NIL;
+ List *relids_logged = NIL;
+ ListCell *lc;
+ LOCKMODE lockmode = AccessExclusiveLock;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
+ return;
+
+ begin_replication_step();
+
+ remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
+
+ foreach(lc, remote_relids)
+ {
+ LogicalRepRelId relid = lfirst_oid(lc);
+ LogicalRepRelMapEntry *rel;
+
+ rel = logicalrep_rel_open(relid, lockmode);
+ if (!should_apply_changes_for_rel(rel))
+ {
+ /*
+ * The relation can't become interesting in the middle of the
+ * transaction so it's safe to unlock it.
+ */
+ logicalrep_rel_close(rel, lockmode);
+ continue;
+ }
+
+ remote_rels = lappend(remote_rels, rel);
+ rels = lappend(rels, rel->localrel);
+ relids = lappend_oid(relids, rel->localreloid);
+ if (RelationIsLogicallyLogged(rel->localrel))
+ relids_logged = lappend_oid(relids_logged, rel->localreloid);
+
+ /*
+ * Truncate partitions if we got a message to truncate a partitioned
+ * table.
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ ListCell *child;
+ List *children = find_all_inheritors(rel->localreloid,
+ lockmode,
+ NULL);
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ if (list_member_oid(relids, childrelid))
+ continue;
+
+ /* find_all_inheritors already got lock */
+ childrel = table_open(childrelid, NoLock);
+
+ /*
+ * Ignore temp tables of other backends. See similar code in
+ * ExecuteTruncate().
+ */
+ if (RELATION_IS_OTHER_TEMP(childrel))
+ {
+ table_close(childrel, lockmode);
+ continue;
+ }
+
+ rels = lappend(rels, childrel);
+ part_rels = lappend(part_rels, childrel);
+ relids = lappend_oid(relids, childrelid);
+ /* Log this relation only if needed for logical decoding */
+ if (RelationIsLogicallyLogged(childrel))
+ relids_logged = lappend_oid(relids_logged, childrelid);
+ }
+ }
+ }
+
+ /*
+ * Even if we used CASCADE on the upstream primary we explicitly default
+ * to replaying changes without further cascading. This might be later
+ * changeable with a user specified option.
+ */
+ ExecuteTruncateGuts(rels,
+ relids,
+ relids_logged,
+ DROP_RESTRICT,
+ restart_seqs);
+ foreach(lc, remote_rels)
+ {
+ LogicalRepRelMapEntry *rel = lfirst(lc);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+ foreach(lc, part_rels)
+ {
+ Relation rel = lfirst(lc);
+
+ table_close(rel, NoLock);
+ }
+
+ end_replication_step();
+}
+
+
+/*
+ * Logical replication protocol message dispatcher.
+ */
+static void
+apply_dispatch(StringInfo s)
+{
+ LogicalRepMsgType action = pq_getmsgbyte(s);
+
+ switch (action)
+ {
+ case LOGICAL_REP_MSG_BEGIN:
+ apply_handle_begin(s);
+ return;
+
+ case LOGICAL_REP_MSG_COMMIT:
+ apply_handle_commit(s);
+ return;
+
+ case LOGICAL_REP_MSG_INSERT:
+ apply_handle_insert(s);
+ return;
+
+ case LOGICAL_REP_MSG_UPDATE:
+ apply_handle_update(s);
+ return;
+
+ case LOGICAL_REP_MSG_DELETE:
+ apply_handle_delete(s);
+ return;
+
+ case LOGICAL_REP_MSG_TRUNCATE:
+ apply_handle_truncate(s);
+ return;
+
+ case LOGICAL_REP_MSG_RELATION:
+ apply_handle_relation(s);
+ return;
+
+ case LOGICAL_REP_MSG_TYPE:
+ apply_handle_type(s);
+ return;
+
+ case LOGICAL_REP_MSG_ORIGIN:
+ apply_handle_origin(s);
+ return;
+
+ case LOGICAL_REP_MSG_MESSAGE:
+
+ /*
+ * Logical replication does not use generic logical messages yet.
+ * Although, it could be used by other applications that use this
+ * output plugin.
+ */
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_START:
+ apply_handle_stream_start(s);
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_END:
+ apply_handle_stream_stop(s);
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_ABORT:
+ apply_handle_stream_abort(s);
+ return;
+
+ case LOGICAL_REP_MSG_STREAM_COMMIT:
+ apply_handle_stream_commit(s);
+ return;
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid logical replication message type \"%c\"",
+ action)));
+}
+
+/*
+ * Figure out which write/flush positions to report to the walsender process.
+ *
+ * We can't simply report back the last LSN the walsender sent us because the
+ * local transaction might not yet be flushed to disk locally. Instead we
+ * build a list that associates local with remote LSNs for every commit. When
+ * reporting back the flush position to the sender we iterate that list and
+ * check which entries on it are already locally flushed. Those we can report
+ * as having been flushed.
+ *
+ * The have_pending_txes is true if there are outstanding transactions that
+ * need to be flushed.
+ */
+static void
+get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
+ bool *have_pending_txes)
+{
+ dlist_mutable_iter iter;
+ XLogRecPtr local_flush = GetFlushRecPtr();
+
+ *write = InvalidXLogRecPtr;
+ *flush = InvalidXLogRecPtr;
+
+ dlist_foreach_modify(iter, &lsn_mapping)
+ {
+ FlushPosition *pos =
+ dlist_container(FlushPosition, node, iter.cur);
+
+ *write = pos->remote_end;
+
+ if (pos->local_end <= local_flush)
+ {
+ *flush = pos->remote_end;
+ dlist_delete(iter.cur);
+ pfree(pos);
+ }
+ else
+ {
+ /*
+ * Don't want to uselessly iterate over the rest of the list which
+ * could potentially be long. Instead get the last element and
+ * grab the write position from there.
+ */
+ pos = dlist_tail_element(FlushPosition, node,
+ &lsn_mapping);
+ *write = pos->remote_end;
+ *have_pending_txes = true;
+ return;
+ }
+ }
+
+ *have_pending_txes = !dlist_is_empty(&lsn_mapping);
+}
+
+/*
+ * Store current remote/local lsn pair in the tracking list.
+ */
+static void
+store_flush_position(XLogRecPtr remote_lsn)
+{
+ FlushPosition *flushpos;
+
+ /* Need to do this in permanent context */
+ MemoryContextSwitchTo(ApplyContext);
+
+ /* Track commit lsn */
+ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
+ flushpos->local_end = XactLastCommitEnd;
+ flushpos->remote_end = remote_lsn;
+
+ dlist_push_tail(&lsn_mapping, &flushpos->node);
+ MemoryContextSwitchTo(ApplyMessageContext);
+}
+
+
+/* Update statistics of the worker. */
+static void
+UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
+{
+ MyLogicalRepWorker->last_lsn = last_lsn;
+ MyLogicalRepWorker->last_send_time = send_time;
+ MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
+ if (reply)
+ {
+ MyLogicalRepWorker->reply_lsn = last_lsn;
+ MyLogicalRepWorker->reply_time = send_time;
+ }
+}
+
+/*
+ * Apply main loop.
+ */
+static void
+LogicalRepApplyLoop(XLogRecPtr last_received)
+{
+ TimestampTz last_recv_timestamp = GetCurrentTimestamp();
+ bool ping_sent = false;
+ TimeLineID tli;
+
+ /*
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /*
+ * This memory context is used for per-stream data when the streaming mode
+ * is enabled. This context is reset on each stream stop.
+ */
+ LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
+ "LogicalStreamingContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* mark as idle, before starting to loop */
+ pgstat_report_activity(STATE_IDLE, NULL);
+
+ /* This outer loop iterates once per wait. */
+ for (;;)
+ {
+ pgsocket fd = PGINVALID_SOCKET;
+ int rc;
+ int len;
+ char *buf = NULL;
+ bool endofstream = false;
+ long wait_time;
+
+ CHECK_FOR_INTERRUPTS();
+
+ MemoryContextSwitchTo(ApplyMessageContext);
+
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+
+ if (len != 0)
+ {
+ /* Loop to process all available data (without blocking). */
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
+
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
+
+ s.data = buf;
+ s.len = len;
+ s.cursor = 0;
+ s.maxlen = -1;
+
+ c = pq_getmsgbyte(&s);
+
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
+
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
+
+ if (last_received < start_lsn)
+ last_received = start_lsn;
+
+ if (last_received < end_lsn)
+ last_received = end_lsn;
+
+ UpdateWorkerStats(last_received, send_time, false);
+
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
+
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
+
+ if (last_received < end_lsn)
+ last_received = end_lsn;
+
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
+ }
+ /* other message types are purposefully ignored */
+
+ MemoryContextReset(ApplyMessageContext);
+ }
+
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ }
+ }
+
+ /* confirm all writes so far */
+ send_feedback(last_received, false, false);
+
+ if (!in_remote_transaction && !in_streamed_transaction)
+ {
+ /*
+ * If we didn't get any transactions for a while there might be
+ * unconsumed invalidation messages in the queue, consume them
+ * now.
+ */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+
+ /* Process any table synchronization changes. */
+ process_syncing_tables(last_received);
+ }
+
+ /* Cleanup the memory. */
+ MemoryContextResetAndDeleteChildren(ApplyMessageContext);
+ MemoryContextSwitchTo(TopMemoryContext);
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofstream)
+ break;
+
+ /*
+ * Wait for more data or latch. If we have unflushed transactions,
+ * wake up after WalWriterDelay to see if they've been flushed yet (in
+ * which case we should send a feedback message). Otherwise, there's
+ * no particular urgency about waking up unless we get data or a
+ * signal.
+ */
+ if (!dlist_is_empty(&lsn_mapping))
+ wait_time = WalWriterDelay;
+ else
+ wait_time = NAPTIME_PER_CYCLE;
+
+ rc = WaitLatchOrSocket(MyLatch,
+ WL_SOCKET_READABLE | WL_LATCH_SET |
+ WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ fd, wait_time,
+ WAIT_EVENT_LOGICAL_APPLY_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (rc & WL_TIMEOUT)
+ {
+ /*
+ * We didn't receive anything new. If we haven't heard anything
+ * from the server for more than wal_receiver_timeout / 2, ping
+ * the server. Also, if it's been longer than
+ * wal_receiver_status_interval since the last update we sent,
+ * send a status update to the primary anyway, to report any
+ * progress in applying WAL.
+ */
+ bool requestReply = false;
+
+ /*
+ * Check if time since last receive from primary has reached the
+ * configured limit.
+ */
+ if (wal_receiver_timeout > 0)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout;
+
+ timeout =
+ TimestampTzPlusMilliseconds(last_recv_timestamp,
+ wal_receiver_timeout);
+
+ if (now >= timeout)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication worker due to timeout")));
+
+ /* Check to see if it's time for a ping. */
+ if (!ping_sent)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ (wal_receiver_timeout / 2));
+ if (now >= timeout)
+ {
+ requestReply = true;
+ ping_sent = true;
+ }
+ }
+ }
+
+ send_feedback(last_received, requestReply, requestReply);
+ }
+ }
+
+ /* All done */
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ *
+ * 'recvpos' is the latest LSN we've received data to, force is set if we need
+ * to send a response to avoid timeouts.
+ */
+static void
+send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+{
+ static StringInfo reply_message = NULL;
+ static TimestampTz send_time = 0;
+
+ static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
+ static XLogRecPtr last_writepos = InvalidXLogRecPtr;
+ static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ TimestampTz now;
+ bool have_pending_txes;
+
+ /*
+ * If the user doesn't want status to be reported to the publisher, be
+ * sure to exit before doing anything at all.
+ */
+ if (!force && wal_receiver_status_interval <= 0)
+ return;
+
+ /* It's legal to not pass a recvpos */
+ if (recvpos < last_recvpos)
+ recvpos = last_recvpos;
+
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ /*
+ * No outstanding transactions to flush, we can report the latest received
+ * position. This is important for synchronous replication.
+ */
+ if (!have_pending_txes)
+ flushpos = writepos = recvpos;
+
+ if (writepos < last_writepos)
+ writepos = last_writepos;
+
+ if (flushpos < last_flushpos)
+ flushpos = last_flushpos;
+
+ now = GetCurrentTimestamp();
+
+ /* if we've already reported everything we're good */
+ if (!force &&
+ writepos == last_writepos &&
+ flushpos == last_flushpos &&
+ !TimestampDifferenceExceeds(send_time, now,
+ wal_receiver_status_interval * 1000))
+ return;
+ send_time = now;
+
+ if (!reply_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ reply_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(reply_message);
+
+ pq_sendbyte(reply_message, 'r');
+ pq_sendint64(reply_message, recvpos); /* write */
+ pq_sendint64(reply_message, flushpos); /* flush */
+ pq_sendint64(reply_message, writepos); /* apply */
+ pq_sendint64(reply_message, now); /* sendTime */
+ pq_sendbyte(reply_message, requestReply); /* replyRequested */
+
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ force,
+ LSN_FORMAT_ARGS(recvpos),
+ LSN_FORMAT_ARGS(writepos),
+ LSN_FORMAT_ARGS(flushpos));
+
+ walrcv_send(LogRepWorkerWalRcvConn,
+ reply_message->data, reply_message->len);
+
+ if (recvpos > last_recvpos)
+ last_recvpos = recvpos;
+ if (writepos > last_writepos)
+ last_writepos = writepos;
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+}
+
+/*
+ * Reread subscription info if needed. Most changes will be exit.
+ */
+static void
+maybe_reread_subscription(void)
+{
+ MemoryContext oldctx;
+ Subscription *newsub;
+ bool started_tx = false;
+
+ /* When cache state is valid there is nothing to do here. */
+ if (MySubscriptionValid)
+ return;
+
+ /* This function might be called inside or outside of transaction. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ /* Ensure allocations in permanent context. */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+
+ /*
+ * Exit if the subscription was removed. This normally should not happen
+ * as the worker gets killed during DROP SUBSCRIPTION.
+ */
+ if (!newsub)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /*
+ * Exit if the subscription was disabled. This normally should not happen
+ * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
+ */
+ if (!newsub->enabled)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /* !slotname should never happen when enabled is true. */
+ Assert(newsub->slotname);
+
+ /*
+ * Exit if any parameter that affects the remote connection was changed.
+ * The launcher will start a new worker.
+ */
+ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
+ strcmp(newsub->name, MySubscription->name) != 0 ||
+ strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
+ newsub->binary != MySubscription->binary ||
+ newsub->stream != MySubscription->stream ||
+ !equal(newsub->publications, MySubscription->publications))
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /* Check for other changes that should never happen too. */
+ if (newsub->dbid != MySubscription->dbid)
+ {
+ elog(ERROR, "subscription %u changed unexpectedly",
+ MyLogicalRepWorker->subid);
+ }
+
+ /* Clean old subscription info and switch to new one. */
+ FreeSubscription(MySubscription);
+ MySubscription = newsub;
+
+ MemoryContextSwitchTo(oldctx);
+
+ /* Change synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
+ if (started_tx)
+ CommitTransactionCommand();
+
+ MySubscriptionValid = true;
+}
+
+/*
+ * Callback from subscription syscache invalidation.
+ */
+static void
+subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+ MySubscriptionValid = false;
+}
+
+/*
+ * subxact_info_write
+ * Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store offset of it's first change in the main file.
+ * The file is always over-written as a whole.
+ *
+ * XXX We should only store subxacts that were not aborted yet.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ Size len;
+ StreamXidHash *ent;
+ BufFile *fd;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* Find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ NULL);
+ /* By this time we must have created the transaction entry */
+ Assert(ent);
+
+ /*
+ * If there is no subtransaction then nothing to do, but if already have
+ * subxact file then delete that.
+ */
+ if (subxact_data.nsubxacts == 0)
+ {
+ if (ent->subxact_fileset)
+ {
+ cleanup_subxact_info();
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+ return;
+ }
+
+ subxact_filename(path, subid, xid);
+
+ /*
+ * Create the subxact file if it not already created, otherwise open the
+ * existing file.
+ */
+ if (ent->subxact_fileset == NULL)
+ {
+ MemoryContext oldctx;
+
+ /*
+ * We need to maintain shared fileset across multiple stream
+ * start/stop calls. So, need to allocate it in a persistent context.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+ SharedFileSetInit(ent->subxact_fileset, NULL);
+ MemoryContextSwitchTo(oldctx);
+
+ fd = BufFileCreateShared(ent->subxact_fileset, path);
+ }
+ else
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* Write the subxact count and subxact info */
+ BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
+ BufFileWrite(fd, subxact_data.subxacts, len);
+
+ BufFileClose(fd);
+
+ /* free the memory allocated for subxact info */
+ cleanup_subxact_info();
+}
+
+/*
+ * subxact_info_read
+ * Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the structure subxact_data that can be
+ * used later.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ Size len;
+ BufFile *fd;
+ StreamXidHash *ent;
+ MemoryContext oldctx;
+
+ Assert(!subxact_data.subxacts);
+ Assert(subxact_data.nsubxacts == 0);
+ Assert(subxact_data.nsubxacts_max == 0);
+
+ /* Find the stream xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
+ /*
+ * If subxact_fileset is not valid that mean we don't have any subxact
+ * info
+ */
+ if (ent->subxact_fileset == NULL)
+ return;
+
+ subxact_filename(path, subid, xid);
+
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+ /* read number of subxact items */
+ if (BufFileRead(fd, &subxact_data.nsubxacts,
+ sizeof(subxact_data.nsubxacts)) !=
+ sizeof(subxact_data.nsubxacts))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+ path)));
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* we keep the maximum as a power of 2 */
+ subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
+
+ /*
+ * Allocate subxact information in the logical streaming context. We need
+ * this information during the complete stream so that we can add the sub
+ * transaction info to this. On stream stop we will flush this information
+ * to the subxact file and reset the logical streaming context.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+ subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
+ sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+
+ if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+ path)));
+
+ BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ * Add information about a subxact (offset in the main file).
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+ SubXactInfo *subxacts = subxact_data.subxacts;
+ int64 i;
+
+ /* We must have a valid top level stream xid and a stream fd. */
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /*
+ * If the XID matches the toplevel transaction, we don't want to add it.
+ */
+ if (stream_xid == xid)
+ return;
+
+ /*
+ * In most cases we're checking the same subxact as we've already seen in
+ * the last call, so make sure to ignore it (this change comes later).
+ */
+ if (subxact_data.subxact_last == xid)
+ return;
+
+ /* OK, remember we're processing this XID. */
+ subxact_data.subxact_last = xid;
+
+ /*
+ * Check if the transaction is already present in the array of subxact. We
+ * intentionally scan the array from the tail, because we're likely adding
+ * a change for the most recent subtransactions.
+ *
+ * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+ * would allow us to use binary search here.
+ */
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ /* found, so we're done */
+ if (subxacts[i - 1].xid == xid)
+ return;
+ }
+
+ /* This is a new subxact, so we need to add it to the array. */
+ if (subxact_data.nsubxacts == 0)
+ {
+ MemoryContext oldctx;
+
+ subxact_data.nsubxacts_max = 128;
+
+ /*
+ * Allocate this memory for subxacts in per-stream context, see
+ * subxact_info_read.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+ subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+ }
+ else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
+ {
+ subxact_data.nsubxacts_max *= 2;
+ subxacts = repalloc(subxacts,
+ subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ }
+
+ subxacts[subxact_data.nsubxacts].xid = xid;
+
+ /*
+ * Get the current offset of the stream file and store it as offset of
+ * this subxact.
+ */
+ BufFileTell(stream_fd,
+ &subxacts[subxact_data.nsubxacts].fileno,
+ &subxacts[subxact_data.nsubxacts].offset);
+
+ subxact_data.nsubxacts++;
+ subxact_data.subxacts = subxacts;
+}
+
+/* format filename for file containing the info about subxacts */
+static inline void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/* format filename for file containing serialized changes */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
+
+/*
+ * stream_cleanup_files
+ * Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ /* Find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
+ /* Delete the change file and release the stream fileset memory */
+ changes_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->stream_fileset);
+ pfree(ent->stream_fileset);
+ ent->stream_fileset = NULL;
+
+ /* Delete the subxact file and release the memory, if it exist */
+ if (ent->subxact_fileset)
+ {
+ subxact_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+
+ /* Remove the xid entry from the stream xid hash */
+ hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+}
+
+/*
+ * stream_open_file
+ * Open a file that we'll use to serialize changes for a toplevel
+ * transaction.
+ *
+ * Open a file for streamed changes from a toplevel transaction identified
+ * by stream_xid (global variable). If it's the first chunk of streamed
+ * changes for this transaction, initialize the shared fileset and create the
+ * buffile, otherwise open the previously created file.
+ *
+ * This can only be called at the beginning of a "streaming" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_open_file(Oid subid, TransactionId xid, bool first_segment)
+{
+ char path[MAXPGPATH];
+ bool found;
+ MemoryContext oldcxt;
+ StreamXidHash *ent;
+
+ Assert(in_streamed_transaction);
+ Assert(OidIsValid(subid));
+ Assert(TransactionIdIsValid(xid));
+ Assert(stream_fd == NULL);
+
+ /* create or find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_ENTER,
+ &found);
+
+ changes_filename(path, subid, xid);
+ elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
+
+ /*
+ * Create/open the buffiles under the logical streaming context so that we
+ * have those files until stream stop.
+ */
+ oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
+
+ /*
+ * If this is the first streamed segment, the file must not exist, so make
+ * sure we're the ones creating it. Otherwise just open the file for
+ * writing, in append mode.
+ */
+ if (first_segment)
+ {
+ MemoryContext savectx;
+ SharedFileSet *fileset;
+
+ if (found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
+ /*
+ * We need to maintain shared fileset across multiple stream
+ * start/stop calls. So, need to allocate it in a persistent context.
+ */
+ savectx = MemoryContextSwitchTo(ApplyContext);
+ fileset = palloc(sizeof(SharedFileSet));
+
+ SharedFileSetInit(fileset, NULL);
+ MemoryContextSwitchTo(savectx);
+
+ stream_fd = BufFileCreateShared(fileset, path);
+
+ /* Remember the fileset for the next stream of the same transaction */
+ ent->xid = xid;
+ ent->stream_fileset = fileset;
+ ent->subxact_fileset = NULL;
+ }
+ else
+ {
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
+ /*
+ * Open the file and seek to the end of the file because we always
+ * append the changes file.
+ */
+ stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ BufFileSeek(stream_fd, 0, 0, SEEK_END);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * stream_close_file
+ * Close the currently open file with streamed changes.
+ *
+ * This can only be called at the end of a streaming block, i.e. at stream_stop
+ * message from the upstream.
+ */
+static void
+stream_close_file(void)
+{
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ BufFileClose(stream_fd);
+
+ stream_xid = InvalidTransactionId;
+ stream_fd = NULL;
+}
+
+/*
+ * stream_write_change
+ * Serialize a change to a file for the current toplevel transaction.
+ *
+ * The change is serialized in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ */
+static void
+stream_write_change(char action, StringInfo s)
+{
+ int len;
+
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /* total on-disk size, including the action type character */
+ len = (s->len - s->cursor) + sizeof(char);
+
+ /* first write the size */
+ BufFileWrite(stream_fd, &len, sizeof(len));
+
+ /* then the action */
+ BufFileWrite(stream_fd, &action, sizeof(action));
+
+ /* and finally the remaining part of the buffer (after the XID) */
+ len = (s->len - s->cursor);
+
+ BufFileWrite(stream_fd, &s->data[s->cursor], len);
+}
+
+/*
+ * Cleanup the memory for subxacts and reset the related variables.
+ */
+static inline void
+cleanup_subxact_info()
+{
+ if (subxact_data.subxacts)
+ pfree(subxact_data.subxacts);
+
+ subxact_data.subxacts = NULL;
+ subxact_data.subxact_last = InvalidTransactionId;
+ subxact_data.nsubxacts = 0;
+ subxact_data.nsubxacts_max = 0;
+}
+
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+ MemoryContext oldctx;
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos;
+ char *myslotname;
+ WalRcvStreamOptions options;
+
+ /* Attach to slot */
+ logicalrep_worker_attach(worker_slot);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */
+
+ /* Initialise stats to a sanish value */
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ /* Run as replica session replication role. */
+ SetConfigOption("session_replication_role", "replica",
+ PGC_SUSET, PGC_S_OVERRIDE);
+
+ /* Connect to our database. */
+ BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+ MyLogicalRepWorker->userid,
+ 0);
+
+ /*
+ * Set always-secure search path, so malicious users can't redirect user
+ * code (e.g. pg_index.indexprs).
+ */
+ SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /* Load the subscription into persistent memory context. */
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
+ "ApplyContext",
+ ALLOCSET_DEFAULT_SIZES);
+ StartTransactionCommand();
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+ if (!MySubscription)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ proc_exit(0);
+ }
+
+ MySubscriptionValid = true;
+ MemoryContextSwitchTo(oldctx);
+
+ if (!MySubscription->enabled)
+ {
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will not "
+ "start because the subscription was disabled during startup",
+ MySubscription->name)));
+
+ proc_exit(0);
+ }
+
+ /* Setup synchronous commit according to the user's wishes */
+ SetConfigOption("synchronous_commit", MySubscription->synccommit,
+ PGC_BACKEND, PGC_S_OVERRIDE);
+
+ /* Keep us informed about subscription changes. */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+ subscription_change_cb,
+ (Datum) 0);
+
+ if (am_tablesync_worker())
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+ MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" has started",
+ MySubscription->name)));
+
+ CommitTransactionCommand();
+
+ /* Connect to the origin and start the replication. */
+ elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
+ MySubscription->conninfo);
+
+ if (am_tablesync_worker())
+ {
+ char *syncslotname;
+
+ /* This is table synchronization worker, call initial sync. */
+ syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+
+ /* allocate slot name in long-lived context */
+ myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+
+ pfree(syncslotname);
+ }
+ else
+ {
+ /* This is main apply worker */
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+
+ myslotname = MySubscription->slotname;
+
+ /*
+ * This shouldn't happen if the subscription is enabled, but guard
+ * against DDL bugs or manual catalog changes. (libpqwalreceiver will
+ * crash if slot is NULL.)
+ */
+ if (!myslotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription has no replication slot set")));
+
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+ originid = replorigin_by_name(originname, true);
+ if (!OidIsValid(originid))
+ originid = replorigin_create(originname);
+ replorigin_session_setup(originid);
+ replorigin_session_origin = originid;
+ origin_startpos = replorigin_session_get_progress(false);
+ CommitTransactionCommand();
+
+ LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+ MySubscription->name, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
+
+ /*
+ * We don't really use the output identify_system for anything but it
+ * does some initializations on the upstream so let's still call it.
+ */
+ (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+ }
+
+ /*
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
+ */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+ invalidate_syncing_table_states,
+ (Datum) 0);
+
+ /* Build logical replication streaming options. */
+ options.logical = true;
+ options.startpoint = origin_startpos;
+ options.slotname = myslotname;
+ options.proto.logical.proto_version =
+ walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
+ LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+ options.proto.logical.publication_names = MySubscription->publications;
+ options.proto.logical.binary = MySubscription->binary;
+ options.proto.logical.streaming = MySubscription->stream;
+
+ /* Start normal logical streaming replication. */
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+ /* Run the main loop. */
+ LogicalRepApplyLoop(origin_startpos);
+
+ proc_exit(0);
+}
+
+/*
+ * Is current process a logical replication worker?
+ */
+bool
+IsLogicalWorker(void)
+{
+ return MyLogicalRepWorker != NULL;
+}