| field | value | detail |
|---|---|---|
| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
| commit | 311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 | |
| tree | 0ec307299b1dada3701e42f4ca6eda57d708261e | src/backend/storage/buffer/localbuf.c |
| parent | Initial commit. | |
Adding upstream version 15.4.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/storage/buffer/localbuf.c')
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 596

1 file changed, 596 insertions, 0 deletions
```diff
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
new file mode 100644
index 0000000..e71f95a
--- /dev/null
+++ b/src/backend/storage/buffer/localbuf.c
@@ -0,0 +1,596 @@
+/*-------------------------------------------------------------------------
+ *
+ * localbuf.c
+ *    local buffer manager. Fast buffer manager for temporary tables,
+ *    which never need to be WAL-logged or checkpointed, etc.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/buffer/localbuf.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/parallel.h"
+#include "catalog/catalog.h"
+#include "executor/instrument.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner_private.h"
+
+
+/*#define LBDEBUG*/
+
+/* entry for buffer lookup hashtable */
+typedef struct
+{
+    BufferTag   key;            /* Tag of a disk page */
+    int         id;             /* Associated local buffer's index */
+} LocalBufferLookupEnt;
+
+/* Note: this macro only works on local buffers, not shared ones! */
+#define LocalBufHdrGetBlock(bufHdr) \
+    LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]
+
+int         NLocBuffer = 0;     /* until buffers are initialized */
+
+BufferDesc *LocalBufferDescriptors = NULL;
+Block      *LocalBufferBlockPointers = NULL;
+int32      *LocalRefCount = NULL;
+
+static int  nextFreeLocalBuf = 0;
+
+static HTAB *LocalBufHash = NULL;
+
+
+static void InitLocalBuffers(void);
+static Block GetLocalBufferStorage(void);
+
+
+/*
+ * PrefetchLocalBuffer -
+ *    initiate asynchronous read of a block of a relation
+ *
+ * Do PrefetchBuffer's work for temporary relations.
+ * No-op if prefetching isn't compiled in.
+ */
+PrefetchBufferResult
+PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
+                    BlockNumber blockNum)
+{
+    PrefetchBufferResult result = {InvalidBuffer, false};
+    BufferTag   newTag;         /* identity of requested block */
+    LocalBufferLookupEnt *hresult;
+
+    INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
+
+    /* Initialize local buffers if first request in this session */
+    if (LocalBufHash == NULL)
+        InitLocalBuffers();
+
+    /* See if the desired buffer already exists */
+    hresult = (LocalBufferLookupEnt *)
+        hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
+
+    if (hresult)
+    {
+        /* Yes, so nothing to do */
+        result.recent_buffer = -hresult->id - 1;
+    }
+    else
+    {
+#ifdef USE_PREFETCH
+        /* Not in buffers, so initiate prefetch */
+        smgrprefetch(smgr, forkNum, blockNum);
+        result.initiated_io = true;
+#endif                          /* USE_PREFETCH */
+    }
+
+    return result;
+}
+
+
+/*
+ * LocalBufferAlloc -
+ *    Find or create a local buffer for the given page of the given relation.
+ *
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need
+ * to do any locking since this is all local.  Also, IO_IN_PROGRESS
+ * does not get set.  Lastly, we support only default access strategy
+ * (hence, usage_count is always advanced).
+ */
+BufferDesc *
+LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
+                 bool *foundPtr)
+{
+    BufferTag   newTag;         /* identity of requested block */
+    LocalBufferLookupEnt *hresult;
+    BufferDesc *bufHdr;
+    int         b;
+    int         trycounter;
+    bool        found;
+    uint32      buf_state;
+
+    INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
+
+    /* Initialize local buffers if first request in this session */
+    if (LocalBufHash == NULL)
+        InitLocalBuffers();
+
+    /* See if the desired buffer already exists */
+    hresult = (LocalBufferLookupEnt *)
+        hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
+
+    if (hresult)
+    {
+        b = hresult->id;
+        bufHdr = GetLocalBufferDescriptor(b);
+        Assert(BUFFERTAGS_EQUAL(bufHdr->tag, newTag));
+#ifdef LBDEBUG
+        fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
+                smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
+#endif
+        buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+        /* this part is equivalent to PinBuffer for a shared buffer */
+        if (LocalRefCount[b] == 0)
+        {
+            if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
+            {
+                buf_state += BUF_USAGECOUNT_ONE;
+                pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+            }
+        }
+        LocalRefCount[b]++;
+        ResourceOwnerRememberBuffer(CurrentResourceOwner,
+                                    BufferDescriptorGetBuffer(bufHdr));
+        if (buf_state & BM_VALID)
+            *foundPtr = true;
+        else
+        {
+            /* Previous read attempt must have failed; try again */
+            *foundPtr = false;
+        }
+        return bufHdr;
+    }
+
+#ifdef LBDEBUG
+    fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
+            smgr->smgr_rnode.node.relNode, forkNum, blockNum,
+            -nextFreeLocalBuf - 1);
+#endif
+
+    /*
+     * Need to get a new buffer.  We use a clock sweep algorithm (essentially
+     * the same as what freelist.c does now...)
+     */
+    trycounter = NLocBuffer;
+    for (;;)
+    {
+        b = nextFreeLocalBuf;
+
+        if (++nextFreeLocalBuf >= NLocBuffer)
+            nextFreeLocalBuf = 0;
+
+        bufHdr = GetLocalBufferDescriptor(b);
+
+        if (LocalRefCount[b] == 0)
+        {
+            buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+            if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
+            {
+                buf_state -= BUF_USAGECOUNT_ONE;
+                pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+                trycounter = NLocBuffer;
+            }
+            else
+            {
+                /* Found a usable buffer */
+                LocalRefCount[b]++;
+                ResourceOwnerRememberBuffer(CurrentResourceOwner,
+                                            BufferDescriptorGetBuffer(bufHdr));
+                break;
+            }
+        }
+        else if (--trycounter == 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                     errmsg("no empty local buffer available")));
+    }
+
+    /*
+     * this buffer is not referenced but it might still be dirty. if that's
+     * the case, write it out before reusing it!
+     */
+    if (buf_state & BM_DIRTY)
+    {
+        SMgrRelation oreln;
+        Page        localpage = (char *) LocalBufHdrGetBlock(bufHdr);
+
+        /* Find smgr relation for buffer */
+        oreln = smgropen(bufHdr->tag.rnode, MyBackendId);
+
+        PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
+
+        /* And write... */
+        smgrwrite(oreln,
+                  bufHdr->tag.forkNum,
+                  bufHdr->tag.blockNum,
+                  localpage,
+                  false);
+
+        /* Mark not-dirty now in case we error out below */
+        buf_state &= ~BM_DIRTY;
+        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+        pgBufferUsage.local_blks_written++;
+    }
+
+    /*
+     * lazy memory allocation: allocate space on first use of a buffer.
+     */
+    if (LocalBufHdrGetBlock(bufHdr) == NULL)
+    {
+        /* Set pointer for use by BufferGetBlock() macro */
+        LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
+    }
+
+    /*
+     * Update the hash table: remove old entry, if any, and make new one.
+     */
+    if (buf_state & BM_TAG_VALID)
+    {
+        hresult = (LocalBufferLookupEnt *)
+            hash_search(LocalBufHash, (void *) &bufHdr->tag,
+                        HASH_REMOVE, NULL);
+        if (!hresult)           /* shouldn't happen */
+            elog(ERROR, "local buffer hash table corrupted");
+        /* mark buffer invalid just in case hash insert fails */
+        CLEAR_BUFFERTAG(bufHdr->tag);
+        buf_state &= ~(BM_VALID | BM_TAG_VALID);
+        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+    }
+
+    hresult = (LocalBufferLookupEnt *)
+        hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
+    if (found)                  /* shouldn't happen */
+        elog(ERROR, "local buffer hash table corrupted");
+    hresult->id = b;
+
+    /*
+     * it's all ours now.
+     */
+    bufHdr->tag = newTag;
+    buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
+    buf_state |= BM_TAG_VALID;
+    buf_state &= ~BUF_USAGECOUNT_MASK;
+    buf_state += BUF_USAGECOUNT_ONE;
+    pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+    *foundPtr = false;
+    return bufHdr;
+}
+
+/*
+ * MarkLocalBufferDirty -
+ *    mark a local buffer dirty
+ */
+void
+MarkLocalBufferDirty(Buffer buffer)
+{
+    int         bufid;
+    BufferDesc *bufHdr;
+    uint32      buf_state;
+
+    Assert(BufferIsLocal(buffer));
+
+#ifdef LBDEBUG
+    fprintf(stderr, "LB DIRTY %d\n", buffer);
+#endif
+
+    bufid = -(buffer + 1);
+
+    Assert(LocalRefCount[bufid] > 0);
+
+    bufHdr = GetLocalBufferDescriptor(bufid);
+
+    buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+    if (!(buf_state & BM_DIRTY))
+        pgBufferUsage.local_blks_dirtied++;
+
+    buf_state |= BM_DIRTY;
+
+    pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+}
+
+/*
+ * DropRelFileNodeLocalBuffers
+ *      This function removes from the buffer pool all the pages of the
+ *      specified relation that have block numbers >= firstDelBlock.
+ *      (In particular, with firstDelBlock = 0, all pages are removed.)
+ *      Dirty pages are simply dropped, without bothering to write them
+ *      out first.  Therefore, this is NOT rollback-able, and so should be
+ *      used only with extreme caution!
+ *
+ * See DropRelFileNodeBuffers in bufmgr.c for more notes.
+ */
+void
+DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
+                            BlockNumber firstDelBlock)
+{
+    int         i;
+
+    for (i = 0; i < NLocBuffer; i++)
+    {
+        BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
+        LocalBufferLookupEnt *hresult;
+        uint32      buf_state;
+
+        buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+        if ((buf_state & BM_TAG_VALID) &&
+            RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
+            bufHdr->tag.forkNum == forkNum &&
+            bufHdr->tag.blockNum >= firstDelBlock)
+        {
+            if (LocalRefCount[i] != 0)
+                elog(ERROR, "block %u of %s is still referenced (local %u)",
+                     bufHdr->tag.blockNum,
+                     relpathbackend(bufHdr->tag.rnode, MyBackendId,
+                                    bufHdr->tag.forkNum),
+                     LocalRefCount[i]);
+            /* Remove entry from hashtable */
+            hresult = (LocalBufferLookupEnt *)
+                hash_search(LocalBufHash, (void *) &bufHdr->tag,
+                            HASH_REMOVE, NULL);
+            if (!hresult)       /* shouldn't happen */
+                elog(ERROR, "local buffer hash table corrupted");
+            /* Mark buffer invalid */
+            CLEAR_BUFFERTAG(bufHdr->tag);
+            buf_state &= ~BUF_FLAG_MASK;
+            buf_state &= ~BUF_USAGECOUNT_MASK;
+            pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+        }
+    }
+}
+
+/*
+ * DropRelFileNodeAllLocalBuffers
+ *      This function removes from the buffer pool all pages of all forks
+ *      of the specified relation.
+ *
+ * See DropRelFileNodesAllBuffers in bufmgr.c for more notes.
+ */
+void
+DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
+{
+    int         i;
+
+    for (i = 0; i < NLocBuffer; i++)
+    {
+        BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
+        LocalBufferLookupEnt *hresult;
+        uint32      buf_state;
+
+        buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+        if ((buf_state & BM_TAG_VALID) &&
+            RelFileNodeEquals(bufHdr->tag.rnode, rnode))
+        {
+            if (LocalRefCount[i] != 0)
+                elog(ERROR, "block %u of %s is still referenced (local %u)",
+                     bufHdr->tag.blockNum,
+                     relpathbackend(bufHdr->tag.rnode, MyBackendId,
+                                    bufHdr->tag.forkNum),
+                     LocalRefCount[i]);
+            /* Remove entry from hashtable */
+            hresult = (LocalBufferLookupEnt *)
+                hash_search(LocalBufHash, (void *) &bufHdr->tag,
+                            HASH_REMOVE, NULL);
+            if (!hresult)       /* shouldn't happen */
+                elog(ERROR, "local buffer hash table corrupted");
+            /* Mark buffer invalid */
+            CLEAR_BUFFERTAG(bufHdr->tag);
+            buf_state &= ~BUF_FLAG_MASK;
+            buf_state &= ~BUF_USAGECOUNT_MASK;
+            pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+        }
+    }
+}
+
+/*
+ * InitLocalBuffers -
+ *    init the local buffer cache. Since most queries (esp. multi-user ones)
+ *    don't involve local buffers, we delay allocating actual memory for the
+ *    buffers until we need them; just make the buffer headers here.
+ */
+static void
+InitLocalBuffers(void)
+{
+    int         nbufs = num_temp_buffers;
+    HASHCTL     info;
+    int         i;
+
+    /*
+     * Parallel workers can't access data in temporary tables, because they
+     * have no visibility into the local buffers of their leader.  This is a
+     * convenient, low-cost place to provide a backstop check for that.  Note
+     * that we don't wish to prevent a parallel worker from accessing catalog
+     * metadata about a temp table, so checks at higher levels would be
+     * inappropriate.
+     */
+    if (IsParallelWorker())
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+                 errmsg("cannot access temporary tables during a parallel operation")));
+
+    /* Allocate and zero buffer headers and auxiliary arrays */
+    LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
+    LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
+    LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
+    if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
+        ereport(FATAL,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+
+    nextFreeLocalBuf = 0;
+
+    /* initialize fields that need to start off nonzero */
+    for (i = 0; i < nbufs; i++)
+    {
+        BufferDesc *buf = GetLocalBufferDescriptor(i);
+
+        /*
+         * negative to indicate local buffer. This is tricky: shared buffers
+         * start with 0. We have to start with -2. (Note that the routine
+         * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
+         * is -1.)
+         */
+        buf->buf_id = -i - 2;
+
+        /*
+         * Intentionally do not initialize the buffer's atomic variable
+         * (besides zeroing the underlying memory above). That way we get
+         * errors on platforms without atomics, if somebody (re-)introduces
+         * atomic operations for local buffers.
+         */
+    }
+
+    /* Create the lookup hash table */
+    info.keysize = sizeof(BufferTag);
+    info.entrysize = sizeof(LocalBufferLookupEnt);
+
+    LocalBufHash = hash_create("Local Buffer Lookup Table",
+                               nbufs,
+                               &info,
+                               HASH_ELEM | HASH_BLOBS);
+
+    if (!LocalBufHash)
+        elog(ERROR, "could not initialize local buffer hash table");
+
+    /* Initialization done, mark buffers allocated */
+    NLocBuffer = nbufs;
+}
+
+/*
+ * GetLocalBufferStorage - allocate memory for a local buffer
+ *
+ * The idea of this function is to aggregate our requests for storage
+ * so that the memory manager doesn't see a whole lot of relatively small
+ * requests.  Since we'll never give back a local buffer once it's created
+ * within a particular process, no point in burdening memmgr with separately
+ * managed chunks.
+ */
+static Block
+GetLocalBufferStorage(void)
+{
+    static char *cur_block = NULL;
+    static int  next_buf_in_block = 0;
+    static int  num_bufs_in_block = 0;
+    static int  total_bufs_allocated = 0;
+    static MemoryContext LocalBufferContext = NULL;
+
+    char       *this_buf;
+
+    Assert(total_bufs_allocated < NLocBuffer);
+
+    if (next_buf_in_block >= num_bufs_in_block)
+    {
+        /* Need to make a new request to memmgr */
+        int         num_bufs;
+
+        /*
+         * We allocate local buffers in a context of their own, so that the
+         * space eaten for them is easily recognizable in MemoryContextStats
+         * output.  Create the context on first use.
+         */
+        if (LocalBufferContext == NULL)
+            LocalBufferContext =
+                AllocSetContextCreate(TopMemoryContext,
+                                      "LocalBufferContext",
+                                      ALLOCSET_DEFAULT_SIZES);
+
+        /* Start with a 16-buffer request; subsequent ones double each time */
+        num_bufs = Max(num_bufs_in_block * 2, 16);
+        /* But not more than what we need for all remaining local bufs */
+        num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
+        /* And don't overflow MaxAllocSize, either */
+        num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);
+
+        cur_block = (char *) MemoryContextAlloc(LocalBufferContext,
+                                                num_bufs * BLCKSZ);
+        next_buf_in_block = 0;
+        num_bufs_in_block = num_bufs;
+    }
+
+    /* Allocate next buffer in current memory block */
+    this_buf = cur_block + next_buf_in_block * BLCKSZ;
+    next_buf_in_block++;
+    total_bufs_allocated++;
+
+    return (Block) this_buf;
+}
+
+/*
+ * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
+ *
+ * This is just like CheckForBufferLeaks(), but for local buffers.
+ */
+static void
+CheckForLocalBufferLeaks(void)
+{
+#ifdef USE_ASSERT_CHECKING
+    if (LocalRefCount)
+    {
+        int         RefCountErrors = 0;
+        int         i;
+
+        for (i = 0; i < NLocBuffer; i++)
+        {
+            if (LocalRefCount[i] != 0)
+            {
+                Buffer      b = -i - 1;
+
+                PrintBufferLeakWarning(b);
+                RefCountErrors++;
+            }
+        }
+        Assert(RefCountErrors == 0);
+    }
+#endif
+}
+
+/*
+ * AtEOXact_LocalBuffers - clean up at end of transaction.
+ *
+ * This is just like AtEOXact_Buffers, but for local buffers.
+ */
+void
+AtEOXact_LocalBuffers(bool isCommit)
+{
+    CheckForLocalBufferLeaks();
+}
+
+/*
+ * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
+ *
+ * This is just like AtProcExit_Buffers, but for local buffers.
+ */
+void
+AtProcExit_LocalBuffers(void)
+{
+    /*
+     * We shouldn't be holding any remaining pins; if we are, and assertions
+     * aren't enabled, we'll fail later in DropRelFileNodeBuffers while trying
+     * to drop the temp rels.
+     */
+    CheckForLocalBufferLeaks();
+}
```
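A note on the numbering convention the file relies on (see `InitLocalBuffers` and the `LocalBufHdrGetBlock` macro): local buffers get negative Buffer numbers so the rest of the system can tell them apart from shared buffers, whose numbers are positive. Descriptor `i` is assigned `buf_id = -i - 2`; `BufferDescriptorGetBuffer` adds 1, so the externally visible Buffer is `-i - 1`; and the array index is recovered with `-(buf_id + 2)` or `-(buffer + 1)`. The standalone sketch below (hypothetical helper names, not PostgreSQL code) just verifies that these mappings round-trip:

```c
#include <assert.h>
#include <stdio.h>

/* Mirrors the conventions in localbuf.c; names here are illustrative only. */
static int buf_id_for_index(int i)      { return -i - 2; }       /* InitLocalBuffers */
static int buffer_for_buf_id(int id)    { return id + 1; }       /* BufferDescriptorGetBuffer */
static int index_for_buf_id(int id)     { return -(id + 2); }    /* LocalBufHdrGetBlock */
static int index_for_buffer(int buffer) { return -(buffer + 1); } /* MarkLocalBufferDirty */

int
main(void)
{
    for (int i = 0; i < 8; i++)
    {
        int     id = buf_id_for_index(i);
        int     buffer = buffer_for_buf_id(id);

        assert(index_for_buf_id(id) == i);      /* array index recovered from buf_id */
        assert(index_for_buffer(buffer) == i);  /* same, from the Buffer number */
        assert(buffer < 0);                     /* local buffers are always negative */
        printf("index %d -> buf_id %d -> Buffer %d\n", i, id, buffer);
    }
    return 0;
}
```

The same `-id - 1` arithmetic is what `PrefetchLocalBuffer` stores into `result.recent_buffer` when it finds the block already cached.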
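The "equivalent to PinBuffer" branch in `LocalBufferAlloc` advances `usage_count` only on the zero-to-one pin transition, and never past `BM_MAX_USAGE_COUNT`, so a page that is pinned many times in quick succession is not over-weighted against future eviction. A minimal sketch of that rule, with the packed atomic state word reduced to two plain ints (illustration only, assuming the cap of 5 from `buf_internals.h`):

```c
#include <assert.h>

#define BM_MAX_USAGE_COUNT 5    /* saturation cap, as in buf_internals.h */

/* Simplified local-pin bookkeeping for a single buffer (illustration only). */
static int ref_count;
static int usage_count;

static void
pin_local(void)
{
    /* usage_count advances only on the 0 -> 1 pin transition, and saturates */
    if (ref_count == 0 && usage_count < BM_MAX_USAGE_COUNT)
        usage_count++;
    ref_count++;
}

int
main(void)
{
    for (int i = 0; i < 10; i++)
        pin_local();            /* ten pins with no intervening unpin */

    assert(ref_count == 10);
    assert(usage_count == 1);   /* only the first pin advanced usage_count */
    return 0;
}
```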
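Victim selection in `LocalBufferAlloc` is a clock sweep: the hand (`nextFreeLocalBuf`) cycles over the buffers, decrementing `usage_count` on each unpinned buffer it passes; a buffer that is unpinned and already at zero becomes the victim; and `trycounter` detects an "everything is pinned" condition after one fruitless lap instead of looping forever. Below is a simplified, self-contained rendering of that loop (illustrative only; the real code keeps the counters in an atomic state word and reports failure with `ereport`):

```c
#include <stdio.h>

#define NBUFFERS 8

/* Simplified mirror of the clock sweep in LocalBufferAlloc. */
static int  usage_count[NBUFFERS];
static int  ref_count[NBUFFERS];
static int  next_victim = 0;    /* the clock hand, like nextFreeLocalBuf */

/* Returns a victim buffer index, or -1 if every buffer is pinned. */
static int
clock_sweep(void)
{
    int trycounter = NBUFFERS;

    for (;;)
    {
        int b = next_victim;

        if (++next_victim >= NBUFFERS)
            next_victim = 0;

        if (ref_count[b] == 0)
        {
            if (usage_count[b] > 0)
            {
                usage_count[b]--;       /* give it another lap to prove itself */
                trycounter = NBUFFERS;  /* progress was made, reset the guard */
            }
            else
                return b;               /* unpinned and unused: evict this one */
        }
        else if (--trycounter == 0)
            return -1;                  /* a full lap of pinned buffers: give up */
    }
}

int
main(void)
{
    usage_count[0] = 2;         /* buffer 0 was recently used */
    ref_count[1] = 1;           /* buffer 1 is pinned */

    printf("victim: %d\n", clock_sweep());  /* prints 2: first unpinned, unused buffer */
    return 0;
}
```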
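Finally, `GetLocalBufferStorage` batches its memory requests: the first allocation covers 16 buffers, each subsequent one doubles, and every request is clamped to the number of buffers still needed and to `MaxAllocSize / BLCKSZ`. For `temp_buffers = 100` that means three underlying allocations (16, 32, then the remaining 52 buffers) rather than 100 small ones. A rough sketch of just the request-size arithmetic (constants approximated; `MaxAllocSize` is about 1 GB in PostgreSQL, and the names here are illustrative):

```c
#include <stdio.h>

#define BLCKSZ    8192
#define MAX_ALLOC 0x3fffffff    /* approximates MaxAllocSize (~1 GB) */

/*
 * Print the sequence of block requests the doubling policy would make
 * for n_loc_buffer buffers in total (illustration only).
 */
static void
show_requests(int n_loc_buffer)
{
    int allocated = 0;
    int bufs_in_block = 0;

    while (allocated < n_loc_buffer)
    {
        /* start at 16 buffers, then double each time */
        int num_bufs = bufs_in_block * 2 > 16 ? bufs_in_block * 2 : 16;

        if (num_bufs > n_loc_buffer - allocated)
            num_bufs = n_loc_buffer - allocated;    /* only what's still needed */
        if (num_bufs > MAX_ALLOC / BLCKSZ)
            num_bufs = MAX_ALLOC / BLCKSZ;          /* allocation size ceiling */

        printf("request %d buffers (%d bytes)\n", num_bufs, num_bufs * BLCKSZ);
        allocated += num_bufs;
        bufs_in_block = num_bufs;
    }
}

int
main(void)
{
    show_requests(100);         /* e.g. temp_buffers = 100: requests 16, 32, 52 */
    return 0;
}
```

Keeping these chunks in their own `LocalBufferContext` also makes the space they consume easy to spot in `MemoryContextStats` output, as the comment in the file notes.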