Feat/dmchen/6984169049 improve long snapshot 3.3.6#35329
Conversation
There was a problem hiding this comment.
Pull request overview
This PR optimizes "long snapshot" replication for vnodes by letting the follower describe which TSDB files it is missing and having the leader filter the snapshot stream to only those file sets / files. A new SNAP_DATA_MISSING_FIDS sub-TLV transports a (fid, ftype) list; both the normal and RAW TSDB snap readers learn to filter file sets by an extracted missing-fid set, and the RAW reader additionally filters individual files via a borrowed hash. Several defensive changes also relax the "unknown subfield" handling from hard-error to warn-and-continue, drop the sver == 0 guard on RAW-format selection, and remove the unused TSDB_SNAP_REP_FMT_HYBRID enumerator.
Changes:
- Introduces missing-file detection on the follower (
tsdbDetectMissingFiles) and a TLV (SNAP_DATA_MISSING_FIDS) plus serialize/deserialize/extract helpers intsdbSnapInfo.c. - Threads
missingFids/missingFileHashfilters throughtsdbSnapReaderOpen,tsdbSnapRAWReaderOpen,tsdbFSCreateRefRangedSnapshot, and addstsdbFSCreateRefSnapshotWithRangesto limit which file sets/files are sent. - Relaxes RAW-mode selection (removes
sver == 0) and demotes unknown-subfield errors to warnings; removesTSDB_SNAP_REP_FMT_HYBRID.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| source/dnode/vnode/src/inc/tsdb.h | Removes HYBRID format; adds missing-file API and tsdbMissingFileKey inline. |
| source/dnode/vnode/src/inc/vnodeInt.h | Adds SNAP_DATA_MISSING_FIDS=16 and updates tsdbSnap*ReaderOpen signatures. |
| source/dnode/vnode/src/sma/smaSnapshot.c | Passes new missingFids/count args (NULL/0) to tsdbSnapReaderOpen. |
| source/dnode/vnode/src/tsdb/tsdbFS2.h | Declares tsdbFSCreateRefSnapshotWithRanges and updates ranged ref signature. |
| source/dnode/vnode/src/tsdb/tsdbFS2.c | Implements ranged ref snapshot and adds missing-fid filtering inside tsdbFSCreateRefRangedSnapshot. |
| source/dnode/vnode/src/tsdb/tsdbSnapInfo.c | Implements detect/serialize/deserialize of missing files, fid extraction, and TLV emission in tsdbSnapPrepDescription. |
| source/dnode/vnode/src/tsdb/tsdbSnapshot.c | Adds missing-fid filter to the normal TSDB snap reader open/read/close. |
| source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c | Adds per-file (missingFileHash) and per-fid filtering to the RAW reader. |
| source/dnode/vnode/src/vnd/vnodeSnapshot.c | Receives SNAP_DATA_MISSING_FIDS, extracts fids, drops sver==0 guard, threads filters into readers, and relaxes unknown-typ handling. |
Comments suppressed due to low confidence (4)
source/dnode/vnode/src/tsdb/tsdbSnapInfo.c:750
tsdbDetectMissingFilesholdspTsdb->mutexwhile iterating every file set and callingtaosCheckExistFile(a stat() syscall) on eachfarrentry and every STT file inlvlArr. On vnodes with many fsets and stt files this can perform thousands of blocking syscalls under the lock, stalling concurrent commit/compaction/query operations. Consider snapshotting the file list under the lock and performing existence checks outside, or using cached presence state from the FS, rather than hitting the filesystem while holding the global tsdb mutex.
(void)taosThreadMutexLock(&pTsdb->mutex);
STFileSet* fset;
TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) {
// check farr entries (HEAD, DATA, SMA, TOMB)
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] != NULL) {
if (!taosCheckExistFile(fset->farr[ftype]->fname)) {
if (fileCount >= fileCap) {
int32_t newCap = fileCap == 0 ? 16 : fileCap * 2;
STsdbMissingFile* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbMissingFile));
if (tmp == NULL) {
code = terrno;
goto _unlock;
}
files = tmp;
fileCap = newCap;
}
files[fileCount].fid = fset->fid;
files[fileCount].ftype = ftype;
fileCount++;
}
}
}
// check STT files in lvlArr
SSttLvl* lvl;
bool sttMissing = false;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (!taosCheckExistFile(fobj->fname)) {
sttMissing = true;
break;
}
}
if (sttMissing) break;
}
if (sttMissing) {
if (fileCount >= fileCap) {
int32_t newCap = fileCap == 0 ? 16 : fileCap * 2;
STsdbMissingFile* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbMissingFile));
if (tmp == NULL) {
code = terrno;
goto _unlock;
}
files = tmp;
fileCap = newCap;
}
files[fileCount].fid = fset->fid;
files[fileCount].ftype = TSDB_FTYPE_STT;
fileCount++;
}
}
_unlock:
(void)taosThreadMutexUnlock(&pTsdb->mutex);
source/dnode/vnode/src/tsdb/tsdbSnapInfo.c:677
- The dedup loop (lines 642-648) is O(N²) over all hash entries, and the sort below is a bubble sort (lines 667-676), also O(N²). For a vnode with a large number of missing fsets this is unnecessarily expensive. Since the input is dedup'd by
(fid<<32)|ftypekeys, multiple ftypes per fid produce duplicates; consider extracting all fids first and then usingtaosSort/qsortfollowed by a single linear pass to dedup. This also avoids the realloc-on-every-new-fid pattern.
bool exists = false;
for (int32_t i = 0; i < fidCount; ++i) {
if (fids[i] == fid) {
exists = true;
break;
}
}
if (exists) continue;
if (fidCount >= fidCap) {
int32_t newCap = fidCap == 0 ? 16 : fidCap * 2;
int32_t* tmp = taosMemoryRealloc(fids, newCap * sizeof(int32_t));
if (tmp == NULL) {
code = terrno;
taosHashCancelIterate(missingFileHash, pIter);
taosMemoryFree(fids);
return code;
}
fids = tmp;
fidCap = newCap;
}
fids[fidCount++] = fid;
}
// sort fids for binary search
if (fidCount > 1) {
for (int32_t i = 0; i < fidCount - 1; ++i) {
for (int32_t j = i + 1; j < fidCount; ++j) {
if (fids[i] > fids[j]) {
int32_t tmp = fids[i];
fids[i] = fids[j];
fids[j] = tmp;
}
}
}
}
source/dnode/vnode/src/tsdb/tsdbSnapInfo.c:746
- Inside the inner
TARRAY2_FOREACH(lvl->fobjArr, fobj)loop, every stt file in every level is checked, but as soon as one stt file is missing the same(fset->fid, TSDB_FTYPE_STT)entry is appended. There is no per-(fid,level) granularity, so a single missing stt forces the leader to resend all stt files for that fid. If this is intentional, fine — but please add a comment, because the per-ftype handling above suggests finer granularity was intended elsewhere. AlsosttMissingis only set true once and there is no benefit to continuing the outer level loop after the break — theif (sttMissing) break;inside followed by anotherif (sttMissing) break;outside is correct but a bit confusing; a labeled break or restructure would be clearer.
SSttLvl* lvl;
bool sttMissing = false;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (!taosCheckExistFile(fobj->fname)) {
sttMissing = true;
break;
}
}
if (sttMissing) break;
}
if (sttMissing) {
if (fileCount >= fileCap) {
int32_t newCap = fileCap == 0 ? 16 : fileCap * 2;
STsdbMissingFile* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbMissingFile));
if (tmp == NULL) {
code = terrno;
goto _unlock;
}
files = tmp;
fileCap = newCap;
}
files[fileCount].fid = fset->fid;
files[fileCount].ftype = TSDB_FTYPE_STT;
fileCount++;
}
source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c:184
tsdbInfo(...)is logged for every included fid/file and for every included stt file intsdbSnapRAWReadFileSetOpenReader(lines 153, 182), and similarly intsdbFSCreateRefRangedSnapshot(line 1249 was demoted fromtsdbDebugtotsdbInfo). For databases with many fsets this can flood the log on every snapshot. Consider keeping the per-include logs attsdbDebuglevel and only emitting a summary (e.g., total included/skipped counts) at INFO.
}
tsdbInfo("vgId:%d, RAW include file fid:%d ftype:%d in missing set", TD_VID(reader->tsdb->pVnode),
reader->ctx->fset->fid, ftype);
}
SDataFileRAWReader* dataReader;
SDataFileRAWReaderConfig config = {
.tsdb = reader->tsdb,
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
.file = fobj->f[0],
};
code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(reader->dataReaderArr, dataReader);
TSDB_CHECK_CODE(code, lino, _exit);
}
// stt
SSttLvl* lvl;
TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
// per-file filter: skip stt files not in missing set (use fid + ftype)
if (reader->missingFileHash != NULL) {
int64_t mfKey = tsdbMissingFileKey(reader->ctx->fset->fid, TSDB_FTYPE_STT);
if (taosHashGet(reader->missingFileHash, &mfKey, sizeof(mfKey)) == NULL) {
tsdbDebug("vgId:%d, RAW skip stt file fid:%d not in missing set", TD_VID(reader->tsdb->pVnode),
reader->ctx->fset->fid);
continue;
}
tsdbInfo("vgId:%d, RAW include stt file fid:%d in missing set", TD_VID(reader->tsdb->pVnode),
reader->ctx->fset->fid);
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { | ||
| vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d, sver:%" PRId64, TD_VID(pVnode), tsdbOpts.format, | ||
| pReader->sver); | ||
| if (tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { |
| vWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), subField->typ); | ||
| break; |
| static bool tsdbFidInMissingSet(int32_t fid, const int32_t* missingFids, int32_t missingFidCount) { | ||
| int32_t lo = 0, hi = missingFidCount - 1; | ||
| while (lo <= hi) { | ||
| int32_t mid = lo + (hi - lo) / 2; | ||
| if (missingFids[mid] == fid) return true; | ||
| if (missingFids[mid] < fid) | ||
| lo = mid + 1; | ||
| else | ||
| hi = mid - 1; | ||
| } | ||
| return false; | ||
| } |
| SNAP_DATA_STREAM_STATE_BACKEND = 12, | ||
| SNAP_DATA_TQ_CHECKINFO = 13, | ||
| SNAP_DATA_RAW = 14, | ||
| SNAP_DATA_MISSING_FIDS = 16, |
| // snap rep format | ||
| typedef enum ETsdbRepFmt { | ||
| TSDB_SNAP_REP_FMT_DEFAULT = 0, | ||
| TSDB_SNAP_REP_FMT_RAW, |
| int32_t fidCap = 0; | ||
| int32_t fidCount = 0; | ||
| int32_t* fids = NULL; | ||
|
|
There was a problem hiding this comment.
Code Review
This pull request implements a mechanism to identify and synchronize missing files during snapshot replication by introducing a missing file filter. Key changes include the addition of detection, serialization, and deserialization logic for missing files, along with filtering support in both standard and RAW snapshot readers. Critical feedback was provided regarding a potential data loss scenario in incremental snapshots due to the removal of a version check when enabling RAW mode. Other identified issues include performance concerns related to disk I/O within a mutex, inefficient O(N^2) algorithms for FID extraction and sorting, and a potential memory leak during the deserialization of missing file lists.
| if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { | ||
| vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d, sver:%" PRId64, TD_VID(pVnode), tsdbOpts.format, | ||
| pReader->sver); | ||
| if (tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { |
There was a problem hiding this comment.
The removal of the pReader->sver == 0 condition when toggling RAW replication mode is problematic. If sver > 0 (incremental snapshot) and the follower supports RAW format, the leader will now skip the normal tsdbSnapReader (which handles memtable data and incremental file data) and only use the tsdbSnapRAWReader. This will result in data loss for incremental snapshots as memtable data will never be sent. RAW mode should generally only be used for full snapshots or specifically for missing file catch-up without skipping incremental data.
| (void)taosThreadMutexLock(&pTsdb->mutex); | ||
|
|
||
| STFileSet* fset; | ||
| TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { | ||
| // check farr entries (HEAD, DATA, SMA, TOMB) | ||
| for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { | ||
| if (fset->farr[ftype] != NULL) { | ||
| if (!taosCheckExistFile(fset->farr[ftype]->fname)) { | ||
| if (fileCount >= fileCap) { | ||
| int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; | ||
| STsdbMissingFile* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbMissingFile)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| goto _unlock; | ||
| } | ||
| files = tmp; | ||
| fileCap = newCap; | ||
| } | ||
| files[fileCount].fid = fset->fid; | ||
| files[fileCount].ftype = ftype; | ||
| fileCount++; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // check STT files in lvlArr | ||
| SSttLvl* lvl; | ||
| bool sttMissing = false; | ||
| TARRAY2_FOREACH(fset->lvlArr, lvl) { | ||
| STFileObj* fobj; | ||
| TARRAY2_FOREACH(lvl->fobjArr, fobj) { | ||
| if (!taosCheckExistFile(fobj->fname)) { | ||
| sttMissing = true; | ||
| break; | ||
| } | ||
| } | ||
| if (sttMissing) break; | ||
| } | ||
| if (sttMissing) { | ||
| if (fileCount >= fileCap) { | ||
| int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; | ||
| STsdbMissingFile* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbMissingFile)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| goto _unlock; | ||
| } | ||
| files = tmp; | ||
| fileCap = newCap; | ||
| } | ||
| files[fileCount].fid = fset->fid; | ||
| files[fileCount].ftype = TSDB_FTYPE_STT; | ||
| fileCount++; | ||
| } | ||
| } | ||
|
|
||
| _unlock: | ||
| (void)taosThreadMutexUnlock(&pTsdb->mutex); |
There was a problem hiding this comment.
The function tsdbDetectMissingFiles performs disk I/O operations (taosCheckExistFile) while holding the pTsdb->mutex lock. Disk operations can be slow and unpredictable, potentially blocking other threads that need access to the TSDB for a significant amount of time. It is highly recommended to collect the list of files to check under the lock and then perform the existence checks outside the critical section.
| // extract unique fids from hash keys (key = (fid << 32) | ftype) | ||
| void* pIter = NULL; | ||
| while ((pIter = taosHashIterate(missingFileHash, pIter)) != NULL) { | ||
| size_t keyLen = 0; | ||
| int64_t* pKey = taosHashGetKey(pIter, &keyLen); | ||
| int32_t fid = (int32_t)(*pKey >> 32); | ||
|
|
||
| // check if fid already exists | ||
| bool exists = false; | ||
| for (int32_t i = 0; i < fidCount; ++i) { | ||
| if (fids[i] == fid) { | ||
| exists = true; | ||
| break; | ||
| } | ||
| } | ||
| if (exists) continue; | ||
|
|
||
| if (fidCount >= fidCap) { | ||
| int32_t newCap = fidCap == 0 ? 16 : fidCap * 2; | ||
| int32_t* tmp = taosMemoryRealloc(fids, newCap * sizeof(int32_t)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| taosHashCancelIterate(missingFileHash, pIter); | ||
| taosMemoryFree(fids); | ||
| return code; | ||
| } | ||
| fids = tmp; | ||
| fidCap = newCap; | ||
| } | ||
| fids[fidCount++] = fid; | ||
| } |
There was a problem hiding this comment.
The logic to extract unique FIDs from the hash map is inefficient. It uses a linear search for (int32_t i = 0; i < fidCount; ++i) inside the iteration loop, resulting in
| if (fidCount > 1) { | ||
| for (int32_t i = 0; i < fidCount - 1; ++i) { | ||
| for (int32_t j = i + 1; j < fidCount; ++j) { | ||
| if (fids[i] > fids[j]) { | ||
| int32_t tmp = fids[i]; | ||
| fids[i] = fids[j]; | ||
| fids[j] = tmp; | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The sorting logic implemented here is a selection sort with qsort function for better performance ($O(N \log N)$).
References
- Prefer standard library functions like qsort for sorting to ensure efficiency and maintainability. (link)
| } | ||
| } break; | ||
| case SNAP_DATA_MISSING_FIDS: { | ||
| code = tDeserializeMissingFileList(buf, bufLen, &pReader->missingFileHash); |
There was a problem hiding this comment.
Potential memory leak: tDeserializeMissingFileList allocates a new hash map and assigns it to &pReader->missingFileHash. If this subfield appears multiple times in the snapshot info message, the previously allocated hash map will be leaked. Ensure any existing hash map is cleaned up before assignment.
| // follower has a file that leader doesn't — content mismatcah | ||
| tsdbInfo("leader doesn't have file fid:%d, ftype:%d", files[i].fid, files[i].ftype); | ||
| mode = TSDB_SNAP_SYNC_FSET_LEVEL; | ||
| } else if (leaderCid != followerCid || leaderSize != followerSize) { | ||
| tsdbInfo("leader doesn't have different file fid:%d, ftype:%d, l-cid:%" PRId64 ", l-size:%" PRId64 |
| // check if fid already exists | ||
| bool exists = false; | ||
| for (int32_t i = 0; i < fidCount; ++i) { | ||
| if (fids[i] == fid) { | ||
| exists = true; | ||
| break; | ||
| } | ||
| } | ||
| if (exists) continue; | ||
|
|
||
| if (fidCount >= fidCap) { | ||
| int32_t newCap = fidCap == 0 ? 16 : fidCap * 2; | ||
| int32_t* tmp = taosMemoryRealloc(fids, newCap * sizeof(int32_t)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| taosHashCancelIterate(missingFileHash, pIter); | ||
| taosMemoryFree(fids); | ||
| return code; | ||
| } | ||
| fids = tmp; | ||
| fidCap = newCap; | ||
| } | ||
| fids[fidCount++] = fid; | ||
| } | ||
|
|
||
| // sort fids for binary search | ||
| if (fidCount > 1) { | ||
| for (int32_t i = 0; i < fidCount - 1; ++i) { | ||
| for (int32_t j = i + 1; j < fidCount; ++j) { | ||
| if (fids[i] > fids[j]) { | ||
| int32_t tmp = fids[i]; | ||
| fids[i] = fids[j]; | ||
| fids[j] = tmp; | ||
| } | ||
| } | ||
| } | ||
| } |
| STFileSet* fset; | ||
| TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { | ||
| if (fset->fid != fid) continue; | ||
|
|
||
| if (ftype == TSDB_FTYPE_STT) { | ||
| // search STT files by cid | ||
| SSttLvl* lvl; | ||
| TARRAY2_FOREACH(fset->lvlArr, lvl) { | ||
| STFileObj* fobj; | ||
| TARRAY2_FOREACH(lvl->fobjArr, fobj) { | ||
| if (fobj->f->cid == followerCid) { | ||
| found = true; | ||
| leaderCid = fobj->f->cid; | ||
| leaderSize = fobj->f->size; | ||
| break; | ||
| } | ||
| } | ||
| if (found) break; | ||
| } | ||
| } else if (ftype >= 0 && ftype < TSDB_FTYPE_MAX) { | ||
| if (fset->farr[ftype] != NULL) { | ||
| found = true; | ||
| leaderCid = fset->farr[ftype]->f->cid; | ||
| leaderSize = fset->farr[ftype]->f->size; | ||
| } | ||
| } | ||
| break; // found the fid | ||
| } | ||
|
|
||
| // determine mode | ||
| uint8_t mode = TSDB_SNAP_SYNC_FILE_LEVEL; | ||
| if (!found) { | ||
| // follower has a file that leader doesn't — content mismatcah | ||
| tsdbInfo("leader doesn't have file fid:%d, ftype:%d", files[i].fid, files[i].ftype); | ||
| mode = TSDB_SNAP_SYNC_FSET_LEVEL; | ||
| } else if (leaderCid != followerCid || leaderSize != followerSize) { | ||
| tsdbInfo("leader doesn't have different file fid:%d, ftype:%d, l-cid:%" PRId64 ", l-size:%" PRId64 | ||
| ", f-cid:%" PRId64 ", f-size:%" PRId64, | ||
| files[i].fid, files[i].ftype, leaderCid, leaderSize, followerCid, followerSize); | ||
| mode = TSDB_SNAP_SYNC_FSET_LEVEL; | ||
| } | ||
|
|
||
| if (mode == TSDB_SNAP_SYNC_FSET_LEVEL) { | ||
| if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { | ||
| code = terrno; | ||
| (void)taosThreadMutexUnlock(&pTsdb->mutex); | ||
| taosHashCleanup(pFidModeHash); | ||
| return code; | ||
| } | ||
| continue; | ||
| } | ||
|
|
||
| // ensure fid has an entry (FILE_LEVEL if not already set) | ||
| if (pExistMode == NULL) { | ||
| if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { | ||
| code = terrno; | ||
| (void)taosThreadMutexUnlock(&pTsdb->mutex); | ||
| taosHashCleanup(pFidModeHash); | ||
| return code; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // second pass: check leader STT files that follower doesn't know about at all | ||
| STFileSet* fset; | ||
| TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { | ||
| int32_t fid = fset->fid; | ||
|
|
||
| uint8_t* pExistMode = taosHashGet(pFidModeHash, &fid, sizeof(fid)); | ||
| if (pExistMode != NULL && *pExistMode == TSDB_SNAP_SYNC_FSET_LEVEL) { | ||
| continue; | ||
| } | ||
|
|
||
| // check if this fid has any file info from follower | ||
| bool fidHasInfo = false; | ||
| for (int32_t i = 0; i < fileCount; ++i) { | ||
| if (files[i].fid == fid) { | ||
| fidHasInfo = true; | ||
| break; | ||
| } | ||
| } | ||
| if (!fidHasInfo) continue; | ||
|
|
||
| // for each leader STT file, check if follower reported it (either present or missing) | ||
| SSttLvl* lvl; | ||
| TARRAY2_FOREACH(fset->lvlArr, lvl) { | ||
| STFileObj* fobj; | ||
| TARRAY2_FOREACH(lvl->fobjArr, fobj) { | ||
| int64_t leaderCid = fobj->f->cid; | ||
| bool followerKnows = false; | ||
| for (int32_t i = 0; i < fileCount; ++i) { | ||
| if (files[i].fid == fid && files[i].ftype == TSDB_FTYPE_STT && files[i].cid == leaderCid) { | ||
| followerKnows = true; | ||
| break; | ||
| } | ||
| } | ||
| if (!followerKnows) { | ||
| // leader has STT file that follower doesn't know about — need FSET_LEVEL | ||
| tsdbInfo("follower doesn't have stt file fid:%d, cid:%" PRId64, fid, leaderCid); | ||
| uint8_t mode = TSDB_SNAP_SYNC_FSET_LEVEL; | ||
| if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { | ||
| code = terrno; | ||
| (void)taosThreadMutexUnlock(&pTsdb->mutex); | ||
| taosHashCleanup(pFidModeHash); | ||
| return code; | ||
| } | ||
| goto _next_fset; | ||
| } | ||
| } | ||
| } | ||
| _next_fset:; | ||
| } |
| (void)taosThreadMutexLock(&pTsdb->mutex); | ||
|
|
||
| STFileSet* fset; | ||
| TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { | ||
| // collect farr entries (HEAD, DATA, SMA, TOMB) | ||
| for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { | ||
| if (fset->farr[ftype] != NULL) { | ||
| if (fileCount >= fileCap) { | ||
| int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; | ||
| STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| goto _unlock; | ||
| } | ||
| files = tmp; | ||
| fileCap = newCap; | ||
| } | ||
| files[fileCount].fid = fset->fid; | ||
| files[fileCount].ftype = ftype; | ||
| files[fileCount].level = 0; | ||
| files[fileCount].cid = fset->farr[ftype]->f->cid; | ||
| files[fileCount].size = fset->farr[ftype]->f->size; | ||
| files[fileCount].isMissing = !taosCheckExistFile(fset->farr[ftype]->fname) ? 1 : 0; | ||
| fileCount++; | ||
| } | ||
| } | ||
|
|
||
| // collect STT files in lvlArr | ||
| SSttLvl* lvl; | ||
| TARRAY2_FOREACH(fset->lvlArr, lvl) { | ||
| STFileObj* fobj; | ||
| TARRAY2_FOREACH(lvl->fobjArr, fobj) { | ||
| if (fileCount >= fileCap) { | ||
| int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; | ||
| STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); | ||
| if (tmp == NULL) { | ||
| code = terrno; | ||
| goto _unlock; | ||
| } | ||
| files = tmp; | ||
| fileCap = newCap; | ||
| } | ||
| files[fileCount].fid = fset->fid; | ||
| files[fileCount].ftype = TSDB_FTYPE_STT; | ||
| files[fileCount].level = lvl->level; | ||
| files[fileCount].cid = fobj->f->cid; | ||
| files[fileCount].size = fobj->f->size; | ||
| files[fileCount].isMissing = !taosCheckExistFile(fobj->fname) ? 1 : 0; | ||
| fileCount++; | ||
| } | ||
| } | ||
| } |
| if (tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { | ||
| pReader->tsdbDone = true; | ||
| } else { |
| @@ -585,6 +587,7 @@ enum { | |||
| SNAP_DATA_STREAM_STATE_BACKEND = 12, | |||
| SNAP_DATA_TQ_CHECKINFO = 13, | |||
| SNAP_DATA_RAW = 14, | |||
| if ((code = tDecodeI32(&decoder, &fileCount))) goto _err; | ||
| if (fileCount < 0) { | ||
| code = TSDB_CODE_INVALID_MSG; | ||
| goto _err; | ||
| } | ||
| if (fileCount > 0) { | ||
| pHash = taosHashInit(fileCount * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); | ||
| if (pHash == NULL) { | ||
| code = terrno; | ||
| goto _err; | ||
| } | ||
| pSttHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); | ||
| if (pSttHash == NULL) { | ||
| code = terrno; | ||
| goto _err; | ||
| } | ||
| files = taosMemoryMalloc(fileCount * sizeof(STsdbSnapFileInfo)); | ||
| if (files == NULL) { | ||
| code = terrno; | ||
| goto _err; | ||
| } |
| tsdbInfo("vgId:%d, RAW include file fid:%d ftype:%d in missing set", TD_VID(reader->tsdb->pVnode), | ||
| reader->ctx->fset->fid, ftype); |
| } | ||
|
|
||
| tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); | ||
| tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); |
| code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1); | ||
| if (code) break; | ||
|
|
||
| code = TARRAY2_APPEND(fsetArr[0], fset1); | ||
| if (code) { | ||
| tsdbTFileSetClear(&fset1); | ||
| break; | ||
| } | ||
| } | ||
| (void)taosThreadMutexUnlock(&fs->tsdb->mutex); | ||
|
|
||
| if (code) { | ||
| TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); | ||
| taosMemoryFree(fsetArr[0]); | ||
| fsetArr[0] = NULL; | ||
| } |
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.