Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 177 additions & 82 deletions source/dnode/mnode/sdb/src/sdbFile.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,45 @@
#define SDB_TABLE_SIZE_EXTRA SDB_MAX
#define SDB_RESERVE_SIZE_EXTRA (512 - (SDB_TABLE_SIZE_EXTRA - SDB_TABLE_SIZE) * 2 * sizeof(int64_t))

// Write-buffer size: batches per-row writes to reduce the number of write() syscalls.
#define SDB_WRITE_BUF_SIZE (256 * 1024)

// Forward declaration
static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type);

// Append |dataLen| bytes from |data| to the write buffer |wBuf|, whose current
// fill level is tracked in |*wBufLen| (capacity: SDB_WRITE_BUF_SIZE bytes).
//
// If the new bytes do not fit into the space that is left, the bytes already
// buffered are written to |pFile| first and |*wBufLen| is reset to 0. A chunk
// that is larger than the whole buffer is then written straight to the file
// without being copied; anything smaller is copied into the buffer.
//
// Returns 0 on success, or terrno when taosWriteFile() reports a byte count
// different from the one requested. |dataLen| must be non-negative; a
// zero-length append is a no-op.
static int32_t sdbBufWrite(TdFilePtr pFile, char *wBuf, int32_t *wBufLen, const void *data, int32_t dataLen) {
  // Nothing to append; also keeps a possibly-NULL |data| away from memcpy().
  if (dataLen == 0) return 0;

  // Compare against the remaining space instead of computing
  // *wBufLen + dataLen, which could overflow int32_t for a very large dataLen.
  if (dataLen > SDB_WRITE_BUF_SIZE - *wBufLen) {
    // Drain what is already buffered so that on-disk ordering is preserved.
    if (*wBufLen > 0) {
      if (taosWriteFile(pFile, wBuf, *wBufLen) != *wBufLen) {
        return terrno;
      }
      *wBufLen = 0;
    }

    // Oversized chunk: it can never fit in the buffer, so bypass it entirely.
    if (dataLen > SDB_WRITE_BUF_SIZE) {
      if (taosWriteFile(pFile, data, dataLen) != dataLen) {
        return terrno;
      }
      return 0;
    }
  }

  // At this point the chunk is guaranteed to fit in the remaining space.
  memcpy(wBuf + *wBufLen, data, dataLen);
  *wBufLen += dataLen;
  return 0;
}

// Write whatever is still pending in the write buffer |wBuf| to |pFile|.
//
// |*wBufLen| holds the number of buffered bytes. When it is zero nothing is
// written. After a complete write it is reset to 0 so the buffer can be
// reused.
//
// Returns 0 on success (including the empty-buffer case), or terrno when
// taosWriteFile() reports a byte count different from the one requested; in
// that case |*wBufLen| is left unchanged.
static int32_t sdbFlushBuf(TdFilePtr pFile, char *wBuf, int32_t *wBufLen) {
  int32_t pending = *wBufLen;
  int32_t result = 0;

  if (pending != 0) {
    if (taosWriteFile(pFile, wBuf, pending) == pending) {
      *wBufLen = 0;
    } else {
      result = terrno;
    }
  }

  return result;
}

static int32_t sdbDeployData(SSdb *pSdb) {
int32_t code = 0;
mInfo("start to deploy sdb");
Expand Down Expand Up @@ -610,113 +646,172 @@ int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
return code;
}

// Perf-C: check encryption key once here, outside all loops, to avoid the
// overhead of calling taosWaitCfgKeyLoaded() once per row.
if (taosWaitCfgKeyLoaded() != 0) {
code = terrno;
Comment on lines +651 to +652
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The return value of taosWaitCfgKeyLoaded() should be used directly to set the code variable, rather than relying on terrno. This is more robust as it avoids potential issues if terrno was not correctly set by the callee.

  if ((code = taosWaitCfgKeyLoaded()) != 0) {

mError("failed to wait for encryption keys since %s", tstrerror(code));
int32_t ret = 0;
if ((ret = taosCloseFile(&pFile)) != 0) {
mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret));
}
TAOS_RETURN(code);
}

