diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 3254 |
1 files changed, 3254 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c new file mode 100644 index 0000000..8c9a4b5 --- /dev/null +++ b/src/backend/replication/logical/worker.c @@ -0,0 +1,3254 @@ +/*------------------------------------------------------------------------- + * worker.c + * PostgreSQL logical replication worker (apply) + * + * Copyright (c) 2016-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/worker.c + * + * NOTES + * This file contains the worker which applies logical changes as they come + * from remote logical replication stream. + * + * The main worker (apply) is started by logical replication worker + * launcher for every enabled subscription in a database. It uses + * walsender protocol to communicate with publisher. + * + * This module includes server facing code and shares libpqwalreceiver + * module with walreceiver for providing the libpq specific functionality. + * + * + * STREAMED TRANSACTIONS + * --------------------- + * Streamed transactions (large transactions exceeding a memory limit on the + * upstream) are not applied immediately, but instead, the data is written + * to temporary files and then applied at once when the final commit arrives. + * + * Unlike the regular (non-streamed) case, handling streamed transactions has + * to handle aborts of both the toplevel transaction and subtransactions. This + * is achieved by tracking offsets for subtransactions, which is then used + * to truncate the file with serialized changes. + * + * The files are placed in tmp file directory by default, and the filenames + * include both the XID of the toplevel transaction and OID of the + * subscription. This is necessary so that different workers processing a + * remote transaction with the same XID doesn't interfere. 
*
 * We use BufFiles instead of normal temporary files because the BufFile
 * infrastructure (a) supports temporary files that exceed the OS file size
 * limit, (b) provides automatic cleanup on error, and (c) lets these files
 * survive across local transactions, so that they can be opened and closed
 * at stream start and stop. We decided to use the SharedFileSet
 * infrastructure because without it the files are deleted as soon as they
 * are closed, and keeping the stream files open across start/stop stream
 * would consume a lot of memory (more than 8K for each BufFile, and there
 * could be multiple such BufFiles as the subscriber could receive multiple
 * start/stop streams for different transactions before getting the commit).
 * Moreover, if we didn't use SharedFileSet we would also need to invent a
 * new way to pass filenames to the BufFile APIs so that we are allowed to
 * open the desired file across multiple stream-open calls for the same
 * transaction.
+ *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <sys/stat.h> +#include <unistd.h> + +#include "access/table.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "access/xlog_internal.h" +#include "catalog/catalog.h" +#include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" +#include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" +#include "catalog/pg_tablespace.h" +#include "commands/tablecmds.h" +#include "commands/tablespace.h" +#include "commands/trigger.h" +#include "executor/executor.h" +#include "executor/execPartition.h" +#include "executor/nodeModifyTable.h" +#include "funcapi.h" +#include "libpq/pqformat.h" +#include "libpq/pqsignal.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "postmaster/walwriter.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/logicalproto.h" +#include "replication/logicalrelation.h" +#include "replication/logicalworker.h" +#include "replication/origin.h" +#include "replication/reorderbuffer.h" +#include "replication/snapbuild.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "rewrite/rewriteHandler.h" +#include "storage/buffile.h" +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/catcache.h" +#include "utils/dynahash.h" +#include "utils/datum.h" +#include "utils/fmgroids.h" +#include "utils/guc.h" +#include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include 
"utils/syscache.h" +#include "utils/timeout.h" + +#define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */ + +typedef struct FlushPosition +{ + dlist_node node; + XLogRecPtr local_end; + XLogRecPtr remote_end; +} FlushPosition; + +static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); + +typedef struct SlotErrCallbackArg +{ + LogicalRepRelMapEntry *rel; + int remote_attnum; +} SlotErrCallbackArg; + +typedef struct ApplyExecutionData +{ + EState *estate; /* executor state, used to track resources */ + + LogicalRepRelMapEntry *targetRel; /* replication target rel */ + ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */ + + /* These fields are used when the target relation is partitioned: */ + ModifyTableState *mtstate; /* dummy ModifyTable state */ + PartitionTupleRouting *proute; /* partition routing info */ +} ApplyExecutionData; + +/* + * Stream xid hash entry. Whenever we see a new xid we create this entry in the + * xidhash and along with it create the streaming file and store the fileset handle. + * The subxact file is created iff there is any subxact info under this xid. This + * entry is used on the subsequent streams for the xid to get the corresponding + * fileset handles, so storing them in hash makes the search faster. 
+ */ +typedef struct StreamXidHash +{ + TransactionId xid; /* xid is the hash key and must be first */ + SharedFileSet *stream_fileset; /* shared file set for stream data */ + SharedFileSet *subxact_fileset; /* shared file set for subxact info */ +} StreamXidHash; + +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; + +/* per stream context for streaming transactions */ +static MemoryContext LogicalStreamingContext = NULL; + +WalReceiverConn *LogRepWorkerWalRcvConn = NULL; + +Subscription *MySubscription = NULL; +bool MySubscriptionValid = false; + +bool in_remote_transaction = false; +static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; + +/* fields valid only when processing streamed transaction */ +static bool in_streamed_transaction = false; + +static TransactionId stream_xid = InvalidTransactionId; + +/* + * Hash table for storing the streaming xid information along with shared file + * set for streaming and subxact files. + */ +static HTAB *xidhash = NULL; + +/* BufFile handle of the current streaming file */ +static BufFile *stream_fd = NULL; + +typedef struct SubXactInfo +{ + TransactionId xid; /* XID of the subxact */ + int fileno; /* file number in the buffile */ + off_t offset; /* offset in the file */ +} SubXactInfo; + +/* Sub-transaction data for the current streaming transaction */ +typedef struct ApplySubXactData +{ + uint32 nsubxacts; /* number of sub-transactions */ + uint32 nsubxacts_max; /* current capacity of subxacts */ + TransactionId subxact_last; /* xid of the last sub-transaction */ + SubXactInfo *subxacts; /* sub-xact offset in changes file */ +} ApplySubXactData; + +static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; + +static inline void subxact_filename(char *path, Oid subid, TransactionId xid); +static inline void changes_filename(char *path, Oid subid, TransactionId xid); + +/* + * Information about subtransactions of a given toplevel transaction. 
+ */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); +static inline void cleanup_subxact_info(void); + +/* + * Serialize and deserialize changes for a toplevel transaction. + */ +static void stream_cleanup_files(Oid subid, TransactionId xid); +static void stream_open_file(Oid subid, TransactionId xid, bool first); +static void stream_write_change(char action, StringInfo s); +static void stream_close_file(void); + +static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); + +static void store_flush_position(XLogRecPtr remote_lsn); + +static void maybe_reread_subscription(void); + +/* prototype needed because of stream_commit */ +static void apply_dispatch(StringInfo s); + +static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); +static void apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup); +static void apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); +static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, + LogicalRepRelation *remoterel, + TupleTableSlot *remoteslot, + TupleTableSlot **localslot); +static void apply_handle_tuple_routing(ApplyExecutionData *edata, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + CmdType operation); + +/* + * Should this worker apply changes for given relation. + * + * This is mainly needed for initial relation data sync as that runs in + * separate worker process running in parallel and we need some way to skip + * changes coming to the main apply worker during the sync of a table. 
+ * + * Note we need to do smaller or equals comparison for SYNCDONE state because + * it might hold position of end of initial slot consistent point WAL + * record + 1 (ie start of next record) and next record can be COMMIT of + * transaction we are now processing (which is what we set remote_final_lsn + * to in apply_handle_begin). + */ +static bool +should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) +{ + if (am_tablesync_worker()) + return MyLogicalRepWorker->relid == rel->localreloid; + else + return (rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn <= remote_final_lsn)); +} + +/* + * Begin one step (one INSERT, UPDATE, etc) of a replication transaction. + * + * Start a transaction, if this is the first step (else we keep using the + * existing transaction). + * Also provide a global snapshot and ensure we run in ApplyMessageContext. + */ +static void +begin_replication_step(void) +{ + SetCurrentStatementStartTimestamp(); + + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + PushActiveSnapshot(GetTransactionSnapshot()); + + MemoryContextSwitchTo(ApplyMessageContext); +} + +/* + * Finish up one step of a replication transaction. + * Callers of begin_replication_step() must also call this. + * + * We don't close out the transaction here, but we should increment + * the command counter to make the effects of this step visible. + */ +static void +end_replication_step(void) +{ + PopActiveSnapshot(); + + CommandCounterIncrement(); +} + +/* + * Handle streamed transactions. + * + * If in streaming mode (receiving a block of streamed transaction), we + * simply redirect it to a file for the proper toplevel transaction. + * + * Returns true for streamed transactions, false otherwise (regular mode). 
+ */ +static bool +handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) +{ + TransactionId xid; + + /* not in streaming mode */ + if (!in_streamed_transaction) + return false; + + Assert(stream_fd != NULL); + Assert(TransactionIdIsValid(stream_xid)); + + /* + * We should have received XID of the subxact as the first part of the + * message, so extract it. + */ + xid = pq_getmsgint(s, 4); + + if (!TransactionIdIsValid(xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid transaction ID in streamed replication transaction"))); + + /* Add the new subxact to the array (unless already there). */ + subxact_info_add(xid); + + /* write the change to the current file */ + stream_write_change(action, s); + + return true; +} + +/* + * Executor state preparation for evaluation of constraint expressions, + * indexes and triggers for the specified relation. + * + * Note that the caller must open and close any indexes to be updated. + */ +static ApplyExecutionData * +create_edata_for_relation(LogicalRepRelMapEntry *rel) +{ + ApplyExecutionData *edata; + EState *estate; + RangeTblEntry *rte; + ResultRelInfo *resultRelInfo; + + edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData)); + edata->targetRel = rel; + + edata->estate = estate = CreateExecutorState(); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel->localrel); + rte->relkind = rel->localrel->rd_rel->relkind; + rte->rellockmode = AccessShareLock; + ExecInitRangeTable(estate, list_make1(rte)); + + edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo); + + /* + * Use Relation opened by logicalrep_rel_open() instead of opening it + * again. + */ + InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0); + + /* + * We put the ResultRelInfo in the es_opened_result_relations list, even + * though we don't populate the es_result_relations array. 
That's a bit + * bogus, but it's enough to make ExecGetTriggerResultRel() find them. + * + * ExecOpenIndices() is not called here either, each execution path doing + * an apply operation being responsible for that. + */ + estate->es_opened_result_relations = + lappend(estate->es_opened_result_relations, resultRelInfo); + + estate->es_output_cid = GetCurrentCommandId(true); + + /* Prepare to catch AFTER triggers. */ + AfterTriggerBeginQuery(); + + /* other fields of edata remain NULL for now */ + + return edata; +} + +/* + * Finish any operations related to the executor state created by + * create_edata_for_relation(). + */ +static void +finish_edata(ApplyExecutionData *edata) +{ + EState *estate = edata->estate; + + /* Handle any queued AFTER triggers. */ + AfterTriggerEndQuery(estate); + + /* Shut down tuple routing, if any was done. */ + if (edata->proute) + ExecCleanupTupleRouting(edata->mtstate, edata->proute); + + /* + * Cleanup. It might seem that we should call ExecCloseResultRelations() + * here, but we intentionally don't. It would close the rel we added to + * es_opened_result_relations above, which is wrong because we took no + * corresponding refcount. We rely on ExecCleanupTupleRouting() to close + * any other relations opened during execution. + */ + ExecResetTupleTable(estate->es_tupleTable, false); + FreeExecutorState(estate); + pfree(edata); +} + +/* + * Executes default values for columns for which we can't map to remote + * relation columns. + * + * This allows us to support tables which have more columns on the downstream + * than on the upstream. 
+ */ +static void +slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, + TupleTableSlot *slot) +{ + TupleDesc desc = RelationGetDescr(rel->localrel); + int num_phys_attrs = desc->natts; + int i; + int attnum, + num_defaults = 0; + int *defmap; + ExprState **defexprs; + ExprContext *econtext; + + econtext = GetPerTupleExprContext(estate); + + /* We got all the data via replication, no need to evaluate anything. */ + if (num_phys_attrs == rel->remoterel.natts) + return; + + defmap = (int *) palloc(num_phys_attrs * sizeof(int)); + defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); + + Assert(rel->attrmap->maplen == num_phys_attrs); + for (attnum = 0; attnum < num_phys_attrs; attnum++) + { + Expr *defexpr; + + if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated) + continue; + + if (rel->attrmap->attnums[attnum] >= 0) + continue; + + defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1); + + if (defexpr != NULL) + { + /* Run the expression through planner */ + defexpr = expression_planner(defexpr); + + /* Initialize executable expression in copycontext */ + defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); + defmap[num_defaults] = attnum; + num_defaults++; + } + + } + + for (i = 0; i < num_defaults; i++) + slot->tts_values[defmap[i]] = + ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]); +} + +/* + * Error callback to give more context info about data conversion failures + * while reading data from the remote server. 
+ */ +static void +slot_store_error_callback(void *arg) +{ + SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; + LogicalRepRelMapEntry *rel; + + /* Nothing to do if remote attribute number is not set */ + if (errarg->remote_attnum < 0) + return; + + rel = errarg->rel; + errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\"", + rel->remoterel.nspname, rel->remoterel.relname, + rel->remoterel.attnames[errarg->remote_attnum]); +} + +/* + * Store tuple data into slot. + * + * Incoming data can be either text or binary format. + */ +static void +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) +{ + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; + + ExecClearTuple(slot); + + /* Push callback + info on the error context stack */ + errarg.rel = rel; + errarg.remote_attnum = -1; + errcallback.callback = slot_store_error_callback; + errcallback.arg = (void *) &errarg; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Call the "in" function for each non-dropped, non-null attribute */ + Assert(natts == rel->attrmap->maplen); + for (i = 0; i < natts; i++) + { + Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); + int remoteattnum = rel->attrmap->attnums[i]; + + if (!att->attisdropped && remoteattnum >= 0) + { + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; + + Assert(remoteattnum < tupleData->ncols); + + errarg.remote_attnum = remoteattnum; + + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + 
Oid typioparam; + + /* + * In some code paths we may be asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* + * NULL value from remote. (We don't expect to see + * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as + * NULL.) + */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + + errarg.remote_attnum = -1; + } + else + { + /* + * We assign NULL to dropped attributes and missing values + * (missing values should be later filled using + * slot_fill_defaults). + */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + } + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + ExecStoreVirtualTuple(slot); +} + +/* + * Replace updated columns with data from the LogicalRepTupleData struct. + * This is somewhat similar to heap_modify_tuple but also calls the type + * input functions on the user data. + * + * "slot" is filled with a copy of the tuple in "srcslot", replacing + * columns provided in "tupleData" and leaving others as-is. + * + * Caution: unreplaced pass-by-ref columns in "slot" will point into the + * storage for "srcslot". This is OK for current usage, but someday we may + * need to materialize "slot" at the end to make it independent of "srcslot". 
+ */ +static void +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) +{ + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; + + /* We'll fill "slot" with a virtual tuple, so we must start with ... */ + ExecClearTuple(slot); + + /* + * Copy all the column data from srcslot, so that we'll have valid values + * for unreplaced columns. + */ + Assert(natts == srcslot->tts_tupleDescriptor->natts); + slot_getallattrs(srcslot); + memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum)); + memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool)); + + /* For error reporting, push callback + info on the error context stack */ + errarg.rel = rel; + errarg.remote_attnum = -1; + errcallback.callback = slot_store_error_callback; + errcallback.arg = (void *) &errarg; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Call the "in" function for each replaced attribute */ + Assert(natts == rel->attrmap->maplen); + for (i = 0; i < natts; i++) + { + Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i); + int remoteattnum = rel->attrmap->attnums[i]; + + if (remoteattnum < 0) + continue; + + Assert(remoteattnum < tupleData->ncols); + + if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) + { + StringInfo colvalue = &tupleData->colvalues[remoteattnum]; + + errarg.remote_attnum = remoteattnum; + + if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT) + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, colvalue->data, + typioparam, att->atttypmod); + slot->tts_isnull[i] = false; + } + else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY) + { + Oid typreceive; + Oid typioparam; + + /* + * In some code paths we may be 
asked to re-parse the same + * tuple data. Reset the StringInfo's cursor so that works. + */ + colvalue->cursor = 0; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, colvalue, + typioparam, att->atttypmod); + + /* Trouble if it didn't eat the whole buffer */ + if (colvalue->cursor != colvalue->len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format in logical replication column %d", + remoteattnum + 1))); + slot->tts_isnull[i] = false; + } + else + { + /* must be LOGICALREP_COLUMN_NULL */ + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + + errarg.remote_attnum = -1; + } + } + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + /* And finally, declare that "slot" contains a valid virtual tuple */ + ExecStoreVirtualTuple(slot); +} + +/* + * Handle BEGIN message. + */ +static void +apply_handle_begin(StringInfo s) +{ + LogicalRepBeginData begin_data; + + logicalrep_read_begin(s, &begin_data); + + remote_final_lsn = begin_data.final_lsn; + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle COMMIT message. + * + * TODO, support tracking of multiple origins + */ +static void +apply_handle_commit(StringInfo s) +{ + LogicalRepCommitData commit_data; + + logicalrep_read_commit(s, &commit_data); + + if (commit_data.commit_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", + LSN_FORMAT_ARGS(commit_data.commit_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); + + apply_handle_commit_internal(&commit_data); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle ORIGIN message. 
+ * + * TODO, support tracking of multiple origins + */ +static void +apply_handle_origin(StringInfo s) +{ + /* + * ORIGIN message can only come inside streaming transaction or inside + * remote transaction and before any actual writes. + */ + if (!in_streamed_transaction && + (!in_remote_transaction || + (IsTransactionState() && !am_tablesync_worker()))) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("ORIGIN message sent out of order"))); +} + +/* + * Handle STREAM START message. + */ +static void +apply_handle_stream_start(StringInfo s) +{ + bool first_segment; + HASHCTL hash_ctl; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("duplicate STREAM START message"))); + + /* + * Start a transaction on stream start, this transaction will be committed + * on the stream stop unless it is a tablesync worker in which case it + * will be committed after processing all the messages. We need the + * transaction for handling the buffile, used for serializing the + * streaming data and subxact info. + */ + begin_replication_step(); + + /* notify handle methods we're processing a remote transaction */ + in_streamed_transaction = true; + + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + + if (!TransactionIdIsValid(stream_xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid transaction ID in streamed replication transaction"))); + + /* + * Initialize the xidhash table if we haven't yet. This will be used for + * the entire duration of the apply worker so create it in permanent + * context. 
+ */ + if (xidhash == NULL) + { + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(StreamXidHash); + hash_ctl.hcxt = ApplyContext; + xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + /* open the spool file for this transaction */ + stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + + /* if this is not the first segment, open existing subxact file */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + + pgstat_report_activity(STATE_RUNNING, NULL); + + end_replication_step(); +} + +/* + * Handle STREAM STOP message. + */ +static void +apply_handle_stream_stop(StringInfo s) +{ + if (!in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM STOP message without STREAM START"))); + + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + in_streamed_transaction = false; + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM abort message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM ABORT message without STREAM STOP"))); + + logicalrep_read_stream_abort(s, &xid, &subxid); + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just delete the files with serialized info. 
+ */ + if (xid == subxid) + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + else + { + /* + * OK, so it's a subxact. We need to read the subxact file for the + * toplevel transaction, determine the offset tracked for the subxact, + * and truncate the file with changes. We also remove the subxacts + * with higher offsets (or rather higher XIDs). + * + * We intentionally scan the array from the tail, because we're likely + * aborting a change for the most recent subtransactions. + * + * We can't use the binary search here as subxact XIDs won't + * necessarily arrive in sorted order, consider the case where we have + * released the savepoint for multiple subtransactions and then + * performed rollback to savepoint for one of the earlier + * sub-transaction. + */ + int64 i; + int64 subidx; + BufFile *fd; + bool found = false; + char path[MAXPGPATH]; + StreamXidHash *ent; + + subidx = -1; + begin_replication_step(); + subxact_info_read(MyLogicalRepWorker->subid, xid); + + for (i = subxact_data.nsubxacts; i > 0; i--) + { + if (subxact_data.subxacts[i - 1].xid == subxid) + { + subidx = (i - 1); + found = true; + break; + } + } + + /* + * If it's an empty sub-transaction then we will not find the subxid + * here so just cleanup the subxact info and return. 
+ */ + if (!found) + { + /* Cleanup the subxact info */ + cleanup_subxact_info(); + end_replication_step(); + CommitTransactionCommand(); + return; + } + + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); + + /* open the changes file */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + + /* OK, truncate the file at the right offset */ + BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno, + subxact_data.subxacts[subidx].offset); + BufFileClose(fd); + + /* discard the subxacts added later */ + subxact_data.nsubxacts = subidx; + + /* write the updated subxact list */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + + end_replication_step(); + CommitTransactionCommand(); + } +} + +/* + * Handle STREAM COMMIT message. + * + * Reads the commit record from 's', replays every change spooled in the + * transaction's changes file by passing each record to apply_dispatch(), + * commits through apply_handle_commit_internal() and finally removes the + * spool files. + * + * Raises ERRCODE_PROTOCOL_VIOLATION if the message arrives while + * in_streamed_transaction is set, or if the XID is not present in the stream + * XID hash table. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + LogicalRepCommitData commit_data; + StreamXidHash *ent; + MemoryContext oldcxt; + BufFile *fd; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM COMMIT message without STREAM STOP"))); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + /* Make sure we have an open transaction */ + begin_replication_step(); + + /* + * Allocate file handle and memory required to process all the messages in + * TopTransactionContext to avoid them getting reset after each message is + * processed. 
+ */ + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* open the spool file for the committed transaction */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + elog(DEBUG1, "replaying changes from file \"%s\"", path); + + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); + + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = commit_data.commit_lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + end_replication_step(); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. Each on-disk record is an int length followed by that + * many bytes of message data. + */ + nchanges = 0; + while (true) + { + int nbytes; + int len; + + CHECK_FOR_INTERRUPTS(); + + /* read length of the on-disk record */ + nbytes = BufFileRead(fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? 
*/ + if (nbytes != sizeof(len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file \"%s\": %m", + path))); + + if (len <= 0) + elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"", + len, path); + + /* make sure we have sufficiently large buffer (len was checked to be positive above) */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + if (BufFileRead(fd, buffer, len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file \"%s\": %m", + path))); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context, which is reset after each message is dispatched. */ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file \"%s\"", + nchanges, path); + } + + BufFileClose(fd); + + pfree(buffer); + pfree(s2.data); + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + apply_handle_commit_internal(&commit_data); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Helper function for apply_handle_commit and apply_handle_stream_commit. + * + * When a transaction is in progress, records commit_data->end_lsn and + * commit_data->committime as the session replication origin position, + * commits, and stores the flush position. Otherwise it only accepts pending + * invalidation messages and calls maybe_reread_subscription(). In both + * cases in_remote_transaction is cleared. + */ +static void +apply_handle_commit_internal(LogicalRepCommitData *commit_data) +{ + if (IsTransactionState()) + { + /* + * Update origin state so we can restart streaming from correct + * position in case of crash. 
+ */ + replorigin_session_origin_lsn = commit_data->end_lsn; + replorigin_session_origin_timestamp = commit_data->committime; + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data->end_lsn); + } + else + { + /* Process any invalidation messages that might have accumulated. */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } + + in_remote_transaction = false; /* cleared on both paths */ +} + +/* + * Handle RELATION message. + * + * Note we don't do validation against local schema here. The validation + * against local schema is postponed until first change for given relation + * comes as we only care about it when applying changes for it anyway and we + * do less locking this way. + * + * Returns without reading the message when handle_streamed_transaction() + * returns true. + */ +static void +apply_handle_relation(StringInfo s) +{ + LogicalRepRelation *rel; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) + return; + + rel = logicalrep_read_rel(s); + logicalrep_relmap_update(rel); + + /* Also reset all entries in the partition map that refer to remoterel. */ + logicalrep_partmap_reset_relmap(rel); +} + +/* + * Handle TYPE message. + * + * This implementation pays no attention to TYPE messages; we expect the user + * to have set things up so that the incoming data is acceptable to the input + * functions for the locally subscribed tables. Hence, we just read and + * discard the message. + */ +static void +apply_handle_type(StringInfo s) +{ + LogicalRepTyp typ; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) + return; + + logicalrep_read_typ(s, &typ); +} + +/* + * Get the replica identity index or, if it is not defined, the primary key + * index. + * + * If neither is defined, returns InvalidOid. + */ +static Oid +GetRelationIdentityOrPK(Relation rel) +{ + Oid idxoid; + + idxoid = RelationGetReplicaIndex(rel); + + if (!OidIsValid(idxoid)) + idxoid = RelationGetPrimaryKeyIndex(rel); + + return idxoid; +} + +/* + * Handle INSERT message. 
+ */ + +static void +apply_handle_insert(StringInfo s) +{ + LogicalRepRelMapEntry *rel; + LogicalRepTupleData newtup; + LogicalRepRelId relid; + ApplyExecutionData *edata; + EState *estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + return; + + begin_replication_step(); + + relid = logicalrep_read_insert(s, &newtup); + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return; + } + + /* Initialize the executor state, plus a virtual slot using the local relation's tuple descriptor to hold the incoming tuple. */ + edata = create_edata_for_relation(rel); + estate = edata->estate; + remoteslot = ExecInitExtraTupleSlot(estate, + RelationGetDescr(rel->localrel), + &TTSOpsVirtual); + + /* Process and store remote tuple in the slot, working in the per-tuple memory context */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, rel, &newtup); + slot_fill_defaults(rel, estate, remoteslot); + MemoryContextSwitchTo(oldctx); + + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_INSERT); + else + apply_handle_insert_internal(edata, edata->targetRelInfo, + remoteslot); + + finish_edata(edata); + + /* Close with NoLock here, unlike the early-exit path above which passes RowExclusiveLock. */ logicalrep_rel_close(rel, NoLock); + + end_replication_step(); +} + +/* + * Workhorse for apply_handle_insert() + * relinfo is for the relation we're actually inserting into + * (could be a child partition of edata->targetRelInfo) + * + * Opens the indexes of relinfo, inserts remoteslot with + * ExecSimpleRelationInsert() and closes the indexes again. + */ +static void +apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) +{ + EState *estate = edata->estate; + + /* We must open indexes here. */ + ExecOpenIndices(relinfo, false); + + /* Do the insert. 
*/ + ExecSimpleRelationInsert(relinfo, estate, remoteslot); + + /* Cleanup. */ + ExecCloseIndices(relinfo); +} + +/* + * Check if the logical replication relation is updatable and throw + * appropriate error if it isn't. + * + * Partitioned tables and relations with rel->updatable set pass silently. + * Otherwise ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is raised; the message + * depends on whether the local relation has a replica identity index or + * primary key (see GetRelationIdentityOrPK()). + */ +static void +check_relation_updatable(LogicalRepRelMapEntry *rel) +{ + /* + * For partitioned tables, we only need to care if the target partition is + * updatable (aka has PK or RI defined for it). + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + /* Updatable, no error. */ + if (rel->updatable) + return; + + /* + * We are in error mode so it's fine this is somewhat slow. It's better to + * give user correct error. + * + * A usable local index exists, so the missing piece is on the publisher + * side; otherwise fall through to the error about the local relation. + */ + if (OidIsValid(GetRelationIdentityOrPK(rel->localrel))) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publisher did not send replica identity column " + "expected by the logical replication target relation \"%s.%s\"", + rel->remoterel.nspname, rel->remoterel.relname))); + } + + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication target relation \"%s.%s\" has " + "neither REPLICA IDENTITY index nor PRIMARY " + "KEY and published relation does not have " + "REPLICA IDENTITY FULL", + rel->remoterel.nspname, rel->remoterel.relname))); +} + +/* + * Handle UPDATE message. 
+ * + * TODO: FDW support + */ +static void +apply_handle_update(StringInfo s) +{ + LogicalRepRelMapEntry *rel; + LogicalRepRelId relid; + ApplyExecutionData *edata; + EState *estate; + LogicalRepTupleData oldtup; + LogicalRepTupleData newtup; + bool has_oldtup; + TupleTableSlot *remoteslot; + RangeTblEntry *target_rte; + MemoryContext oldctx; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + return; + + begin_replication_step(); + + relid = logicalrep_read_update(s, &has_oldtup, &oldtup, + &newtup); + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return; + } + + /* Check if we can do the update; this raises an error if we cannot. */ + check_relation_updatable(rel); + + /* Initialize the executor state. */ + edata = create_edata_for_relation(rel); + estate = edata->estate; + remoteslot = ExecInitExtraTupleSlot(estate, + RelationGetDescr(rel->localrel), + &TTSOpsVirtual); + + /* + * Populate updatedCols so that per-column triggers can fire, and so + * executor can correctly pass down indexUnchanged hint. This could + * include more columns than were actually changed on the publisher + * because the logical replication protocol doesn't contain that + * information. But it would for example exclude columns that only exist + * on the subscriber, since we are not touching those. 
+ */ + target_rte = list_nth(estate->es_range_table, 0); /* updatedCols is recorded on the first range table entry */ + for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i); + int remoteattnum = rel->attrmap->attnums[i]; + + /* skip dropped columns and columns whose attrmap entry is negative */ if (!att->attisdropped && remoteattnum >= 0) + { + Assert(remoteattnum < newtup.ncols); + if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED) + target_rte->updatedCols = + bms_add_member(target_rte->updatedCols, + i + 1 - FirstLowInvalidHeapAttributeNumber); + } + } + + /* Also populate extraUpdatedCols, in case we have generated columns */ + fill_extraUpdatedCols(target_rte, rel->localrel); + + /* Build the search tuple from the old tuple if one was sent (has_oldtup), else from the new tuple. */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); + MemoryContextSwitchTo(oldctx); + + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, &newtup, CMD_UPDATE); + else + apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup); + + finish_edata(edata); + + logicalrep_rel_close(rel, NoLock); + + end_replication_step(); +} + +/* + * Workhorse for apply_handle_update() + * relinfo is for the relation we're actually updating in + * (could be a child partition of edata->targetRelInfo) + * + * Looks up the local tuple matching the search tuple in remoteslot. If one + * is found, the new tuple version is built from it and newtup and the row + * is updated; otherwise only a DEBUG1 message is emitted. + */ +static void +apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup) +{ + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + Relation localrel = relinfo->ri_RelationDesc; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + ExecOpenIndices(relinfo, false); + + found = FindReplTupleInLocalRel(estate, localrel, + &relmapentry->remoterel, + 
remoteslot, &localslot); + /* The search tuple is no longer needed; remoteslot is reused below for the updated tuple. */ ExecClearTuple(remoteslot); + + /* + * Tuple found. + * + * Note this will fail if there are other conflicting unique indexes. + */ + if (found) + { + /* Process and store remote tuple in the slot */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot, localslot, relmapentry, newtup); + MemoryContextSwitchTo(oldctx); + + EvalPlanQualSetSlot(&epqstate, remoteslot); + + /* Do the actual update. */ + ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot, + remoteslot); + } + else + { + /* + * The tuple to be updated could not be found. Do nothing except for + * emitting a log message. + * + * XXX should this be promoted to ereport(LOG) perhaps? + */ + elog(DEBUG1, + "logical replication did not find row to be updated " + "in replication target relation \"%s\"", + RelationGetRelationName(localrel)); + } + + /* Cleanup. */ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + +/* + * Handle DELETE message. + * + * Builds a search tuple from the old tuple read from the message and deletes + * the matching local row, going through apply_handle_tuple_routing() when + * the target is a partitioned table. + * + * TODO: FDW support + */ +static void +apply_handle_delete(StringInfo s) +{ + LogicalRepRelMapEntry *rel; + LogicalRepTupleData oldtup; + LogicalRepRelId relid; + ApplyExecutionData *edata; + EState *estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + return; + + begin_replication_step(); + + relid = logicalrep_read_delete(s, &oldtup); + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + end_replication_step(); + return; + } + + /* Check if we can do the delete; this raises an error if we cannot. */ + check_relation_updatable(rel); + + /* Initialize the executor state. 
*/ + edata = create_edata_for_relation(rel); + estate = edata->estate; + remoteslot = ExecInitExtraTupleSlot(estate, + RelationGetDescr(rel->localrel), + &TTSOpsVirtual); + + /* Build the search tuple. */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_store_data(remoteslot, rel, &oldtup); + MemoryContextSwitchTo(oldctx); + + /* For a partitioned table, apply delete to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(edata, + remoteslot, NULL, CMD_DELETE); + else + apply_handle_delete_internal(edata, edata->targetRelInfo, + remoteslot); + + finish_edata(edata); + + logicalrep_rel_close(rel, NoLock); + + end_replication_step(); +} + +/* + * Workhorse for apply_handle_delete() + * relinfo is for the relation we're actually deleting from + * (could be a child partition of edata->targetRelInfo) + * + * Deletes the local tuple matching the search tuple in remoteslot if one is + * found; otherwise only a DEBUG1 message is emitted. + */ +static void +apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot) +{ + EState *estate = edata->estate; + Relation localrel = relinfo->ri_RelationDesc; + LogicalRepRelation *remoterel = &edata->targetRel->remoterel; + EPQState epqstate; + TupleTableSlot *localslot; + bool found; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + ExecOpenIndices(relinfo, false); + + found = FindReplTupleInLocalRel(estate, localrel, remoterel, + remoteslot, &localslot); + + /* If found delete it. */ + if (found) + { + EvalPlanQualSetSlot(&epqstate, localslot); + + /* Do the actual delete. */ + ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot); + } + else + { + /* + * The tuple to be deleted could not be found. Do nothing except for + * emitting a log message. + * + * XXX should this be promoted to ereport(LOG) perhaps? + */ + elog(DEBUG1, + "logical replication did not find row to be deleted " + "in replication target relation \"%s\"", + RelationGetRelationName(localrel)); + } + + /* Cleanup. 
*/ + ExecCloseIndices(relinfo); + EvalPlanQualEnd(&epqstate); +} + +/* + * Try to find a tuple received from the publication side (in 'remoteslot') in + * the corresponding local relation using either replica identity index, + * primary key or if needed, sequential scan. + * + * Local tuple, if found, is returned in '*localslot'. + * + * The slot stored in '*localslot' is created here with table_slot_create() + * using estate->es_tupleTable, whether or not a tuple is found. Both lookup + * paths request LockTupleExclusive. + */ +static bool +FindReplTupleInLocalRel(EState *estate, Relation localrel, + LogicalRepRelation *remoterel, + TupleTableSlot *remoteslot, + TupleTableSlot **localslot) +{ + Oid idxoid; + bool found; + + *localslot = table_slot_create(localrel, &estate->es_tupleTable); + + idxoid = GetRelationIdentityOrPK(localrel); + /* Without a usable index, the sequential-scan path is only expected for REPLICA IDENTITY FULL. */ Assert(OidIsValid(idxoid) || + (remoterel->replident == REPLICA_IDENTITY_FULL)); + + if (OidIsValid(idxoid)) + found = RelationFindReplTupleByIndex(localrel, idxoid, + LockTupleExclusive, + remoteslot, *localslot); + else + found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, + remoteslot, *localslot); + + return found; +} + +/* + * This handles insert, update, delete on a partitioned table. + * + * 'remoteslot' is used to find the target partition and is converted to + * that partition's rowtype; 'newtup' is only used for CMD_UPDATE. For an + * update whose result no longer satisfies the partition constraint, the old + * row is deleted from its partition and the new row is inserted into the + * partition found by routing it again through the parent. + */ +static void +apply_handle_tuple_routing(ApplyExecutionData *edata, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + CmdType operation) +{ + EState *estate = edata->estate; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + ResultRelInfo *relinfo = edata->targetRelInfo; + Relation parentrel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate; + PartitionTupleRouting *proute; + ResultRelInfo *partrelinfo; + Relation partrel; + TupleTableSlot *remoteslot_part; + TupleConversionMap *map; + MemoryContext oldctx; + LogicalRepRelMapEntry *part_entry = NULL; + AttrMap *attrmap = NULL; + + /* ModifyTableState is needed for ExecFindPartition(). */ + edata->mtstate = mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = operation; + mtstate->resultRelInfo = relinfo; + + /* ... as is PartitionTupleRouting. 
*/ + edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel); + + /* + * Find the partition to which the "search tuple" belongs. + */ + Assert(remoteslot != NULL); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo = ExecFindPartition(mtstate, relinfo, proute, + remoteslot, estate); + Assert(partrelinfo != NULL); + partrel = partrelinfo->ri_RelationDesc; + + /* + * To perform any of the operations below, the tuple must match the + * partition's rowtype. Convert if needed or just copy, using a dedicated + * slot to store the tuple in any case. + */ + remoteslot_part = partrelinfo->ri_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); + map = partrelinfo->ri_RootToPartitionMap; + if (map != NULL) + { + attrmap = map->attrMap; + remoteslot_part = execute_attr_map_slot(attrmap, remoteslot, + remoteslot_part); + } + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); + slot_getallattrs(remoteslot_part); + } + MemoryContextSwitchTo(oldctx); + + /* Check if we can do the update or delete on the leaf partition. */ + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + check_relation_updatable(part_entry); + } + + switch (operation) + { + case CMD_INSERT: + apply_handle_insert_internal(edata, partrelinfo, + remoteslot_part); + break; + + case CMD_DELETE: + apply_handle_delete_internal(edata, partrelinfo, + remoteslot_part); + break; + + case CMD_UPDATE: + + /* + * For UPDATE, depending on whether or not the updated tuple + * satisfies the partition's constraint, perform a simple UPDATE + * of the partition or move the updated tuple into a different + * suitable partition. + */ + { + TupleTableSlot *localslot; + ResultRelInfo *partrelinfo_new; + bool found; + + /* Get the matching local tuple from the partition. 
*/ + found = FindReplTupleInLocalRel(estate, partrel, + &part_entry->remoterel, + remoteslot_part, &localslot); + if (!found) + { + /* + * The tuple to be updated could not be found. Do nothing + * except for emitting a log message. + * + * XXX should this be promoted to ereport(LOG) perhaps? + */ + elog(DEBUG1, + "logical replication did not find row to be updated " + "in replication target relation's partition \"%s\"", + RelationGetRelationName(partrel)); + return; /* leaves the function directly; nothing else is done for this change */ + } + + /* + * Apply the update to the local tuple, putting the result in + * remoteslot_part. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_data(remoteslot_part, localslot, part_entry, + newtup); + MemoryContextSwitchTo(oldctx); + + /* + * Does the updated tuple still satisfy the current + * partition's constraint? + */ + if (!partrel->rd_rel->relispartition || + ExecPartitionCheck(partrelinfo, remoteslot_part, estate, + false)) + { + /* + * Yes, so simply UPDATE the partition. We don't call + * apply_handle_update_internal() here, which would + * normally do the following work, to avoid repeating some + * work already done above to find the local tuple in the + * partition. + */ + EPQState epqstate; + + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1); + ExecOpenIndices(partrelinfo, false); + + EvalPlanQualSetSlot(&epqstate, remoteslot_part); + ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate, + localslot, remoteslot_part); + ExecCloseIndices(partrelinfo); + EvalPlanQualEnd(&epqstate); + } + else + { + /* Move the tuple into the new partition. */ + + /* + * New partition will be found using tuple routing, which + * can only occur via the parent table. We might need to + * convert the tuple to the parent's rowtype. Note that + * this is the tuple found in the partition, not the + * original search tuple received by this function. 
+ */ + if (map) + { + TupleConversionMap *PartitionToRootMap = + convert_tuples_by_name(RelationGetDescr(partrel), + RelationGetDescr(parentrel)); + + remoteslot = + execute_attr_map_slot(PartitionToRootMap->attrMap, + remoteslot_part, remoteslot); + } + else + { + remoteslot = ExecCopySlot(remoteslot, remoteslot_part); + slot_getallattrs(remoteslot); + } + + + /* Find the new partition. */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrelinfo_new = ExecFindPartition(mtstate, relinfo, + proute, remoteslot, + estate); + MemoryContextSwitchTo(oldctx); + Assert(partrelinfo_new != partrelinfo); + + /* DELETE old tuple found in the old partition. */ + apply_handle_delete_internal(edata, partrelinfo, + localslot); + + /* INSERT new tuple into the new partition. */ + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partrel = partrelinfo_new->ri_RelationDesc; + remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot; + if (remoteslot_part == NULL) + remoteslot_part = table_slot_create(partrel, + &estate->es_tupleTable); + map = partrelinfo_new->ri_RootToPartitionMap; + if (map != NULL) + { + remoteslot_part = execute_attr_map_slot(map->attrMap, + remoteslot, + remoteslot_part); + } + else + { + remoteslot_part = ExecCopySlot(remoteslot_part, + remoteslot); + /* NOTE(review): the parallel branch earlier in this function calls slot_getallattrs() on the copy target (remoteslot_part); here it is called on the copy source - confirm this is intended. */ slot_getallattrs(remoteslot); + } + MemoryContextSwitchTo(oldctx); + apply_handle_insert_internal(edata, partrelinfo_new, + remoteslot_part); + } + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } +} + +/* + * Handle TRUNCATE message. 
+ * + * TODO: FDW support + */ +static void +apply_handle_truncate(StringInfo s) +{ + bool cascade = false; + bool restart_seqs = false; + List *remote_relids = NIL; + List *remote_rels = NIL; + List *rels = NIL; + List *part_rels = NIL; + List *relids = NIL; + List *relids_logged = NIL; + ListCell *lc; + LOCKMODE lockmode = AccessExclusiveLock; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + return; + + begin_replication_step(); + + remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); + + foreach(lc, remote_relids) + { + LogicalRepRelId relid = lfirst_oid(lc); + LogicalRepRelMapEntry *rel; + + rel = logicalrep_rel_open(relid, lockmode); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, lockmode); + continue; + } + + remote_rels = lappend(remote_rels, rel); + rels = lappend(rels, rel->localrel); + relids = lappend_oid(relids, rel->localreloid); + if (RelationIsLogicallyLogged(rel->localrel)) + relids_logged = lappend_oid(relids_logged, rel->localreloid); + + /* + * Truncate partitions if we got a message to truncate a partitioned + * table. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + lockmode, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * Ignore temp tables of other backends. See similar code in + * ExecuteTruncate(). 
+ */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, lockmode); + continue; + } + + rels = lappend(rels, childrel); + part_rels = lappend(part_rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } + } + + /* + * Even if we used CASCADE on the upstream primary we explicitly default + * to replaying changes without further cascading. This might be later + * changeable with a user specified option. + */ + ExecuteTruncateGuts(rels, + relids, + relids_logged, + DROP_RESTRICT, + restart_seqs); + foreach(lc, remote_rels) + { + LogicalRepRelMapEntry *rel = lfirst(lc); + + logicalrep_rel_close(rel, NoLock); + } + foreach(lc, part_rels) + { + Relation rel = lfirst(lc); + + table_close(rel, NoLock); + } + + end_replication_step(); +} + + +/* + * Logical replication protocol message dispatcher. + */ +static void +apply_dispatch(StringInfo s) +{ + LogicalRepMsgType action = pq_getmsgbyte(s); + + switch (action) + { + case LOGICAL_REP_MSG_BEGIN: + apply_handle_begin(s); + return; + + case LOGICAL_REP_MSG_COMMIT: + apply_handle_commit(s); + return; + + case LOGICAL_REP_MSG_INSERT: + apply_handle_insert(s); + return; + + case LOGICAL_REP_MSG_UPDATE: + apply_handle_update(s); + return; + + case LOGICAL_REP_MSG_DELETE: + apply_handle_delete(s); + return; + + case LOGICAL_REP_MSG_TRUNCATE: + apply_handle_truncate(s); + return; + + case LOGICAL_REP_MSG_RELATION: + apply_handle_relation(s); + return; + + case LOGICAL_REP_MSG_TYPE: + apply_handle_type(s); + return; + + case LOGICAL_REP_MSG_ORIGIN: + apply_handle_origin(s); + return; + + case LOGICAL_REP_MSG_MESSAGE: + + /* + * Logical replication does not use generic logical messages yet. + * Although, it could be used by other applications that use this + * output plugin. 
+ */ + return; + + case LOGICAL_REP_MSG_STREAM_START: + apply_handle_stream_start(s); + return; + + case LOGICAL_REP_MSG_STREAM_END: + apply_handle_stream_stop(s); + return; + + case LOGICAL_REP_MSG_STREAM_ABORT: + apply_handle_stream_abort(s); + return; + + case LOGICAL_REP_MSG_STREAM_COMMIT: + apply_handle_stream_commit(s); + return; + } + + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid logical replication message type \"%c\"", + action))); +} + +/* + * Figure out which write/flush positions to report to the walsender process. + * + * We can't simply report back the last LSN the walsender sent us because the + * local transaction might not yet be flushed to disk locally. Instead we + * build a list that associates local with remote LSNs for every commit. When + * reporting back the flush position to the sender we iterate that list and + * check which entries on it are already locally flushed. Those we can report + * as having been flushed. + * + * The have_pending_txes is true if there are outstanding transactions that + * need to be flushed. + */ +static void +get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, + bool *have_pending_txes) +{ + dlist_mutable_iter iter; + XLogRecPtr local_flush = GetFlushRecPtr(); + + *write = InvalidXLogRecPtr; + *flush = InvalidXLogRecPtr; + + dlist_foreach_modify(iter, &lsn_mapping) + { + FlushPosition *pos = + dlist_container(FlushPosition, node, iter.cur); + + *write = pos->remote_end; + + if (pos->local_end <= local_flush) + { + *flush = pos->remote_end; + dlist_delete(iter.cur); + pfree(pos); + } + else + { + /* + * Don't want to uselessly iterate over the rest of the list which + * could potentially be long. Instead get the last element and + * grab the write position from there. 
+ */ + pos = dlist_tail_element(FlushPosition, node, + &lsn_mapping); + *write = pos->remote_end; + *have_pending_txes = true; + return; + } + } + + *have_pending_txes = !dlist_is_empty(&lsn_mapping); +} + +/* + * Store current remote/local lsn pair in the tracking list. + */ +static void +store_flush_position(XLogRecPtr remote_lsn) +{ + FlushPosition *flushpos; + + /* Need to do this in permanent context */ + MemoryContextSwitchTo(ApplyContext); + + /* Track commit lsn */ + flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); + flushpos->local_end = XactLastCommitEnd; + flushpos->remote_end = remote_lsn; + + dlist_push_tail(&lsn_mapping, &flushpos->node); + MemoryContextSwitchTo(ApplyMessageContext); +} + + +/* Update statistics of the worker. */ +static void +UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) +{ + MyLogicalRepWorker->last_lsn = last_lsn; + MyLogicalRepWorker->last_send_time = send_time; + MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp(); + if (reply) + { + MyLogicalRepWorker->reply_lsn = last_lsn; + MyLogicalRepWorker->reply_time = send_time; + } +} + +/* + * Apply main loop. + */ +static void +LogicalRepApplyLoop(XLogRecPtr last_received) +{ + TimestampTz last_recv_timestamp = GetCurrentTimestamp(); + bool ping_sent = false; + TimeLineID tli; + + /* + * Init the ApplyMessageContext which we clean up after each replication + * protocol message. + */ + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); + + /* + * This memory context is used for per-stream data when the streaming mode + * is enabled. This context is reset on each stream stop. + */ + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); + + /* mark as idle, before starting to loop */ + pgstat_report_activity(STATE_IDLE, NULL); + + /* This outer loop iterates once per wait. 
*/ + for (;;) + { + pgsocket fd = PGINVALID_SOCKET; + int rc; + int len; + char *buf = NULL; + bool endofstream = false; + long wait_time; + + CHECK_FOR_INTERRUPTS(); + + MemoryContextSwitchTo(ApplyMessageContext); + + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + + if (len != 0) + { + /* Loop to process all available data (without blocking). */ + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + { + break; + } + else if (len < 0) + { + ereport(LOG, + (errmsg("data stream from publisher has ended"))); + endofstream = true; + break; + } + else + { + int c; + StringInfoData s; + + /* Reset timeout. */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; + + /* Ensure we are reading the data into our memory context. */ + MemoryContextSwitchTo(ApplyMessageContext); + + s.data = buf; + s.len = len; + s.cursor = 0; + s.maxlen = -1; + + c = pq_getmsgbyte(&s); + + if (c == 'w') + { + XLogRecPtr start_lsn; + XLogRecPtr end_lsn; + TimestampTz send_time; + + start_lsn = pq_getmsgint64(&s); + end_lsn = pq_getmsgint64(&s); + send_time = pq_getmsgint64(&s); + + if (last_received < start_lsn) + last_received = start_lsn; + + if (last_received < end_lsn) + last_received = end_lsn; + + UpdateWorkerStats(last_received, send_time, false); + + apply_dispatch(&s); + } + else if (c == 'k') + { + XLogRecPtr end_lsn; + TimestampTz timestamp; + bool reply_requested; + + end_lsn = pq_getmsgint64(&s); + timestamp = pq_getmsgint64(&s); + reply_requested = pq_getmsgbyte(&s); + + if (last_received < end_lsn) + last_received = end_lsn; + + send_feedback(last_received, reply_requested, false); + UpdateWorkerStats(last_received, timestamp, true); + } + /* other message types are purposefully ignored */ + + MemoryContextReset(ApplyMessageContext); + } + + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + } + } + + /* confirm all writes so far */ + send_feedback(last_received, false, false); + + if (!in_remote_transaction && !in_streamed_transaction) + { 
+ /* + * If we didn't get any transactions for a while there might be + * unconsumed invalidation messages in the queue, consume them + * now. + */ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + + /* Process any table synchronization changes. */ + process_syncing_tables(last_received); + } + + /* Cleanup the memory. */ + MemoryContextResetAndDeleteChildren(ApplyMessageContext); + MemoryContextSwitchTo(TopMemoryContext); + + /* Check if we need to exit the streaming loop. */ + if (endofstream) + break; + + /* + * Wait for more data or latch. If we have unflushed transactions, + * wake up after WalWriterDelay to see if they've been flushed yet (in + * which case we should send a feedback message). Otherwise, there's + * no particular urgency about waking up unless we get data or a + * signal. + */ + if (!dlist_is_empty(&lsn_mapping)) + wait_time = WalWriterDelay; + else + wait_time = NAPTIME_PER_CYCLE; + + rc = WaitLatchOrSocket(MyLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + fd, wait_time, + WAIT_EVENT_LOGICAL_APPLY_MAIN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (rc & WL_TIMEOUT) + { + /* + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, ping + * the server. Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the primary anyway, to report any + * progress in applying WAL. + */ + bool requestReply = false; + + /* + * Check if time since last receive from primary has reached the + * configured limit. 
+			 */
+			if (wal_receiver_timeout > 0)
+			{
+				TimestampTz now = GetCurrentTimestamp();
+				TimestampTz timeout;
+
+				timeout =
+					TimestampTzPlusMilliseconds(last_recv_timestamp,
+												wal_receiver_timeout);
+
+				if (now >= timeout)
+					ereport(ERROR,
+							(errcode(ERRCODE_CONNECTION_FAILURE),
+							 errmsg("terminating logical replication worker due to timeout")));
+
+				/* Check to see if it's time for a ping. */
+				if (!ping_sent)
+				{
+					timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+														  (wal_receiver_timeout / 2));
+					if (now >= timeout)
+					{
+						requestReply = true;
+						ping_sent = true;
+					}
+				}
+			}
+
+			send_feedback(last_received, requestReply, requestReply);
+		}
+	}
+
+	/* All done */
+	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+}
+
+/*
+ * Send a Standby Status Update message to server.
+ *
+ * 'recvpos' is the latest LSN we've received data to, force is set if we need
+ * to send a response to avoid timeouts. 'requestReply' is copied into the
+ * replyRequested byte of the message.
+ *
+ * The positions and the time of the last message are remembered in static
+ * variables. A position lower than the one sent before is replaced by the
+ * remembered value, so reported positions never move backwards. Unless
+ * 'force' is set, nothing is sent when wal_receiver_status_interval <= 0, or
+ * when the write and flush positions are unchanged and the
+ * wal_receiver_status_interval check against the previous send time has not
+ * tripped.
+ */
+static void
+send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
+{
+	static StringInfo reply_message = NULL;
+	static TimestampTz send_time = 0;
+
+	static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
+	static XLogRecPtr last_writepos = InvalidXLogRecPtr;
+	static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
+	XLogRecPtr	writepos;
+	XLogRecPtr	flushpos;
+	TimestampTz now;
+	bool		have_pending_txes;
+
+	/*
+	 * If the user doesn't want status to be reported to the publisher, be
+	 * sure to exit before doing anything at all.
+	 */
+	if (!force && wal_receiver_status_interval <= 0)
+		return;
+
+	/*
+	 * It's legal to not pass a recvpos: anything below the last reported
+	 * value is replaced by that value.
+	 */
+	if (recvpos < last_recvpos)
+		recvpos = last_recvpos;
+
+	get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+	/*
+	 * No outstanding transactions to flush, we can report the latest received
+	 * position. This is important for synchronous replication.
+	 */
+	if (!have_pending_txes)
+		flushpos = writepos = recvpos;
+
+	if (writepos < last_writepos)
+		writepos = last_writepos;
+
+	if (flushpos < last_flushpos)
+		flushpos = last_flushpos;
+
+	now = GetCurrentTimestamp();
+
+	/* if we've already reported everything we're good */
+	if (!force &&
+		writepos == last_writepos &&
+		flushpos == last_flushpos &&
+		!TimestampDifferenceExceeds(send_time, now,
+									wal_receiver_status_interval * 1000))
+		return;
+	send_time = now;
+
+	if (!reply_message)
+	{
+		MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+		reply_message = makeStringInfo();
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+		resetStringInfo(reply_message);
+
+	pq_sendbyte(reply_message, 'r');
+	pq_sendint64(reply_message, recvpos);	/* write */
+	pq_sendint64(reply_message, flushpos);	/* flush */
+	pq_sendint64(reply_message, writepos);	/* apply */
+	pq_sendint64(reply_message, now);	/* sendTime */
+	pq_sendbyte(reply_message, requestReply);	/* replyRequested */
+
+	elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+		 force,
+		 LSN_FORMAT_ARGS(recvpos),
+		 LSN_FORMAT_ARGS(writepos),
+		 LSN_FORMAT_ARGS(flushpos));
+
+	walrcv_send(LogRepWorkerWalRcvConn,
+				reply_message->data, reply_message->len);
+
+	if (recvpos > last_recvpos)
+		last_recvpos = recvpos;
+	if (writepos > last_writepos)
+		last_writepos = writepos;
+	if (flushpos > last_flushpos)
+		last_flushpos = flushpos;
+}
+
+/*
+ * Reread subscription info if needed. Most changes will cause the worker to
+ * exit.
+ *
+ * Nothing is done while MySubscriptionValid is set. Otherwise the
+ * subscription is fetched again (starting a transaction when not already in
+ * one). The process exits via proc_exit(0) if the subscription was removed or
+ * disabled, or if conninfo, name, slotname, binary, stream or the publication
+ * list differ from the current copy. In the remaining case the new copy
+ * replaces MySubscription and synchronous_commit is set from it.
+ */
+static void
+maybe_reread_subscription(void)
+{
+	MemoryContext oldctx;
+	Subscription *newsub;
+	bool		started_tx = false;
+
+	/* When cache state is valid there is nothing to do here. */
+	if (MySubscriptionValid)
+		return;
+
+	/* This function might be called inside or outside of transaction. */
+	if (!IsTransactionState())
+	{
+		StartTransactionCommand();
+		started_tx = true;
+	}
+
+	/* Ensure allocations in permanent context. */
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+
+	/*
+	 * Exit if the subscription was removed. This normally should not happen
+	 * as the worker gets killed during DROP SUBSCRIPTION.
+	 */
+	if (!newsub)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was removed",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
+	/*
+	 * Exit if the subscription was disabled. This normally should not happen
+	 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
+	 */
+	if (!newsub->enabled)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
+	/* !slotname should never happen when enabled is true. */
+	Assert(newsub->slotname);
+
+	/*
+	 * Exit if any parameter that affects the remote connection was changed.
+	 * The launcher will start a new worker.
+	 */
+	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
+		strcmp(newsub->name, MySubscription->name) != 0 ||
+		strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
+		newsub->binary != MySubscription->binary ||
+		newsub->stream != MySubscription->stream ||
+		!equal(newsub->publications, MySubscription->publications))
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
+	/* Check for other changes that should never happen too. */
+	if (newsub->dbid != MySubscription->dbid)
+	{
+		elog(ERROR, "subscription %u changed unexpectedly",
+			 MyLogicalRepWorker->subid);
+	}
+
+	/* Clean old subscription info and switch to new one. */
+	FreeSubscription(MySubscription);
+	MySubscription = newsub;
+
+	MemoryContextSwitchTo(oldctx);
+
+	/* Change synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit", MySubscription->synccommit,
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
+	if (started_tx)
+		CommitTransactionCommand();
+
+	MySubscriptionValid = true;
+}
+
+/*
+ * Callback from subscription syscache invalidation.
+ *
+ * Only clears MySubscriptionValid; none of the arguments are used. The
+ * subscription itself is reloaded later by maybe_reread_subscription().
+ */
+static void
+subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	MySubscriptionValid = false;
+}
+
+/*
+ * subxact_info_write
+ *	  Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store offset of its first change in the main file.
+ * The file is always over-written as a whole.
+ *
+ * If subxact_data holds no subxacts, nothing is written and an already
+ * existing subxact fileset is deleted. After a write the in-memory subxact
+ * array is released with cleanup_subxact_info().
+ *
+ * XXX We should only store subxacts that were not aborted yet.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+	char		path[MAXPGPATH];
+	Size		len;
+	StreamXidHash *ent;
+	BufFile    *fd;
+
+	Assert(TransactionIdIsValid(xid));
+
+	/* Find the xid entry in the xidhash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_FIND,
+										NULL);
+	/* By this time we must have created the transaction entry */
+	Assert(ent);
+
+	/*
+	 * If there is no subtransaction then nothing to do, but if already have
+	 * subxact file then delete that.
+	 */
+	if (subxact_data.nsubxacts == 0)
+	{
+		if (ent->subxact_fileset)
+		{
+			cleanup_subxact_info();
+			SharedFileSetDeleteAll(ent->subxact_fileset);
+			pfree(ent->subxact_fileset);
+			ent->subxact_fileset = NULL;
+		}
+		return;
+	}
+
+	subxact_filename(path, subid, xid);
+
+	/*
+	 * Create the subxact file if it is not already created, otherwise open
+	 * the existing file.
+	 */
+	if (ent->subxact_fileset == NULL)
+	{
+		MemoryContext oldctx;
+
+		/*
+		 * We need to maintain shared fileset across multiple stream
+		 * start/stop calls. So, need to allocate it in a persistent context.
+		 */
+		oldctx = MemoryContextSwitchTo(ApplyContext);
+		ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+		SharedFileSetInit(ent->subxact_fileset, NULL);
+		MemoryContextSwitchTo(oldctx);
+
+		fd = BufFileCreateShared(ent->subxact_fileset, path);
+	}
+	else
+		fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+	/* Write the subxact count and subxact info */
+	BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
+	BufFileWrite(fd, subxact_data.subxacts, len);
+
+	BufFileClose(fd);
+
+	/* free the memory allocated for subxact info */
+	cleanup_subxact_info();
+}
+
+/*
+ * subxact_info_read
+ *	  Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the structure subxact_data that can be
+ * used later.
+ *
+ * Returns without touching subxact_data when the transaction has no subxact
+ * fileset. Raises an ERROR when 'xid' has no entry in xidhash or when fewer
+ * bytes than expected can be read from the file.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+	char		path[MAXPGPATH];
+	Size		len;
+	BufFile    *fd;
+	StreamXidHash *ent;
+	MemoryContext oldctx;
+
+	Assert(!subxact_data.subxacts);
+	Assert(subxact_data.nsubxacts == 0);
+	Assert(subxact_data.nsubxacts_max == 0);
+
+	/* Find the stream xid entry in the xidhash */
+	ent = (StreamXidHash *) hash_search(xidhash,
+										(void *) &xid,
+										HASH_FIND,
+										NULL);
+	if (!ent)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("transaction %u not found in stream XID hash table",
+								 xid)));
+
+	/*
+	 * If subxact_fileset is not valid that means we don't have any subxact
+	 * info
+	 */
+	if (ent->subxact_fileset == NULL)
+		return;
+
+	subxact_filename(path, subid, xid);
+
+	fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+	/* read number of subxact items */
+	if (BufFileRead(fd, &subxact_data.nsubxacts,
+					sizeof(subxact_data.nsubxacts)) !=
+		sizeof(subxact_data.nsubxacts))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+						path)));
+
+	len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+	/* we keep the maximum as a power of 2 */
+	subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
+
+	/*
+	 * Allocate subxact information in the logical streaming context. We need
+	 * this information during the complete stream so that we can add the sub
+	 * transaction info to this. On stream stop we will flush this information
+	 * to the subxact file and reset the logical streaming context.
+	 */
+	oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+	subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
+								   sizeof(SubXactInfo));
+	MemoryContextSwitchTo(oldctx);
+
+	if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from streaming transaction's subxact file \"%s\": %m",
+						path)));
+
+	BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ *	  Add information about a subxact (offset in the main file).
+ *
+ * The call is a no-op for the toplevel XID (stream_xid), for the XID seen in
+ * the previous call, and for XIDs already present in the array. Otherwise the
+ * XID is appended together with the current position of stream_fd. The array
+ * starts with room for 128 entries and is doubled whenever it is full.
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+	SubXactInfo *subxacts = subxact_data.subxacts;
+	int64		i;
+
+	/* We must have a valid top level stream xid and a stream fd. */
+	Assert(TransactionIdIsValid(stream_xid));
+	Assert(stream_fd != NULL);
+
+	/*
+	 * If the XID matches the toplevel transaction, we don't want to add it.
+	 */
+	if (stream_xid == xid)
+		return;
+
+	/*
+	 * In most cases we're checking the same subxact as we've already seen in
+	 * the last call, so make sure to ignore it (this change comes later).
+	 */
+	if (subxact_data.subxact_last == xid)
+		return;
+
+	/* OK, remember we're processing this XID. */
+	subxact_data.subxact_last = xid;
+
+	/*
+	 * Check if the transaction is already present in the array of subxact. We
+	 * intentionally scan the array from the tail, because we're likely adding
+	 * a change for the most recent subtransactions.
+	 *
+	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+	 * would allow us to use binary search here.
+	 */
+	for (i = subxact_data.nsubxacts; i > 0; i--)
+	{
+		/* found, so we're done */
+		if (subxacts[i - 1].xid == xid)
+			return;
+	}
+
+	/* This is a new subxact, so we need to add it to the array. */
+	if (subxact_data.nsubxacts == 0)
+	{
+		MemoryContext oldctx;
+
+		subxact_data.nsubxacts_max = 128;
+
+		/*
+		 * Allocate this memory for subxacts in per-stream context, see
+		 * subxact_info_read.
+		 */
+		oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
+		subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+		MemoryContextSwitchTo(oldctx);
+	}
+	else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
+	{
+		subxact_data.nsubxacts_max *= 2;
+		subxacts = repalloc(subxacts,
+							subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+	}
+
+	subxacts[subxact_data.nsubxacts].xid = xid;
+
+	/*
+	 * Get the current offset of the stream file and store it as offset of
+	 * this subxact.
+	 */
+	BufFileTell(stream_fd,
+				&subxacts[subxact_data.nsubxacts].fileno,
+				&subxacts[subxact_data.nsubxacts].offset);
+
+	subxact_data.nsubxacts++;
+	subxact_data.subxacts = subxacts;
+}
+
+/*
+ * format filename for file containing the info about subxacts
+ *
+ * Writes "<subid>-<xid>.subxacts" into 'path', limited to MAXPGPATH bytes.
+ */
+static inline void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+	snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/*
+ * format filename for file containing serialized changes
+ *
+ * Writes "<subid>-<xid>.changes" into 'path', limited to MAXPGPATH bytes.
+ */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+	snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
+
+/*
+ * stream_cleanup_files
+ *	  Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ */ +static void +stream_cleanup_files(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + StreamXidHash *ent; + + /* Find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + NULL); + if (!ent) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("transaction %u not found in stream XID hash table", + xid))); + + /* Delete the change file and release the stream fileset memory */ + changes_filename(path, subid, xid); + SharedFileSetDeleteAll(ent->stream_fileset); + pfree(ent->stream_fileset); + ent->stream_fileset = NULL; + + /* Delete the subxact file and release the memory, if it exist */ + if (ent->subxact_fileset) + { + subxact_filename(path, subid, xid); + SharedFileSetDeleteAll(ent->subxact_fileset); + pfree(ent->subxact_fileset); + ent->subxact_fileset = NULL; + } + + /* Remove the xid entry from the stream xid hash */ + hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); +} + +/* + * stream_open_file + * Open a file that we'll use to serialize changes for a toplevel + * transaction. + * + * Open a file for streamed changes from a toplevel transaction identified + * by stream_xid (global variable). If it's the first chunk of streamed + * changes for this transaction, initialize the shared fileset and create the + * buffile, otherwise open the previously created file. + * + * This can only be called at the beginning of a "streaming" block, i.e. + * between stream_start/stream_stop messages from the upstream. 
+ */ +static void +stream_open_file(Oid subid, TransactionId xid, bool first_segment) +{ + char path[MAXPGPATH]; + bool found; + MemoryContext oldcxt; + StreamXidHash *ent; + + Assert(in_streamed_transaction); + Assert(OidIsValid(subid)); + Assert(TransactionIdIsValid(xid)); + Assert(stream_fd == NULL); + + /* create or find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_ENTER, + &found); + + changes_filename(path, subid, xid); + elog(DEBUG1, "opening file \"%s\" for streamed changes", path); + + /* + * Create/open the buffiles under the logical streaming context so that we + * have those files until stream stop. + */ + oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); + + /* + * If this is the first streamed segment, the file must not exist, so make + * sure we're the ones creating it. Otherwise just open the file for + * writing, in append mode. + */ + if (first_segment) + { + MemoryContext savectx; + SharedFileSet *fileset; + + if (found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); + + /* + * We need to maintain shared fileset across multiple stream + * start/stop calls. So, need to allocate it in a persistent context. + */ + savectx = MemoryContextSwitchTo(ApplyContext); + fileset = palloc(sizeof(SharedFileSet)); + + SharedFileSetInit(fileset, NULL); + MemoryContextSwitchTo(savectx); + + stream_fd = BufFileCreateShared(fileset, path); + + /* Remember the fileset for the next stream of the same transaction */ + ent->xid = xid; + ent->stream_fileset = fileset; + ent->subxact_fileset = NULL; + } + else + { + if (!found) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); + + /* + * Open the file and seek to the end of the file because we always + * append the changes file. 
+ */ + stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + BufFileSeek(stream_fd, 0, 0, SEEK_END); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * stream_close_file + * Close the currently open file with streamed changes. + * + * This can only be called at the end of a streaming block, i.e. at stream_stop + * message from the upstream. + */ +static void +stream_close_file(void) +{ + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + BufFileClose(stream_fd); + + stream_xid = InvalidTransactionId; + stream_fd = NULL; +} + +/* + * stream_write_change + * Serialize a change to a file for the current toplevel transaction. + * + * The change is serialized in a simple format, with length (not including + * the length), action code (identifying the message type) and message + * contents (without the subxact TransactionId value). + */ +static void +stream_write_change(char action, StringInfo s) +{ + int len; + + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + /* total on-disk size, including the action type character */ + len = (s->len - s->cursor) + sizeof(char); + + /* first write the size */ + BufFileWrite(stream_fd, &len, sizeof(len)); + + /* then the action */ + BufFileWrite(stream_fd, &action, sizeof(action)); + + /* and finally the remaining part of the buffer (after the XID) */ + len = (s->len - s->cursor); + + BufFileWrite(stream_fd, &s->data[s->cursor], len); +} + +/* + * Cleanup the memory for subxacts and reset the related variables. 
+ *
+ * Frees subxact_data.subxacts when it is set, then clears the pointer, the
+ * last-seen XID and both counters.
+ */
+static inline void
+cleanup_subxact_info(void)
+{
+	if (subxact_data.subxacts)
+		pfree(subxact_data.subxacts);
+
+	subxact_data.subxacts = NULL;
+	subxact_data.subxact_last = InvalidTransactionId;
+	subxact_data.nsubxacts = 0;
+	subxact_data.nsubxacts_max = 0;
+}
+
+/*
+ * Logical Replication Apply worker entry point
+ *
+ * 'main_arg' carries the worker slot number. The function attaches to that
+ * slot, installs signal handlers, connects to the database, and loads the
+ * subscription into ApplyContext. It exits quietly via proc_exit(0) when the
+ * subscription is gone or disabled.
+ *
+ * A table synchronization worker then gets its slot name and start position
+ * from LogicalRepSyncTableStart(). The main apply worker sets up replication
+ * origin "pg_<subscription oid>" and connects to the publisher itself.
+ * Finally streaming is started and LogicalRepApplyLoop() is run, followed by
+ * proc_exit(0).
+ */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext oldctx;
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos;
+	char	   *myslotname;
+	WalRcvStreamOptions options;
+
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
+
+	/* Setup signal handling */
+	pqsignal(SIGHUP, SignalHandlerForConfigReload);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * We don't currently need any ResourceOwner in a walreceiver process, but
+	 * if we did, we could call CreateAuxProcessResourceOwner here.
+	 */
+
+	/* Initialise stats to a sanish value */
+	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
+
+	/* Run as replica session replication role. */
+	SetConfigOption("session_replication_role", "replica",
+					PGC_SUSET, PGC_S_OVERRIDE);
+
+	/* Connect to our database. */
+	BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
+											  MyLogicalRepWorker->userid,
+											  0);
+
+	/*
+	 * Set always-secure search path, so malicious users can't redirect user
+	 * code (e.g. pg_index.indexprs).
+	 */
+	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
+
+	/* Load the subscription into persistent memory context. */
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+										 "ApplyContext",
+										 ALLOCSET_DEFAULT_SIZES);
+	StartTransactionCommand();
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	if (!MySubscription)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription %u will not "
+						"start because the subscription was removed during startup",
+						MyLogicalRepWorker->subid)));
+		proc_exit(0);
+	}
+
+	MySubscriptionValid = true;
+	MemoryContextSwitchTo(oldctx);
+
+	if (!MySubscription->enabled)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" will not "
+						"start because the subscription was disabled during startup",
+						MySubscription->name)));
+
+		proc_exit(0);
+	}
+
+	/* Setup synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit", MySubscription->synccommit,
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
+	if (am_tablesync_worker())
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+						MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+	else
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription \"%s\" has started",
+						MySubscription->name)));
+
+	CommitTransactionCommand();
+
+	/* Connect to the origin and start the replication. */
+	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
+		 MySubscription->conninfo);
+
+	if (am_tablesync_worker())
+	{
+		char	   *syncslotname;
+
+		/* This is table synchronization worker, call initial sync. */
+		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
+
+		/* allocate slot name in long-lived context */
+		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+
+		pfree(syncslotname);
+	}
+	else
+	{
+		/* This is main apply worker */
+		RepOriginId originid;
+		TimeLineID	startpointTLI;
+		char	   *err;
+
+		myslotname = MySubscription->slotname;
+
+		/*
+		 * This shouldn't happen if the subscription is enabled, but guard
+		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
+		 * crash if slot is NULL.)
+		 */
+		if (!myslotname)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("subscription has no replication slot set")));
+
+		/* Setup replication origin tracking. */
+		StartTransactionCommand();
+		snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid))
+			originid = replorigin_create(originname);
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+		origin_startpos = replorigin_session_get_progress(false);
+		CommitTransactionCommand();
+
+		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+												MySubscription->name, &err);
+		if (LogRepWorkerWalRcvConn == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not connect to the publisher: %s", err)));
+
+		/*
+		 * We don't really use the output identify_system for anything but it
+		 * does some initializations on the upstream so let's still call it.
+		 */
+		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+	}
+
+	/*
+	 * Setup callback for syscache so that we know when something changes in
+	 * the subscription relation state.
+	 */
+	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+								  invalidate_syncing_table_states,
+								  (Datum) 0);
+
+	/*
+	 * Build logical replication streaming options. The streaming protocol
+	 * version is requested only when the publisher reports version 140000 or
+	 * later.
+	 */
+	options.logical = true;
+	options.startpoint = origin_startpos;
+	options.slotname = myslotname;
+	options.proto.logical.proto_version =
+		walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
+		LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
+	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.binary = MySubscription->binary;
+	options.proto.logical.streaming = MySubscription->stream;
+
+	/* Start normal logical streaming replication. */
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+	/* Run the main loop. */
+	LogicalRepApplyLoop(origin_startpos);
+
+	proc_exit(0);
+}
+
+/*
+ * Is current process a logical replication worker?
+ *
+ * True exactly when MyLogicalRepWorker is set.
+ */
+bool
+IsLogicalWorker(void)
+{
+	return MyLogicalRepWorker != NULL;
+}