1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
|
# Basic logical replication test
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 27;
# Initialize publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql(
'postgres',
"CREATE FUNCTION public.pg_get_replica_identity_index(int)
RETURNS regclass LANGUAGE sql AS 'SELECT 1/0'"); # shall not call
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rep (a int primary key)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
);
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_full_pk (a int primary key, b text)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_full_pk REPLICA IDENTITY FULL");
# Let this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes.
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_rep (a int primary key)");
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_full_pk (a int primary key, b text)");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_full_pk REPLICA IDENTITY FULL");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)");
# different column count and order than on publisher
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
);
# replication of the table with included index
$node_subscriber->safe_psql('postgres',
"CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
);
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)");
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk"
);
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
);
$node_publisher->wait_for_catchup('tap_sub');
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
is($result, qq(0), 'check non-replicated table is empty on subscriber');
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
is($result, qq(1002), 'check initial data was copied to subscriber');
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rep SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_nothing VALUES (generate_series(1,20))");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_include SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres',
"DELETE FROM tab_include WHERE a > 20");
$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_rep");
is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
is( $result, qq(local|1.1|foo|1
local|2.2|bar|2), 'check replicated changes with different column order');
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_nothing");
is($result, qq(20), 'check replicated changes with REPLICA IDENTITY NOTHING');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_include");
is($result, qq(20|-20|-1),
'check replicated changes with primary key index with included columns');
# insert some duplicate rows
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full SELECT generate_series(1,10)");
# add REPLICA IDENTITY FULL so we can update
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_full REPLICA IDENTITY FULL");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_full REPLICA IDENTITY FULL");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
$node_publisher->safe_psql('postgres',
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
# tab_mixed can use DEFAULT, since it has a primary key
# and do the updates
$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
$node_publisher->safe_psql('postgres',
"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
$node_publisher->safe_psql('postgres',
"UPDATE tab_full_pk SET b = 'bar' WHERE a = 1");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_full");
is($result, qq(20|1|100),
'update works with REPLICA IDENTITY FULL and duplicate tuples');
$result = $node_subscriber->safe_psql('postgres',
"SELECT x FROM tab_full2 ORDER BY 1");
is( $result, qq(a
bb
bb),
'update works with REPLICA IDENTITY FULL and text datums');
$result = $node_subscriber->safe_psql('postgres',
"SELECT * FROM tab_mixed ORDER BY a");
is( $result, qq(local|1.1|baz|1
local|2.2|bar|2),
'update works with different column order and subscriber local values');
$result = $node_subscriber->safe_psql('postgres',
"SELECT * FROM tab_full_pk ORDER BY a");
is( $result, qq(1|bar
2|baz),
'update works with REPLICA IDENTITY FULL and a primary key');
# Check that subscriber handles cases where update/delete target tuple
# is missing. We have to look for the DEBUG1 log messages about that,
# so temporarily bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk");
# Note that the current location of the log file is not grabbed immediately
# after reloading the configuration, but after sending one SQL command to
# the node so as we are sure that the reloading has taken effect.
my $log_location = -s $node_subscriber->logfile;
$node_publisher->safe_psql('postgres',
"UPDATE tab_full_pk SET b = 'quux' WHERE a = 1");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2");
$node_publisher->wait_for_catchup('tap_sub');
my $logfile = slurp_file($node_subscriber->logfile, $log_location);
ok( $logfile =~
qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/,
'update target row is missing');
ok( $logfile =~
qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/,
'delete target row is missing');
$node_subscriber->append_conf('postgresql.conf',
"log_min_messages = warning");
$node_subscriber->reload;
# check behavior with toasted values
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
is( $result, qq(1|3|1.1|local
2|500000|2.2|local),
'update transmits large column value');
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET c = 3.3 WHERE a = 2");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
is( $result, qq(1|3|1.1|local
2|500000|3.3|local),
'update with non-transmitted large column value');
# check behavior with dropped columns
# this update should get transmitted before the column goes away
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2");
$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b");
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET c = 11.11 WHERE a = 1");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT * FROM tab_mixed ORDER BY a");
is( $result, qq(local|11.11|baz|1
local|2.2|bar|2),
'update works with dropped publisher column');
$node_subscriber->safe_psql('postgres',
"ALTER TABLE tab_mixed DROP COLUMN d");
$node_publisher->safe_psql('postgres',
"UPDATE tab_mixed SET c = 22.22 WHERE a = 2");
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT * FROM tab_mixed ORDER BY a");
is( $result, qq(11.11|baz|1
22.22|bar|2),
'update works with dropped subscriber column');
# check that change of connection string and/or publication list causes
# restart of subscription workers. Not all of these are registered as tests
# as we need to poll for a change but the test suite will fail none the less
# when something goes wrong.
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr sslmode=disable'"
);
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub';"
) or die "Timed out while waiting for apply to restart";
$oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)"
);
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub';"
) or die "Timed out while waiting for apply to restart";
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
# Restart the publisher and check the state of the subscriber which
# should be in a streaming state after catching up.
$node_publisher->stop('fast');
$node_publisher->start;
$node_publisher->wait_for_catchup('tap_sub');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1152|1|1100),
'check replicated inserts after subscription publication change');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_rep");
is($result, qq(20|-20|-1),
'check changes skipped after subscription publication change');
# check alter publication (relcache invalidation etc)
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')");
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0");
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)"
);
$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
$node_publisher->wait_for_catchup('tap_sub');
# note that data are different on provider and subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002),
'check replicated deletes after alter publication');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_full");
is($result, qq(21|0|100), 'check replicated insert after alter publication');
# check restart on rename
$oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed';"
) or die "Timed out while waiting for apply to restart";
# check all the cleanup
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription");
is($result, qq(0), 'check subscription was dropped on subscriber');
$result = $node_publisher->safe_psql('postgres',
"SELECT count(*) FROM pg_replication_slots");
is($result, qq(0), 'check replication slot was dropped on publisher');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription_rel");
is($result, qq(0),
'check subscription relation status was dropped on subscriber');
$result = $node_publisher->safe_psql('postgres',
"SELECT count(*) FROM pg_replication_slots");
is($result, qq(0), 'check replication slot was dropped on publisher');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_replication_origin");
is($result, qq(0), 'check replication origin was dropped on subscriber');
$node_subscriber->stop('fast');
$node_publisher->stop('fast');
# CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
$node_publisher->append_conf(
'postgresql.conf', qq(
wal_level=minimal
max_wal_senders=0
));
$node_publisher->start;
($result, my $retout, my $reterr) = $node_publisher->psql(
'postgres', qq{
BEGIN;
CREATE TABLE skip_wal();
CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;
ROLLBACK;
});
ok( $reterr =~
m/WARNING: wal_level is insufficient to publish logical changes/,
'CREATE PUBLICATION while wal_level=minimal');
|