// Perf-B: write buffer — accumulates per-row writes and flushes in large
// chunks, reducing the number of write() syscalls from 3*N to ~N/K.
char *wBuf = taosMemoryMalloc(SDB_WRITE_BUF_SIZE);
if (wBuf == NULL) {
code = terrno;
int32_t ret = 0;
if ((ret = taosCloseFile(&pFile)) != 0) {
mError("failed to close sdb file:%s since %s", tmpfile, tstrerror(ret));
}
TAOS_RETURN(code);
}
int32_t wBufLen = 0;

// Perf-A-sub: reusable encryption buffer — avoids per-row malloc/free.
char *encBuf = NULL;
int32_t encBufLen = 0;

for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
if (i == skip_type) continue;
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue;

mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));

SHashObj *hash = pSdb->hashObjs[i];
sdbWriteLock(pSdb, i);
SHashObj *hash = pSdb->hashObjs[i];
int32_t hashSize = taosHashGetSize(hash);

// Perf-A: Phase 1 — snapshot row pointers under READ lock (not write lock).
// We only hold the lock for the fast in-memory hash traversal and release it
// before any file I/O, so concurrent CREATE TABLE / DROP TABLE are not
// blocked for the duration of the checkpoint write.
SArray *pRowList = taosArrayInit(hashSize > 0 ? hashSize : 16, sizeof(SSdbRow *));
if (pRowList == NULL) {
code = terrno;
break;
}

sdbReadLock(pSdb, i);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL) {
ppRow = taosHashIterate(hash, ppRow);
continue;
}

if (pRow->status != SDB_STATUS_READY && pRow->status != SDB_STATUS_DROPPING) {
sdbPrintOper(pSdb, pRow, "not-write");
ppRow = taosHashIterate(hash, ppRow);
continue;
}

sdbPrintOper(pSdb, pRow, "write");

SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
if (pRaw != NULL) {
pRaw->status = pRow->status;

if (taosWriteFile(pFile, pRaw, sizeof(SSdbRaw)) != sizeof(SSdbRaw)) {
code = terrno;
if (pRow != NULL &&
(pRow->status == SDB_STATUS_READY || pRow->status == SDB_STATUS_DROPPING)) {
// Pin the row so its memory stays valid after we release the lock.
(void)atomic_add_fetch_32(&pRow->refCount, 1);
if (taosArrayPush(pRowList, &pRow) == NULL) {
// OOM: undo the pin, stop the iteration, then propagate the error.
(void)atomic_sub_fetch_32(&pRow->refCount, 1);
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
code = terrno;
break;
}
}
ppRow = taosHashIterate(hash, ppRow);
}
sdbUnLock(pSdb, i); // release READ lock immediately — file I/O happens below

int32_t newDataLen = pRaw->dataLen;
char *newData = pRaw->pData;
if (code != 0) {
// Unpin every row that was already snapshotted before the failure.
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pRowList); j++) {
SSdbRow *pUnpinRow = *(SSdbRow **)taosArrayGet(pRowList, j);
sdbReleaseLock(pSdb, pUnpinRow->pObj, true);
}
taosArrayDestroy(pRowList);
break;
}

if (taosWaitCfgKeyLoaded() != 0) {
code = terrno;
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
}
// Perf-A: Phase 2 — encode and write with no lock held.
// Pinned rows remain valid in memory even if concurrently deleted from the
// hash. Re-check status before encoding: a row that transitioned to DROPPED
// after the snapshot must not be written to the checkpoint file.
Comment on lines +728 to +731
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Releasing the table lock before encoding and writing rows introduces a "fuzzy checkpoint" behavior. While this eliminates lock contention, it means that concurrent updates to the same row could result in inconsistent data being written to the checkpoint file (e.g., a row containing a mix of old and new field values if the encoding process is not atomic relative to updates). If the system requires strict consistency for the checkpoint file, the lock should be held during the encoding phase, or an MVCC-like mechanism should be used.

