diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 14dc4f742384..aab2c967cece 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -30,9 +30,45 @@ #define SDB_TABLE_SIZE_EXTRA SDB_MAX #define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t)) +// Write-buffer size: batches per-row writes to reduce the number of write() syscalls. +#define SDB_WRITE_BUF_SIZE (256 * 1024) + // Forward declaration static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type); +// Append |dataLen| bytes from |data| to the write buffer. When the buffer is +// full it is flushed to the file first. Large chunks that exceed the buffer +// capacity are written directly without copying. +static int32_t sdbBufWrite(TdFilePtr pFile, char *wBuf, int32_t *wBufLen, const void *data, int32_t dataLen) { + if (*wBufLen + dataLen > SDB_WRITE_BUF_SIZE) { + if (*wBufLen > 0) { + if (taosWriteFile(pFile, wBuf, *wBufLen) != *wBufLen) { + return terrno; + } + *wBufLen = 0; + } + if (dataLen > SDB_WRITE_BUF_SIZE) { + if (taosWriteFile(pFile, data, dataLen) != dataLen) { + return terrno; + } + return 0; + } + } + memcpy(wBuf + *wBufLen, data, dataLen); + *wBufLen += dataLen; + return 0; +} + +// Flush any remaining bytes in the write buffer to the file. +static int32_t sdbFlushBuf(TdFilePtr pFile, char *wBuf, int32_t *wBufLen) { + if (*wBufLen == 0) return 0; + if (taosWriteFile(pFile, wBuf, *wBufLen) != *wBufLen) { + return terrno; + } + *wBufLen = 0; + return 0; +} + static int32_t sdbDeployData(SSdb *pSdb) { int32_t code = 0; mInfo("start to deploy sdb"); @@ -610,6 +646,35 @@ int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) { return code; } + // Perf-C: check encryption key once here, outside all loops, to avoid the + // overhead of calling taosWaitCfgKeyLoaded() once per row. + if (taosWaitCfgKeyLoaded() != 0) { + code = terrno; + mError("failed to wait for encryption keys since %s", tstrerror(code)); + int32_t ret = 0; + if ((ret = taosCloseFile(&pFile)) != 0) { + mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret)); + } + TAOS_RETURN(code); + } + + // Perf-B: write buffer — accumulates per-row writes and flushes in large + // chunks, reducing the number of write() syscalls from 3*N to ~N/K. + char *wBuf = taosMemoryMalloc(SDB_WRITE_BUF_SIZE); + if (wBuf == NULL) { + code = terrno; + int32_t ret = 0; + if ((ret = taosCloseFile(&pFile)) != 0) { + mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret)); + } + TAOS_RETURN(code); + } + int32_t wBufLen = 0; + + // Perf-A-sub: reusable encryption buffer — avoids per-row malloc/free. + char *encBuf = NULL; + int32_t encBufLen = 0; + for (int32_t i = SDB_MAX - 1; i >= 0; --i) { if (i == skip_type) continue; SdbEncodeFp encodeFp = pSdb->encodeFps[i]; @@ -617,106 +682,136 @@ int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) { mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); - SHashObj *hash = pSdb->hashObjs[i]; - sdbWriteLock(pSdb, i); + SHashObj *hash = pSdb->hashObjs[i]; + int32_t hashSize = taosHashGetSize(hash); + + // Perf-A: Phase 1 — snapshot row pointers under READ lock (not write lock). + // We only hold the lock for the fast in-memory hash traversal and release it + // before any file I/O, so concurrent CREATE TABLE / DROP TABLE are not + // blocked for the duration of the checkpoint write. + SArray *pRowList = taosArrayInit(hashSize > 0 ? hashSize : 16, sizeof(SSdbRow *)); + if (pRowList == NULL) { + code = terrno; + break; + } + sdbReadLock(pSdb, i); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { SSdbRow *pRow = *ppRow; - if (pRow == NULL) { - ppRow = taosHashIterate(hash, ppRow); - continue; - } - - if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) { - sdbPrintOper(pSdb, pRow, "not-write"); - ppRow = taosHashIterate(hash, ppRow); - continue; - } - - sdbPrintOper(pSdb, pRow, "write"); - - SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); - if (pRaw != NULL) { - pRaw->status = pRow->status; - - if (taosWriteFile(pFile, pRaw, sizeof(SSdbRaw)) != sizeof(SSdbRaw)) { - code = terrno; + if (pRow != NULL && + (pRow->status == SDB_STATUS_READY || pRow->status == SDB_STATUS_DROPPING)) { + // Pin the row so its memory stays valid after we release the lock. + (void)atomic_add_fetch_32(&pRow->refCount, 1); + if (taosArrayPush(pRowList, &pRow) == NULL) { + // OOM: undo the pin, stop the iteration, then propagate the error. + (void)atomic_sub_fetch_32(&pRow->refCount, 1); taosHashCancelIterate(hash, ppRow); - sdbFreeRaw(pRaw); + code = terrno; break; } + } + ppRow = taosHashIterate(hash, ppRow); + } + sdbUnLock(pSdb, i); // release READ lock immediately — file I/O happens below - int32_t newDataLen = pRaw->dataLen; - char *newData = pRaw->pData; + if (code != 0) { + // Unpin every row that was already snapshotted before the failure. + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pRowList); j++) { + SSdbRow *pUnpinRow = *(SSdbRow **)taosArrayGet(pRowList, j); + sdbReleaseLock(pSdb, pUnpinRow->pObj, true); + } + taosArrayDestroy(pRowList); + break; + } - if (taosWaitCfgKeyLoaded() != 0) { - code = terrno; - taosHashCancelIterate(hash, ppRow); - sdbFreeRaw(pRaw); - break; - } + // Perf-A: Phase 2 — encode and write with no lock held. + // Pinned rows remain valid in memory even if concurrently deleted from the + // hash. Re-check status before encoding: a row that transitioned to DROPPED + // after the snapshot must not be written to the checkpoint file. + int32_t rowCount = (int32_t)taosArrayGetSize(pRowList); + for (int32_t j = 0; j < rowCount; j++) { + SSdbRow *pRow = *(SSdbRow **)taosArrayGet(pRowList, j); - if (tsMetaKey[0] != '\0') { - newDataLen = ENCRYPTED_LEN(pRaw->dataLen); - newData = taosMemoryMalloc(newDataLen); - if (newData == NULL) { - code = terrno; - taosHashCancelIterate(hash, ppRow); - sdbFreeRaw(pRaw); - break; - } + if (code == 0) { + if (pRow->status == SDB_STATUS_DROPPED) { + sdbPrintOper(pSdb, pRow, "not-write"); + } else { + sdbPrintOper(pSdb, pRow, "write"); + + SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); + if (pRaw == NULL) { + code = TSDB_CODE_APP_ERROR; + } else { + pRaw->status = pRow->status; + + int32_t newDataLen = pRaw->dataLen; + char *newData = pRaw->pData; + + if (tsMetaKey[0] != '\0') { + int32_t needed = ENCRYPTED_LEN(pRaw->dataLen); + // Perf-A-sub: grow the shared encryption buffer only when the + // current row is larger than any previously seen row. + if (needed > encBufLen) { + taosMemoryFree(encBuf); + encBuf = taosMemoryMalloc(needed); + if (encBuf == NULL) { + code = terrno; + encBufLen = 0; + } else { + encBufLen = needed; + } + } + if (code == 0) { + SCryptOpts opts = {0}; + opts.len = needed; + opts.source = pRaw->pData; + opts.result = encBuf; + opts.unitLen = 16; + opts.pOsslAlgrName = taosGetEncryptAlgoName(tsEncryptAlgorithmType); + tstrncpy(opts.key, tsMetaKey, ENCRYPT_KEY_LEN + 1); + int32_t count = CBC_Encrypt(&opts); + if (count <= 0) { + code = terrno; + } else { + newDataLen = needed; + newData = encBuf; + } + } + } + + if (code == 0) { + int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); + // Perf-B: merge header + data + checksum into the write buffer. + if ((code = sdbBufWrite(pFile, wBuf, &wBufLen, pRaw, sizeof(SSdbRaw))) == 0) + if ((code = sdbBufWrite(pFile, wBuf, &wBufLen, newData, newDataLen)) == 0) + code = sdbBufWrite(pFile, wBuf, &wBufLen, &cksum, sizeof(int32_t)); + } - SCryptOpts opts = {0}; - opts.len = newDataLen; - opts.source = pRaw->pData; - opts.result = newData; - opts.unitLen = 16; - opts.pOsslAlgrName = taosGetEncryptAlgoName(tsEncryptAlgorithmType); - tstrncpy(opts.key, tsMetaKey, ENCRYPT_KEY_LEN + 1); - - int32_t count = CBC_Encrypt(&opts); - if (count <= 0) { - code = terrno; - taosHashCancelIterate(hash, ppRow); sdbFreeRaw(pRaw); - break; } - - // mDebug("write sdb, CBC Encrypt encryptedDataLen:%d, dataLen:%d, %s", - // newDataLen, pRaw->dataLen, __FUNCTION__); - } - - if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) { - code = terrno; - taosHashCancelIterate(hash, ppRow); - sdbFreeRaw(pRaw); - break; } + } - if (tsMetaKey[0] != '\0') { - taosMemoryFree(newData); - } + // Always unpin the row regardless of whether encoding/writing succeeded. + sdbReleaseLock(pSdb, pRow->pObj, true); + } - int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); - if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { - code = terrno; - taosHashCancelIterate(hash, ppRow); - sdbFreeRaw(pRaw); - break; - } - } else { - code = TSDB_CODE_APP_ERROR; - taosHashCancelIterate(hash, ppRow); - break; - } + taosArrayDestroy(pRowList); + if (code != 0) break; + } - sdbFreeRaw(pRaw); - ppRow = taosHashIterate(hash, ppRow); + // Flush any data remaining in the write buffer. + if (code == 0) { + code = sdbFlushBuf(pFile, wBuf, &wBufLen); + if (code != 0) { + mError("failed to flush write buffer to sdb file:%s since %s", tmpfile, tstrerror(code)); } - sdbUnLock(pSdb, i); } + taosMemoryFree(wBuf); + taosMemoryFree(encBuf); + if (code == 0) { code = taosFsyncFile(pFile); if (code != 0) { @@ -739,8 +834,8 @@ int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) { if (code != 0) { mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } else { - pSdb->commitIndex = pSdb->applyIndex; - pSdb->commitTerm = pSdb->applyTerm; + pSdb->commitIndex = pSdb->applyIndex; + pSdb->commitTerm = pSdb->applyTerm; pSdb->commitConfig = pSdb->applyConfig; mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",