Diffstat (limited to 'src/backend/commands/vacuumparallel.c')
-rw-r--r-- | src/backend/commands/vacuumparallel.c | 1074 |
1 files changed, 1074 insertions, 0 deletions
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c new file mode 100644 index 0000000..f26d796 --- /dev/null +++ b/src/backend/commands/vacuumparallel.c @@ -0,0 +1,1074 @@ +/*------------------------------------------------------------------------- + * + * vacuumparallel.c + * Support routines for parallel vacuum execution. + * + * This file contains routines that are intended to support setting up, using, + * and tearing down a ParallelVacuumState. + * + * In a parallel vacuum, we perform both index bulk deletion and index cleanup + * with parallel worker processes. Individual indexes are processed by one + * vacuum process. ParallelVacuumState contains shared information as well as + * the memory space for storing dead items allocated in the DSM segment. We + * launch parallel worker processes at the start of parallel index + * bulk-deletion and index cleanup and, once all indexes are processed, the + * parallel worker processes exit. Each time we process indexes in parallel, + * the parallel context is re-initialized so that the same DSM can be used for + * multiple passes of index bulk-deletion and index cleanup. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/commands/vacuumparallel.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/amapi.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "optimizer/paths.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" + +/* + * DSM keys for parallel vacuum. Unlike other parallel execution code, since + * we don't need to worry about DSM keys conflicting with plan_node_id we can + * use small integers. + */ +#define PARALLEL_VACUUM_KEY_SHARED 1 +#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2 +#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 +#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 +#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 + +/* + * Shared information among parallel workers. It is therefore allocated in the + * DSM segment. + */ +typedef struct PVShared +{ + /* + * Target table relid and log level (for messages about parallel workers + * launched during VACUUM VERBOSE). These fields are not modified during + * the parallel vacuum. + */ + Oid relid; + int elevel; + + /* + * Fields for both index vacuum and cleanup. + * + * reltuples is the total number of input heap tuples. We set either the old + * live tuples in the index vacuum case or the new live tuples in the + * index cleanup case. + * + * estimated_count is true if reltuples is an estimated value. (Note that + * reltuples could be -1 in this case, indicating we have no idea.) + */ + double reltuples; + bool estimated_count; + + /* + * In a single-process vacuum we could consume more memory during index + * vacuuming or cleanup apart from the memory for heap scanning. In + * parallel vacuum, since individual vacuum workers can consume memory + * equal to maintenance_work_mem, the new maintenance_work_mem for each + * worker is set such that the parallel operation doesn't consume more + * memory than a single-process vacuum. + */ + int maintenance_work_mem_worker; + + /* + * Shared vacuum cost balance. 
During parallel vacuum, + * VacuumSharedCostBalance points to this value and it accumulates the + * balance of each parallel vacuum worker. + */ + pg_atomic_uint32 cost_balance; + + /* + * Number of active parallel workers. This is used for computing the + * minimum threshold of the vacuum cost balance before a worker sleeps for + * cost-based delay. + */ + pg_atomic_uint32 active_nworkers; + + /* Counter for vacuuming and cleanup */ + pg_atomic_uint32 idx; +} PVShared; + +/* Status used during parallel index vacuum or cleanup */ +typedef enum PVIndVacStatus +{ + PARALLEL_INDVAC_STATUS_INITIAL = 0, + PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, + PARALLEL_INDVAC_STATUS_NEED_CLEANUP, + PARALLEL_INDVAC_STATUS_COMPLETED +} PVIndVacStatus; + +/* + * Struct for index vacuum statistics of an index that is used for parallel vacuum. + * This includes the status of parallel index vacuum as well as index statistics. + */ +typedef struct PVIndStats +{ + /* + * The following two fields are set by the leader process before executing + * parallel index vacuum or parallel index cleanup. These fields are not + * fixed for the entire VACUUM operation. They are only fixed for an + * individual parallel index vacuum and cleanup. + * + * parallel_workers_can_process is true if both leader and worker can + * process the index, otherwise only the leader can process it. + */ + PVIndVacStatus status; + bool parallel_workers_can_process; + + /* + * Individual worker or leader stores the result of index vacuum or + * cleanup. + */ + bool istat_updated; /* are the stats updated? */ + IndexBulkDeleteResult istat; +} PVIndStats; + +/* + * Struct for maintaining a parallel vacuum state. typedef appears in vacuum.h. + */ +struct ParallelVacuumState +{ + /* NULL for worker processes */ + ParallelContext *pcxt; + + /* Target indexes */ + Relation *indrels; + int nindexes; + + /* Shared information among parallel vacuum workers */ + PVShared *shared; + + /* + * Shared index statistics among parallel vacuum workers. The array + * element is allocated for every index, even those indexes where parallel + * index vacuuming is unsafe or not worthwhile (e.g., + * will_parallel_vacuum[] is false). During parallel vacuum, + * IndexBulkDeleteResult of each index is kept in DSM and is copied into + * local memory at the end of parallel vacuum. + */ + PVIndStats *indstats; + + /* Shared dead items space among parallel vacuum workers */ + VacDeadItems *dead_items; + + /* Points to buffer usage area in DSM */ + BufferUsage *buffer_usage; + + /* Points to WAL usage area in DSM */ + WalUsage *wal_usage; + + /* + * False if the index is a totally unsuitable target for all parallel + * processing. For example, the index could be below the + * min_parallel_index_scan_size cutoff. + */ + bool *will_parallel_vacuum; + + /* + * The number of indexes that support parallel index bulk-deletion and + * parallel index cleanup respectively. + */ + int nindexes_parallel_bulkdel; + int nindexes_parallel_cleanup; + int nindexes_parallel_condcleanup; + + /* Buffer access strategy used by leader process */ + BufferAccessStrategy bstrategy; + + /* + * Error reporting state. The error callback is set only for worker + * processes during parallel index vacuum. 
+ */ + char *relnamespace; + char *relname; + char *indname; + PVIndVacStatus status; +}; + +static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, + bool *will_parallel_vacuum); +static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, + bool vacuum); +static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs); +static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs); +static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, + PVIndStats *indstats); +static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, + bool vacuum); +static void parallel_vacuum_error_callback(void *arg); + +/* + * Try to enter parallel mode and create a parallel context. Then initialize + * shared memory state. + * + * On success, return parallel vacuum state. Otherwise return NULL. + */ +ParallelVacuumState * +parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, + int nrequested_workers, int max_items, + int elevel, BufferAccessStrategy bstrategy) +{ + ParallelVacuumState *pvs; + ParallelContext *pcxt; + PVShared *shared; + VacDeadItems *dead_items; + PVIndStats *indstats; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + bool *will_parallel_vacuum; + Size est_indstats_len; + Size est_shared_len; + Size est_dead_items_len; + int nindexes_mwm = 0; + int parallel_workers = 0; + int querylen; + + /* + * A parallel vacuum must be requested and there must be indexes on the + * relation + */ + Assert(nrequested_workers >= 0); + Assert(nindexes > 0); + + /* + * Compute the number of parallel vacuum workers to launch + */ + will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); + parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes, + nrequested_workers, + will_parallel_vacuum); + if (parallel_workers <= 0) + { + /* Can't perform vacuum in parallel -- return NULL */ + pfree(will_parallel_vacuum); + return NULL; + } + + pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState)); + pvs->indrels = indrels; + pvs->nindexes = nindexes; + pvs->will_parallel_vacuum = will_parallel_vacuum; + pvs->bstrategy = bstrategy; + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", + parallel_workers); + Assert(pcxt->nworkers > 0); + pvs->pcxt = pcxt; + + /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ + est_indstats_len = mul_size(sizeof(PVIndStats), nindexes); + shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ + est_shared_len = sizeof(PVShared); + shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */ + est_dead_items_len = vac_max_items_to_alloc_size(max_items); + shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate space for BufferUsage and WalUsage -- + * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgBufferUsage or + * pgWalUsage, so do it unconditionally. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + InitializeParallelDSM(pcxt); + + /* Prepare index vacuum stats */ + indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len); + MemSet(indstats, 0, est_indstats_len); + for (int i = 0; i < nindexes; i++) + { + Relation indrel = indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Cleanup option should be either disabled, always performing in + * parallel or conditionally performing in parallel. + */ + Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); + Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); + + if (!will_parallel_vacuum[i]) + continue; + + if (indrel->rd_indam->amusemaintenanceworkmem) + nindexes_mwm++; + + /* + * Remember the number of indexes that support parallel operation for + * each phase. + */ + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + pvs->nindexes_parallel_bulkdel++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) + pvs->nindexes_parallel_cleanup++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) + pvs->nindexes_parallel_condcleanup++; + } + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats); + pvs->indstats = indstats; + + /* Prepare shared information */ + shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len); + MemSet(shared, 0, est_shared_len); + shared->relid = RelationGetRelid(rel); + shared->elevel = elevel; + shared->maintenance_work_mem_worker = + (nindexes_mwm > 0) ? 
+ maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : + maintenance_work_mem; + + pg_atomic_init_u32(&(shared->cost_balance), 0); + pg_atomic_init_u32(&(shared->active_nworkers), 0); + pg_atomic_init_u32(&(shared->idx), 0); + + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); + pvs->shared = shared; + + /* Prepare the dead_items space */ + dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc, + est_dead_items_len); + dead_items->max_items = max_items; + dead_items->num_items = 0; + MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items); + pvs->dead_items = dead_items; + + /* + * Allocate space for each worker's BufferUsage and WalUsage; no need to + * initialize + */ + buffer_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); + pvs->buffer_usage = buffer_usage; + wal_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); + pvs->wal_usage = wal_usage; + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + sharedquery[querylen] = '\0'; + shm_toc_insert(pcxt->toc, + PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); + } + + /* Success -- return parallel vacuum state */ + return pvs; +} + +/* + * Destroy the parallel context, and end parallel mode. + * + * Since writes are not allowed during parallel mode, copy the + * updated index statistics from DSM into local memory and then later use that + * to update the index statistics. One might think that we can exit from + * parallel mode, update the index statistics and then destroy parallel + * context, but that won't be safe (see ExitParallelMode). + */ +void +parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) +{ + Assert(!IsParallelWorker()); + + /* Copy the updated statistics */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + if (indstats->istat_updated) + { + istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult)); + } + else + istats[i] = NULL; + } + + DestroyParallelContext(pvs->pcxt); + ExitParallelMode(); + + pfree(pvs->will_parallel_vacuum); + pfree(pvs); +} + +/* Returns the dead items space */ +VacDeadItems * +parallel_vacuum_get_dead_items(ParallelVacuumState *pvs) +{ + return pvs->dead_items; +} + +/* + * Do parallel index bulk-deletion with parallel workers. + */ +void +parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, + int num_index_scans) +{ + Assert(!IsParallelWorker()); + + /* + * We can only provide an approximate value of num_heap_tuples, at least + * for now. + */ + pvs->shared->reltuples = num_table_tuples; + pvs->shared->estimated_count = true; + + parallel_vacuum_process_all_indexes(pvs, num_index_scans, true); +} + +/* + * Do parallel index cleanup with parallel workers. 
+ */ +void +parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, + int num_index_scans, bool estimated_count) +{ + Assert(!IsParallelWorker()); + + /* + * We can provide a better estimate of total number of surviving tuples + * (we assume indexes are more interested in that than in the number of + * nominally live tuples). + */ + pvs->shared->reltuples = num_table_tuples; + pvs->shared->estimated_count = estimated_count; + + parallel_vacuum_process_all_indexes(pvs, num_index_scans, false); +} + +/* + * Compute the number of parallel worker processes to request. Both index + * vacuum and index cleanup can be executed with parallel workers. + * The index is eligible for parallel vacuum iff its size is greater than + * min_parallel_index_scan_size as invoking workers for very small indexes + * can hurt performance. + * + * nrequested is the number of parallel workers that user requested. If + * nrequested is 0, we compute the parallel degree based on nindexes, that is + * the number of indexes that support parallel vacuum. This function also + * sets will_parallel_vacuum to remember indexes that participate in parallel + * vacuum. + */ +static int +parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, + bool *will_parallel_vacuum) +{ + int nindexes_parallel = 0; + int nindexes_parallel_bulkdel = 0; + int nindexes_parallel_cleanup = 0; + int parallel_workers; + + /* + * We don't allow performing parallel operation in standalone backend or + * when parallelism is disabled. + */ + if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + return 0; + + /* + * Compute the number of indexes that can participate in parallel vacuum. + */ + for (int i = 0; i < nindexes; i++) + { + Relation indrel = indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* Skip index that is not a suitable target for parallel index vacuum */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + continue; + + will_parallel_vacuum[i] = true; + + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + nindexes_parallel_bulkdel++; + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + nindexes_parallel_cleanup++; + } + + nindexes_parallel = Max(nindexes_parallel_bulkdel, + nindexes_parallel_cleanup); + + /* The leader process takes one index */ + nindexes_parallel--; + + /* No index supports parallel vacuum */ + if (nindexes_parallel <= 0) + return 0; + + /* Compute the parallel degree */ + parallel_workers = (nrequested > 0) ? + Min(nrequested, nindexes_parallel) : nindexes_parallel; + + /* Cap by max_parallel_maintenance_workers */ + parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + + return parallel_workers; +} + +/* + * Perform index vacuum or index cleanup with parallel workers. This function + * must be used by the parallel vacuum leader process. 
+ */ +static void +parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, + bool vacuum) +{ + int nworkers; + PVIndVacStatus new_status; + + Assert(!IsParallelWorker()); + + if (vacuum) + { + new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE; + + /* Determine the number of parallel workers to launch */ + nworkers = pvs->nindexes_parallel_bulkdel; + } + else + { + new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP; + + /* Determine the number of parallel workers to launch */ + nworkers = pvs->nindexes_parallel_cleanup; + + /* Add conditionally parallel-aware indexes if in the first time call */ + if (num_index_scans == 0) + nworkers += pvs->nindexes_parallel_condcleanup; + } + + /* The leader process will participate */ + nworkers--; + + /* + * It is possible that parallel context is initialized with fewer workers + * than the number of indexes that need a separate worker in the current + * phase, so we need to consider it. See + * parallel_vacuum_compute_workers(). + */ + nworkers = Min(nworkers, pvs->pcxt->nworkers); + + /* + * Set index vacuum status and mark whether parallel vacuum worker can + * process it. + */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL); + indstats->status = new_status; + indstats->parallel_workers_can_process = + (pvs->will_parallel_vacuum[i] && + parallel_vacuum_index_is_parallel_safe(pvs->indrels[i], + num_index_scans, + vacuum)); + } + + /* Reset the parallel index processing counter */ + pg_atomic_write_u32(&(pvs->shared->idx), 0); + + /* Setup the shared cost-based vacuum delay and launch workers */ + if (nworkers > 0) + { + /* Reinitialize parallel context to relaunch parallel workers */ + if (num_index_scans > 0) + ReinitializeParallelDSM(pvs->pcxt); + + /* + * Set up shared cost balance and the number of active workers for + * vacuum delay. We need to do this before launching workers as + * otherwise, they might not see the updated values for these + * parameters. + */ + pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); + pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); + + /* + * The number of workers can vary between bulkdelete and cleanup + * phase. + */ + ReinitializeParallelWorkers(pvs->pcxt, nworkers); + + LaunchParallelWorkers(pvs->pcxt); + + if (pvs->pcxt->nworkers_launched > 0) + { + /* + * Reset the local cost values for leader backend as we have + * already accumulated the remaining balance of heap. + */ + VacuumCostBalance = 0; + VacuumCostBalanceLocal = 0; + + /* Enable shared cost balance for leader backend */ + VacuumSharedCostBalance = &(pvs->shared->cost_balance); + VacuumActiveNWorkers = &(pvs->shared->active_nworkers); + } + + if (vacuum) + ereport(pvs->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", + "launched %d parallel vacuum workers for index vacuuming (planned: %d)", + pvs->pcxt->nworkers_launched), + pvs->pcxt->nworkers_launched, nworkers))); + else + ereport(pvs->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", + "launched %d parallel vacuum workers for index cleanup (planned: %d)", + pvs->pcxt->nworkers_launched), + pvs->pcxt->nworkers_launched, nworkers))); + } + + /* Vacuum the indexes that can be processed by only leader process */ + parallel_vacuum_process_unsafe_indexes(pvs); + + /* + * Join as a parallel worker. 
The leader process alone processes all + * parallel-safe indexes in the case where no workers are launched. + */ + parallel_vacuum_process_safe_indexes(pvs); + + /* + * Next, accumulate buffer and WAL usage. (This must wait for the workers + * to finish, or we might get incomplete data.) + */ + if (nworkers > 0) + { + /* Wait for all vacuum workers to finish */ + WaitForParallelWorkersToFinish(pvs->pcxt); + + for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + } + + /* + * Reset all index status back to initial (while checking that we have + * vacuumed all indexes). + */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED) + elog(ERROR, "parallel index vacuum on index \"%s\" is not completed", + RelationGetRelationName(pvs->indrels[i])); + + indstats->status = PARALLEL_INDVAC_STATUS_INITIAL; + } + + /* + * Carry the shared balance value to heap scan and disable shared costing + */ + if (VacuumSharedCostBalance) + { + VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; + } +} + +/* + * Index vacuum/cleanup routine used by the leader process and parallel + * vacuum worker processes to vacuum the indexes in parallel. + */ +static void +parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs) +{ + /* + * Increment the active worker count if we are able to launch any worker. + */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + /* Loop until all indexes are vacuumed */ + for (;;) + { + int idx; + PVIndStats *indstats; + + /* Get an index number to process */ + idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1); + + /* Done for all indexes? */ + if (idx >= pvs->nindexes) + break; + + indstats = &(pvs->indstats[idx]); + + /* + * Skip vacuuming indexes that are unsafe for workers or are + * unsuitable targets for parallel index vacuum (these are vacuumed in + * parallel_vacuum_process_unsafe_indexes() by the leader). + */ + if (!indstats->parallel_workers_can_process) + continue; + + /* Do vacuum or cleanup of the index */ + parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Perform vacuuming of indexes in the leader process. + * + * Handles index vacuuming (or index cleanup) for indexes that are not + * parallel safe. It's possible that this will vary for a given index, based + * on details like whether we're performing index cleanup right now. + * + * Also performs vacuuming of smaller indexes that fell under the size cutoff + * enforced by parallel_vacuum_compute_workers(). + */ +static void +parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + + /* + * Increment the active worker count if we are able to launch any worker. 
+ */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + /* Skip indexes that are safe for workers */ + if (indstats->parallel_workers_can_process) + continue; + + /* Do vacuum or cleanup of the index */ + parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Vacuum or cleanup an index either by the leader process or by one of the + * worker processes. After vacuuming the index this function copies the index + * statistics returned from ambulkdelete and amvacuumcleanup to the DSM + * segment. + */ +static void +parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, + PVIndStats *indstats) +{ + IndexBulkDeleteResult *istat = NULL; + IndexBulkDeleteResult *istat_res; + IndexVacuumInfo ivinfo; + + /* + * Update the pointer to the corresponding bulk-deletion result if someone + * has already updated it + */ + if (indstats->istat_updated) + istat = &(indstats->istat); + + ivinfo.index = indrel; + ivinfo.analyze_only = false; + ivinfo.report_progress = false; + ivinfo.message_level = DEBUG2; + ivinfo.estimated_count = pvs->shared->estimated_count; + ivinfo.num_heap_tuples = pvs->shared->reltuples; + ivinfo.strategy = pvs->bstrategy; + + /* Update error traceback information */ + pvs->indname = pstrdup(RelationGetRelationName(indrel)); + pvs->status = indstats->status; + + switch (indstats->status) + { + case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: + istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items); + break; + case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: + istat_res = vac_cleanup_one_index(&ivinfo, istat); + break; + default: + elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", + indstats->status, + RelationGetRelationName(indrel)); + } + + /* + * Copy the index bulk-deletion result returned from ambulkdelete and + * amvacuumcleanup to the DSM segment if it's the first cycle because they + * allocate locally and it's possible that an index will be vacuumed by a + * different vacuum process the next cycle. Copying the result normally + * happens only the first time an index is vacuumed. For any additional + * vacuum pass, we directly point to the result on the DSM segment and + * pass it to vacuum index APIs so that workers can update it directly. + * + * Since all vacuum workers write the bulk-deletion result at different + * slots we can write them without locking. + */ + if (!indstats->istat_updated && istat_res != NULL) + { + memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult)); + indstats->istat_updated = true; + + /* Free the locally-allocated bulk-deletion result */ + pfree(istat_res); + } + + /* + * Update the status to completed. No need to lock here since each worker + * touches different indexes. + */ + indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED; + + /* Reset error traceback information */ + pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED; + pfree(pvs->indname); + pvs->indname = NULL; +} + +/* + * Returns false if the given index can't participate in the next execution of + * parallel index vacuum or parallel index cleanup. 
+ */ +static bool +parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, + bool vacuum) +{ + uint8 vacoptions; + + vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* In the parallel vacuum case, check whether it supports parallel bulk-deletion */ + if (vacuum) + return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); + + /* Not safe if the index does not support parallel cleanup */ + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) + return false; + + /* + * Not safe if the index supports parallel cleanup conditionally, but we + * have already processed the index (for bulkdelete). We do this to avoid + * the need to invoke workers when parallel index cleanup doesn't need to + * scan the index. See the comments for option + * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support + * parallel cleanup conditionally. + */ + if (num_index_scans > 0 && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + return false; + + return true; +} + +/* + * Perform work within a launched parallel process. + * + * Since parallel vacuum workers perform only index vacuum or index cleanup, + * we don't need to report progress information. + */ +void +parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelVacuumState pvs; + Relation rel; + Relation *indrels; + PVIndStats *indstats; + PVShared *shared; + VacDeadItems *dead_items; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + int nindexes; + char *sharedquery; + ErrorContextCallback errcallback; + + /* + * A parallel vacuum worker must have only the PROC_IN_VACUUM flag since we + * don't support parallel vacuum for autovacuum as of now. + */ + Assert(MyProc->statusFlags == PROC_IN_VACUUM); + + elog(DEBUG1, "starting parallel vacuum worker"); + + shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); + + /* Set debug_query_string for individual workers */ + sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* + * Open the table. The lock mode is the same as the leader process's. It's + * okay because the lock mode does not conflict among the parallel + * workers. + */ + rel = table_open(shared->relid, ShareUpdateExclusiveLock); + + /* + * Open all indexes. indrels are sorted in order by OID, which should + * match the leader's ordering. 
+ */ + vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); + Assert(nindexes > 0); + + if (shared->maintenance_work_mem_worker > 0) + maintenance_work_mem = shared->maintenance_work_mem_worker; + + /* Set index statistics */ + indstats = (PVIndStats *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_INDEX_STATS, + false); + + /* Set dead_items space */ + dead_items = (VacDeadItems *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_DEAD_ITEMS, + false); + + /* Set cost-based vacuum delay */ + VacuumCostActive = (VacuumCostDelay > 0); + VacuumCostBalance = 0; + VacuumPageHit = 0; + VacuumPageMiss = 0; + VacuumPageDirty = 0; + VacuumCostBalanceLocal = 0; + VacuumSharedCostBalance = &(shared->cost_balance); + VacuumActiveNWorkers = &(shared->active_nworkers); + + /* Set parallel vacuum state */ + pvs.indrels = indrels; + pvs.nindexes = nindexes; + pvs.indstats = indstats; + pvs.shared = shared; + pvs.dead_items = dead_items; + pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel)); + pvs.relname = pstrdup(RelationGetRelationName(rel)); + + /* These fields will be filled during index vacuum or cleanup */ + pvs.indname = NULL; + pvs.status = PARALLEL_INDVAC_STATUS_INITIAL; + + /* Each parallel VACUUM worker gets its own access strategy */ + pvs.bstrategy = GetAccessStrategy(BAS_VACUUM); + + /* Setup error traceback support for ereport() */ + errcallback.callback = parallel_vacuum_error_callback; + errcallback.arg = &pvs; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* Process indexes to perform vacuum/cleanup */ + parallel_vacuum_process_safe_indexes(&pvs); + + /* Report buffer/WAL usage during parallel execution */ + buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); + wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + vac_close_indexes(nindexes, indrels, RowExclusiveLock); + table_close(rel, ShareUpdateExclusiveLock); + FreeAccessStrategy(pvs.bstrategy); +} + +/* + * Error context callback for errors occurring during parallel index vacuum. + * The error context messages should match the messages set in the lazy vacuum + * error context. If you change this function, change vacuum_error_callback() + * as well. + */ +static void +parallel_vacuum_error_callback(void *arg) +{ + ParallelVacuumState *errinfo = arg; + + switch (errinfo->status) + { + case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: + errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: + errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case PARALLEL_INDVAC_STATUS_INITIAL: + case PARALLEL_INDVAC_STATUS_COMPLETED: + default: + return; + } +} |
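The functions added by this patch form a small lifecycle API: parallel_vacuum_init() sets up the DSM segment and returns a ParallelVacuumState (or NULL when a parallel vacuum isn't possible), parallel_vacuum_bulkdel_all_indexes() and parallel_vacuum_cleanup_all_indexes() each launch workers for one pass over the indexes, and parallel_vacuum_end() copies the per-index statistics back out of DSM and tears everything down. The sketch below shows how a caller might drive that lifecycle. It is illustrative only: it assumes the usual backend headers, the function name and its parameters are hypothetical placeholders for the caller's own state, and the real caller is the heap vacuum code rather than anything in this file.

/*
 * Illustrative sketch (not part of this patch): drive the ParallelVacuumState
 * lifecycle from a caller.  All names of this function and its parameters are
 * hypothetical placeholders.
 */
static void
example_parallel_vacuum_driver(Relation rel, Relation *indrels, int nindexes,
                               int nrequested_workers, int max_items,
                               BufferAccessStrategy strategy,
                               IndexBulkDeleteResult **istats)
{
    ParallelVacuumState *pvs;
    VacDeadItems *dead_items;

    /* Try to set up a parallel vacuum; NULL means fall back to serial vacuum. */
    pvs = parallel_vacuum_init(rel, indrels, nindexes, nrequested_workers,
                               max_items, INFO, strategy);
    if (pvs == NULL)
        return;

    /* TIDs collected by the heap scan go directly into the shared DSM space. */
    dead_items = parallel_vacuum_get_dead_items(pvs);

    /* ... heap scan fills dead_items->items and dead_items->num_items ... */

    /* One bulk-deletion pass: remove the collected TIDs from all indexes. */
    parallel_vacuum_bulkdel_all_indexes(pvs, /* num_table_tuples */ 0,
                                        /* num_index_scans */ 0);

    /* After the last heap pass, clean up all indexes in parallel. */
    parallel_vacuum_cleanup_all_indexes(pvs, /* num_table_tuples */ 0,
                                        /* num_index_scans */ 1,
                                        /* estimated_count */ true);

    /* Copy per-index statistics out of DSM and tear down the parallel context. */
    parallel_vacuum_end(pvs, istats);
}

Because the parallel context is re-initialized between passes, the same DSM segment can serve multiple rounds of index bulk-deletion before the final cleanup pass.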