int32_t rowCount = (int32_t)taosArrayGetSize(pRowList);
for (int32_t j = 0; j < rowCount; j++) {
SSdbRow *pRow = *(SSdbRow **)taosArrayGet(pRowList, j);

if (tsMetaKey[0] != '\0') {
newDataLen = ENCRYPTED_LEN(pRaw->dataLen);
newData = taosMemoryMalloc(newDataLen);
if (newData == NULL) {
code = terrno;
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
}
if (code == 0) {
if (pRow->status == SDB_STATUS_DROPPED) {
sdbPrintOper(pSdb, pRow, "not-write");
} else {
sdbPrintOper(pSdb, pRow, "write");

SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
if (pRaw == NULL) {
Comment on lines +728 to +743
code = TSDB_CODE_APP_ERROR;
} else {
pRaw->status = pRow->status;

int32_t newDataLen = pRaw->dataLen;
char *newData = pRaw->pData;

if (tsMetaKey[0] != '\0') {
int32_t needed = ENCRYPTED_LEN(pRaw->dataLen);
// Perf-A-sub: grow the shared encryption buffer only when the
// current row is larger than any previously seen row.
if (needed > encBufLen) {
taosMemoryFree(encBuf);
encBuf = taosMemoryMalloc(needed);
if (encBuf == NULL) {
code = terrno;
encBufLen = 0;
} else {
encBufLen = needed;
}
}
Comment on lines +755 to +764
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of freeing and re-allocating the encryption buffer when a larger size is needed, consider using taosMemoryRealloc. This is generally more efficient as it may avoid a full copy and re-allocation if the current memory block can be extended in place.

              if (needed > encBufLen) {
                char *tmp = taosMemoryRealloc(encBuf, needed);
                if (tmp == NULL) {
                  code = terrno;
                } else {
                  encBuf = tmp;
                  encBufLen = needed;
                }
              }

if (code == 0) {
SCryptOpts opts = {0};
opts.len = needed;
opts.source = pRaw->pData;
opts.result = encBuf;
opts.unitLen = 16;
opts.pOsslAlgrName = taosGetEncryptAlgoName(tsEncryptAlgorithmType);
tstrncpy(opts.key, tsMetaKey, ENCRYPT_KEY_LEN + 1);
int32_t count = CBC_Encrypt(&opts);
if (count <= 0) {
code = terrno;
} else {
newDataLen = needed;
newData = encBuf;
}
}
}

if (code == 0) {
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
// Perf-B: merge header + data + checksum into the write buffer.
if ((code = sdbBufWrite(pFile, wBuf, &wBufLen, pRaw, sizeof(SSdbRaw))) == 0)
if ((code = sdbBufWrite(pFile, wBuf, &wBufLen, newData, newDataLen)) == 0)
code = sdbBufWrite(pFile, wBuf, &wBufLen, &cksum, sizeof(int32_t));
}

SCryptOpts opts = {0};
opts.len = newDataLen;
opts.source = pRaw->pData;
opts.result = newData;
opts.unitLen = 16;
opts.pOsslAlgrName = taosGetEncryptAlgoName(tsEncryptAlgorithmType);
tstrncpy(opts.key, tsMetaKey, ENCRYPT_KEY_LEN + 1);

int32_t count = CBC_Encrypt(&opts);
if (count <= 0) {
code = terrno;
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
}

// mDebug("write sdb, CBC Encrypt encryptedDataLen:%d, dataLen:%d, %s",
// newDataLen, pRaw->dataLen, __FUNCTION__);
}

if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) {
code = terrno;
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
}
}

if (tsMetaKey[0] != '\0') {
taosMemoryFree(newData);
}
// Always unpin the row regardless of whether encoding/writing succeeded.
sdbReleaseLock(pSdb, pRow->pObj, true);
}

int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
if (taosWriteFile(pFile, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = terrno;
taosHashCancelIterate(hash, ppRow);
sdbFreeRaw(pRaw);
break;
}
} else {
code = TSDB_CODE_APP_ERROR;
taosHashCancelIterate(hash, ppRow);
break;
}
taosArrayDestroy(pRowList);
if (code != 0) break;
}

sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow);
// Flush any data remaining in the write buffer.
if (code == 0) {
code = sdbFlushBuf(pFile, wBuf, &wBufLen);
if (code != 0) {
mError("failed to flush write buffer to sdb file:%s since %s", tmpfile, tstrerror(code));
}
sdbUnLock(pSdb, i);
}

taosMemoryFree(wBuf);
taosMemoryFree(encBuf);

if (code == 0) {
code = taosFsyncFile(pFile);
if (code != 0) {
Expand All @@ -739,8 +834,8 @@ int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
if (code != 0) {
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} else {
pSdb->commitIndex = pSdb->applyIndex;
pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitIndex = pSdb->applyIndex;
pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitConfig = pSdb->applyConfig;
mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
" file:%s",
Expand Down
Loading