diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9413bc2bfa26..c3af75ab9fe2 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -734,6 +734,24 @@ typedef struct STsdbRepOpts { int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); +int32_t tMissingFileListDataLenCalc(int32_t fileCount); +int32_t tDeserializeMissingFileList(void *buf, int32_t bufLen, void **ppFiles, int32_t *pFileCount, SHashObj **ppHash, + SHashObj **ppSttHash); +int32_t tsdbExtractMissingFids(STsdb *pTsdb, SHashObj *missingFileHash, int32_t **ppFids, int32_t *pFidCount); +int32_t tsdbDetermineFidSyncMode(STsdb *pTsdb, const void *files, int32_t fileCount, SHashObj **ppFidModeHash); + +#define TSDB_SNAP_SYNC_FILE_LEVEL 0 +#define TSDB_SNAP_SYNC_FSET_LEVEL 1 + +static inline int64_t tsdbMissingFileKey(int32_t fid, int32_t ftype) { return ((int64_t)fid << 32) | (uint32_t)ftype; } + +// stt hash key: (fid, cid) composite key to avoid collision across different file sets +#define TSDB_STT_HASH_KEY_LEN (sizeof(int32_t) + sizeof(int64_t)) +static inline void tsdbSttHashKey(int32_t fid, int64_t cid, char key[TSDB_STT_HASH_KEY_LEN]) { + memcpy(key, &fid, sizeof(fid)); + memcpy(key + sizeof(fid), &cid, sizeof(cid)); +} + // snap read struct STsdbReadSnap { SMemTable *pMem; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 415529bf1808..3a1cc0136024 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -335,7 +335,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback); // STsdbSnapReader ======================================== int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges, - STsdbSnapReader** ppReader); + const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** ppReader); void tsdbSnapReaderClose(STsdbSnapReader** ppReader); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== @@ -344,7 +344,9 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); // STsdbSnapRAWReader ======================================== -int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash, + SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids, + int32_t missingFidCount, STsdbSnapRAWReader** ppReader); void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); // STsdbSnapRAWWriter ======================================== @@ -615,6 +617,7 @@ enum { // SNAP_DATA_TQ_CHECKINFO = 13, SNAP_DATA_RAW = 14, SNAP_DATA_BSE = 15, + SNAP_DATA_MISSING_FIDS = 16, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 0e18346c2f3f..129a1fba5907 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL, - &pReader->pDataReader[i]); + NULL, 0, &pReader->pDataReader[i]); TAOS_CHECK_GOTO(code, &lino, _exit); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index b222d71dce4c..69612002087d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -1099,6 +1099,52 @@ static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) { return pHash; } +int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr) { + int32_t code = 0; + STFileSet *fset, *fset1; + SHashObj *pHash = NULL; + + fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0])); + if (fsetArr[0] == NULL) return terrno; + + pHash = tsdbFSetRangeArrayToHash(pRanges); + if (pHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _out; + } + + (void)taosThreadMutexLock(&fs->tsdb->mutex); + TARRAY2_FOREACH(fs->fSetArr, fset) { + int32_t fid = fset->fid; + if (taosHashGet(pHash, &fid, sizeof(fid)) == NULL) { + tsdbDebug("skip fid:%d, not in ranges", fid); + continue; + } + + code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1); + if (code) break; + + code = TARRAY2_APPEND(fsetArr[0], fset1); + if (code) { + tsdbTFileSetClear(&fset1); + break; + } + } + (void)taosThreadMutexUnlock(&fs->tsdb->mutex); + + if (code) { + TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); + taosMemoryFree(fsetArr[0]); + fsetArr[0] = NULL; + } + +_out: + if (pHash) { + taosHashCleanup(pHash); + } + return code; +} + int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr, TFileOpArray *fopArr) { int32_t code = 0; @@ -1152,7 +1198,21 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); } +static bool tsdbFSFidInMissingSet(int32_t fid, const int32_t *missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, + const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr) { int32_t code = 0; STFileSet *fset; @@ -1179,6 +1239,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev int64_t sver1 = sver; int64_t ever1 = ever; + // skip fids not in missing-fid filter + if (missingFids != NULL && !tsdbFSFidInMissingSet(fset->fid, missingFids, missingFidCount)) { + tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(fs->tsdb->pVnode), fset->fid); + continue; + } + if (pHash) { int32_t fid = fset->fid; STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); @@ -1193,7 +1259,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev continue; } - tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); + tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); if (code) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 4a21947e0904..c5acd81b5e36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -48,13 +48,14 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr); void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr); +int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr); void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr, TFileOpArray *fopArr); void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr); int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, - TFileSetRangeArray **fsrArr); + const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr); void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr); // txn int64_t tsdbFSAllocEid(STFileSystem *fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c index ea404a874fe9..dfcff677b427 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c @@ -16,7 +16,17 @@ #include "tsdb.h" #include "tsdbFS2.h" -#define TSDB_SNAP_MSG_VER 1 +#define TSDB_SNAP_MSG_VER 2 + +// file info for snapshot sync: (fid, ftype, level, cid, size, isMissing) +typedef struct { + int32_t fid; + int32_t ftype; // tsdb_ftype_t + int32_t level; // STT level (0/1/2), farr files use 0 + int64_t cid; // commit id + int64_t size; // file size + int8_t isMissing; // 1=missing, 0=present +} STsdbSnapFileInfo; // fset partition static int32_t tsdbFSetPartCmprFn(STsdbFSetPartition* x, STsdbFSetPartition* y) { @@ -518,6 +528,438 @@ int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) return code; } +int32_t tMissingFileListDataLenCalc(int32_t fileCount) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 0; + hdrLen += sizeof(msgVer); + datLen += hdrLen; + datLen += sizeof(int32_t); // fileCount + // fid + ftype + level + cid + size + isMissing = 4+4+4+8+8+1 = 29 bytes per record + datLen += fileCount * + (sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int64_t) + sizeof(int64_t) + sizeof(int8_t)); + return datLen; +} + +int32_t tSerializeMissingFileList(void* buf, int32_t bufLen, const STsdbSnapFileInfo* files, int32_t fileCount) { + int32_t code = 0; + SEncoder encoder = {0}; + int8_t msgVer = TSDB_SNAP_MSG_VER; + + tEncoderInit(&encoder, buf, bufLen); + + if ((code = tStartEncode(&encoder))) goto _err; + if ((code = tEncodeI8(&encoder, msgVer))) goto _err; + if ((code = tEncodeI32(&encoder, fileCount))) goto _err; + for (int32_t i = 0; i < fileCount; ++i) { + if ((code = tEncodeI32(&encoder, files[i].fid))) goto _err; + if ((code = tEncodeI32(&encoder, files[i].ftype))) goto _err; + if ((code = tEncodeI32(&encoder, files[i].level))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].cid))) goto _err; + if ((code = tEncodeI64(&encoder, files[i].size))) goto _err; + if ((code = tEncodeI8(&encoder, files[i].isMissing))) goto _err; + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; + +_err: + tEncoderClear(&encoder); + return code; +} + +int32_t tDeserializeMissingFileList(void* buf, int32_t bufLen, void** ppFiles, int32_t* pFileCount, SHashObj** ppHash, + SHashObj** ppSttHash) { + int32_t code = 0; + SDecoder decoder = {0}; + int8_t msgVer = 0; + int32_t fileCount = 0; + SHashObj* pHash = NULL; + SHashObj* pSttHash = NULL; + STsdbSnapFileInfo* files = NULL; + + tDecoderInit(&decoder, buf, bufLen); + + if ((code = tStartDecode(&decoder))) goto _err; + if ((code = tDecodeI8(&decoder, &msgVer))) goto _err; + if (msgVer != TSDB_SNAP_MSG_VER) { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } + if ((code = tDecodeI32(&decoder, &fileCount))) goto _err; + if (fileCount < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } + if (fileCount > 0) { + pHash = taosHashInit(fileCount * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (pHash == NULL) { + code = terrno; + goto _err; + } + pSttHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (pSttHash == NULL) { + code = terrno; + goto _err; + } + files = taosMemoryMalloc(fileCount * sizeof(STsdbSnapFileInfo)); + if (files == NULL) { + code = terrno; + goto _err; + } + for (int32_t i = 0; i < fileCount; ++i) { + if ((code = tDecodeI32(&decoder, &files[i].fid))) goto _err; + if ((code = tDecodeI32(&decoder, &files[i].ftype))) goto _err; + if ((code = tDecodeI32(&decoder, &files[i].level))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].cid))) goto _err; + if ((code = tDecodeI64(&decoder, &files[i].size))) goto _err; + if ((code = tDecodeI8(&decoder, &files[i].isMissing))) goto _err; + + tsdbDebug("FileInfo fid:%d ftype:%d level:%d cid:%" PRId64 " size:%" PRId64 " isMissing:%d", files[i].fid, + files[i].ftype, files[i].level, files[i].cid, files[i].size, files[i].isMissing); + + if (files[i].isMissing) { + char dummy = 0; + int64_t key = tsdbMissingFileKey(files[i].fid, files[i].ftype); + if (taosHashPut(pHash, &key, sizeof(key), &dummy, sizeof(dummy)) != 0) { + code = terrno; + goto _err; + } + // for STT files, also put into missingSttHash keyed by (fid, cid) + if (files[i].ftype == TSDB_FTYPE_STT) { + char sttKey[TSDB_STT_HASH_KEY_LEN]; + tsdbSttHashKey(files[i].fid, files[i].cid, sttKey); + if (taosHashPut(pSttHash, sttKey, sizeof(sttKey), &dummy, sizeof(dummy)) != 0) { + code = terrno; + goto _err; + } + } + } + } + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + + *ppHash = pHash; + *ppSttHash = pSttHash; + *ppFiles = (void*)files; + *pFileCount = fileCount; + return 0; + +_err: + if (pHash) taosHashCleanup(pHash); + if (pSttHash) taosHashCleanup(pSttHash); + taosMemoryFree(files); + tDecoderClear(&decoder); + return code; +} + +int32_t tsdbExtractMissingFids(STsdb* pTsdb, SHashObj* missingFileHash, int32_t** ppFids, int32_t* pFidCount) { + int32_t code = 0; + int32_t fidCap = 0; + int32_t fidCount = 0; + int32_t* fids = NULL; + + // extract unique fids from hash keys (key = (fid << 32) | ftype) + void* pIter = NULL; + while ((pIter = taosHashIterate(missingFileHash, pIter)) != NULL) { + size_t keyLen = 0; + int64_t* pKey = taosHashGetKey(pIter, &keyLen); + int32_t fid = (int32_t)(*pKey >> 32); + + // check if fid already exists + bool exists = false; + for (int32_t i = 0; i < fidCount; ++i) { + if (fids[i] == fid) { + exists = true; + break; + } + } + if (exists) continue; + + if (fidCount >= fidCap) { + int32_t newCap = fidCap == 0 ? 16 : fidCap * 2; + int32_t* tmp = taosMemoryRealloc(fids, newCap * sizeof(int32_t)); + if (tmp == NULL) { + code = terrno; + taosHashCancelIterate(missingFileHash, pIter); + taosMemoryFree(fids); + return code; + } + fids = tmp; + fidCap = newCap; + } + fids[fidCount++] = fid; + } + + // sort fids for binary search + if (fidCount > 1) { + for (int32_t i = 0; i < fidCount - 1; ++i) { + for (int32_t j = i + 1; j < fidCount; ++j) { + if (fids[i] > fids[j]) { + int32_t tmp = fids[i]; + fids[i] = fids[j]; + fids[j] = tmp; + } + } + } + } + + *ppFids = fids; + *pFidCount = fidCount; + return 0; +} + +int32_t tsdbDetermineFidSyncMode(STsdb* pTsdb, const void* pFileArr, int32_t fileCount, SHashObj** ppFidModeHash) { + int32_t code = 0; + SHashObj* pFidModeHash = NULL; + const STsdbSnapFileInfo* files = (const STsdbSnapFileInfo*)pFileArr; + + if (fileCount <= 0 || files == NULL) { + *ppFidModeHash = NULL; + return 0; + } + + pFidModeHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pFidModeHash == NULL) { + return terrno; + } + + (void)taosThreadMutexLock(&pTsdb->mutex); + + for (int32_t i = 0; i < fileCount; ++i) { + int32_t fid = files[i].fid; + + // check if already marked as FSET_LEVEL — skip further checks for this fid + uint8_t* pExistMode = taosHashGet(pFidModeHash, &fid, sizeof(fid)); + if (pExistMode != NULL && *pExistMode == TSDB_SNAP_SYNC_FSET_LEVEL) { + continue; + } + + // only process present files (isMissing=0) for content comparison + if (files[i].isMissing) continue; + + int32_t ftype = files[i].ftype; + int64_t followerCid = files[i].cid; + int64_t followerSize = files[i].size; + + // find leader's corresponding file + bool found = false; + int64_t leaderCid = 0; + int64_t leaderSize = 0; + + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + if (fset->fid != fid) continue; + + if (ftype == TSDB_FTYPE_STT) { + // search STT files by cid + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + if (fobj->f->cid == followerCid) { + found = true; + leaderCid = fobj->f->cid; + leaderSize = fobj->f->size; + break; + } + } + if (found) break; + } + } else if (ftype >= 0 && ftype < TSDB_FTYPE_MAX) { + if (fset->farr[ftype] != NULL) { + found = true; + leaderCid = fset->farr[ftype]->f->cid; + leaderSize = fset->farr[ftype]->f->size; + } + } + break; // found the fid + } + + // determine mode + uint8_t mode = TSDB_SNAP_SYNC_FILE_LEVEL; + if (!found) { + // follower has a file that leader doesn't — content mismatcah + tsdbInfo("leader doesn't have file fid:%d, ftype:%d", files[i].fid, files[i].ftype); + mode = TSDB_SNAP_SYNC_FSET_LEVEL; + } else if (leaderCid != followerCid || leaderSize != followerSize) { + tsdbInfo("leader doesn't have different file fid:%d, ftype:%d, l-cid:%" PRId64 ", l-size:%" PRId64 + ", f-cid:%" PRId64 ", f-size:%" PRId64, + files[i].fid, files[i].ftype, leaderCid, leaderSize, followerCid, followerSize); + mode = TSDB_SNAP_SYNC_FSET_LEVEL; + } + + if (mode == TSDB_SNAP_SYNC_FSET_LEVEL) { + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + (void)taosThreadMutexUnlock(&pTsdb->mutex); + taosHashCleanup(pFidModeHash); + return code; + } + continue; + } + + // ensure fid has an entry (FILE_LEVEL if not already set) + if (pExistMode == NULL) { + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + (void)taosThreadMutexUnlock(&pTsdb->mutex); + taosHashCleanup(pFidModeHash); + return code; + } + } + } + + // second pass: check leader STT files that follower doesn't know about at all + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + int32_t fid = fset->fid; + + uint8_t* pExistMode = taosHashGet(pFidModeHash, &fid, sizeof(fid)); + if (pExistMode != NULL && *pExistMode == TSDB_SNAP_SYNC_FSET_LEVEL) { + continue; + } + + // check if this fid has any file info from follower + bool fidHasInfo = false; + for (int32_t i = 0; i < fileCount; ++i) { + if (files[i].fid == fid) { + fidHasInfo = true; + break; + } + } + if (!fidHasInfo) continue; + + // for each leader STT file, check if follower reported it (either present or missing) + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + int64_t leaderCid = fobj->f->cid; + bool followerKnows = false; + for (int32_t i = 0; i < fileCount; ++i) { + if (files[i].fid == fid && files[i].ftype == TSDB_FTYPE_STT && files[i].cid == leaderCid) { + followerKnows = true; + break; + } + } + if (!followerKnows) { + // leader has STT file that follower doesn't know about — need FSET_LEVEL + tsdbInfo("follower doesn't have stt file fid:%d, cid:%" PRId64, fid, leaderCid); + uint8_t mode = TSDB_SNAP_SYNC_FSET_LEVEL; + if (taosHashPut(pFidModeHash, &fid, sizeof(fid), &mode, sizeof(mode)) != 0) { + code = terrno; + (void)taosThreadMutexUnlock(&pTsdb->mutex); + taosHashCleanup(pFidModeHash); + return code; + } + goto _next_fset; + } + } + } + _next_fset:; + } + + (void)taosThreadMutexUnlock(&pTsdb->mutex); + + *ppFidModeHash = pFidModeHash; + return 0; +} + +static int32_t tsdbCollectAllFileInfo(SVnode* pVnode, STsdbSnapFileInfo** ppFiles, int32_t* pFileCount) { + int32_t code = 0; + STsdbSnapFileInfo* files = NULL; + int32_t fileCount = 0; + int32_t fileCap = 0; + STsdb* pTsdb = pVnode->pTsdb; + + *ppFiles = NULL; + *pFileCount = 0; + + (void)taosThreadMutexLock(&pTsdb->mutex); + + STFileSet* fset; + TARRAY2_FOREACH(pTsdb->pFS->fSetArr, fset) { + // collect farr entries (HEAD, DATA, SMA, TOMB) + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] != NULL) { + if (fileCount >= fileCap) { + int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; + STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); + if (tmp == NULL) { + code = terrno; + goto _unlock; + } + files = tmp; + fileCap = newCap; + } + files[fileCount].fid = fset->fid; + files[fileCount].ftype = ftype; + files[fileCount].level = 0; + files[fileCount].cid = fset->farr[ftype]->f->cid; + files[fileCount].size = fset->farr[ftype]->f->size; + files[fileCount].isMissing = !taosCheckExistFile(fset->farr[ftype]->fname) ? 1 : 0; + fileCount++; + } + } + + // collect STT files in lvlArr + SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + if (fileCount >= fileCap) { + int32_t newCap = fileCap == 0 ? 16 : fileCap * 2; + STsdbSnapFileInfo* tmp = taosMemoryRealloc(files, newCap * sizeof(STsdbSnapFileInfo)); + if (tmp == NULL) { + code = terrno; + goto _unlock; + } + files = tmp; + fileCap = newCap; + } + files[fileCount].fid = fset->fid; + files[fileCount].ftype = TSDB_FTYPE_STT; + files[fileCount].level = lvl->level; + files[fileCount].cid = fobj->f->cid; + files[fileCount].size = fobj->f->size; + files[fileCount].isMissing = !taosCheckExistFile(fobj->fname) ? 1 : 0; + fileCount++; + } + } + } + +_unlock: + (void)taosThreadMutexUnlock(&pTsdb->mutex); + + if (code != 0) { + taosMemoryFree(files); + return code; + } + + *ppFiles = files; + *pFileCount = fileCount; + return 0; +} + +static int32_t tsdbMissingFilesEstSize(int32_t fileCount) { + return sizeof(SSyncTLV) + tMissingFileListDataLenCalc(fileCount); +} + +static int32_t tsdbMissingFilesSerialize(const STsdbSnapFileInfo* files, int32_t fileCount, void* buf, int32_t bufLen) { + SSyncTLV* pSubHead = buf; + int32_t tlen = tSerializeMissingFileList(pSubHead->val, bufLen - sizeof(*pSubHead), files, fileCount); + if (tlen < 0) return tlen; + pSubHead->typ = SNAP_DATA_MISSING_FIDS; + pSubHead->len = tlen; + return sizeof(*pSubHead) + tlen; +} + static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) { int32_t dataLen = 0; dataLen += sizeof(SSyncTLV); @@ -567,9 +1009,8 @@ static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, ST } } break; default: - code = TSDB_CODE_INVALID_MSG; - tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ); - return code; + tsdbWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), pField->typ); + break; } } @@ -587,7 +1028,9 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { } // deal with snap info for reply - STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + STsdbSnapFileInfo* missingFiles = NULL; + int32_t missingFileCount = 0; if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { STsdbRepOpts leaderOpts = {0}; if ((code = tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts)) < 0) { @@ -595,6 +1038,15 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { goto _out; } opts.format = TMIN(opts.format, leaderOpts.format); + + int32_t detectCode = tsdbCollectAllFileInfo(pVnode, &missingFiles, &missingFileCount); + if (detectCode != 0) { + tsdbWarn("vgId:%d, failed to collect file info since %s, continuing without", TD_VID(pVnode), + tstrerror(detectCode)); + missingFileCount = 0; + } else if (missingFileCount > 0) { + tsdbInfo("vgId:%d, collected %d file info entries for snapshot", TD_VID(pVnode), missingFileCount); + } } // info data realloc @@ -602,6 +1054,9 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { int32_t bufLen = headLen; bufLen += tsdbPartitionInfoEstSize(pInfo); bufLen += tsdbRepOptsEstSize(&opts); + if (missingFileCount > 0) { + bufLen += tsdbMissingFilesEstSize(missingFileCount); + } if ((code = syncSnapInfoDataRealloc(pSnap, bufLen)) != 0) { tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen); goto _out; @@ -626,6 +1081,15 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { } offset += tlen; + if (missingFileCount > 0) { + if ((tlen = tsdbMissingFilesSerialize(missingFiles, missingFileCount, buf + offset, bufLen - offset)) < 0) { + code = tlen; + tsdbError("vgId:%d, failed to serialize missing files since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + offset += tlen; + } + // set header of info data SSyncTLV* pHead = pSnap->data; pHead->typ = pSnap->type; @@ -635,6 +1099,7 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { pHead->len); _out: + taosMemoryFree(missingFiles); tsdbPartitionInfoClear(pInfo); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index b356beee453b..654203e1294e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -52,6 +52,10 @@ struct STsdbSnapReader { TTsdbIterArray tombIterArr[1]; SIterMerger* tombIterMerger; + // missing fid filter (NULL = send all, non-NULL = only send listed fids) + int32_t* missingFids; + int32_t missingFidCount; + // data SBlockData blockData[1]; STombBlock tombBlock[1]; @@ -198,12 +202,35 @@ static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) { return; } +static bool tsdbSnapFidInMissingSet(int32_t fid, const int32_t* missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { int32_t code = 0; int32_t lino = 0; - if (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) { + while (reader->ctx->fsrArrIdx < TARRAY2_SIZE(reader->fsrArr)) { reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++); + + // skip fids not in missing-fid filter + if (reader->missingFids != NULL && + !tsdbSnapFidInMissingSet(reader->ctx->fsr->fset->fid, reader->missingFids, reader->missingFidCount)) { + tsdbDebug("vgId:%d, snap reader skip fid:%d not in missing-fid set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fsr->fset->fid); + reader->ctx->fsr = NULL; + continue; + } + reader->ctx->isDataDone = false; reader->ctx->isTombDone = false; @@ -212,6 +239,8 @@ static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { code = tsdbSnapReadFileSetOpenIter(reader); TSDB_CHECK_CODE(code, lino, _exit); + + return code; } _exit: @@ -441,7 +470,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) { } int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges, - STsdbSnapReader** reader) { + const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** reader) { int32_t code = 0; int32_t lino = 0; @@ -453,7 +482,20 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, reader[0]->ever = ever; reader[0]->type = type; - code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr); + // copy missing fid filter + if (missingFids != NULL && missingFidCount > 0) { + reader[0]->missingFids = taosMemoryMalloc(missingFidCount * sizeof(int32_t)); + if (reader[0]->missingFids == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + memcpy(reader[0]->missingFids, missingFids, missingFidCount * sizeof(int32_t)); + reader[0]->missingFidCount = missingFidCount; + tsdbInfo("vgId:%d, snap reader opened with %d missing-fid filter", TD_VID(tsdb->pVnode), missingFidCount); + } + + code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, reader[0]->missingFids, + reader[0]->missingFidCount, &reader[0]->fsrArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -461,6 +503,7 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code), sver, ever, type); tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr); + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; } else { @@ -496,6 +539,7 @@ void tsdbSnapReaderClose(STsdbSnapReader** reader) { tBufferDestroy(reader[0]->buffers + i); } + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c index cbc11135214c..2ee46331bc56 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -45,9 +45,31 @@ typedef struct STsdbSnapRAWReader { // iter SDataFileRAWReaderIter dataIter[1]; + + // missing file filter + SHashObj* missingFileHash; // key=(fid,ftype) — per-file filtering (not owned, do not free) + SHashObj* fidModeHash; // key=fid, val=uint8_t mode (not owned, do not free) + SHashObj* missingSttHash; // key=(fid,cid) — per-STT filtering (not owned, do not free) + int32_t* missingFids; // FID set for FID-level pre-filtering (owned, copy) + int32_t missingFidCount; } STsdbSnapRAWReader; -int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** reader) { +static bool tsdbFidInMissingSet(int32_t fid, const int32_t* missingFids, int32_t missingFidCount) { + int32_t lo = 0, hi = missingFidCount - 1; + while (lo <= hi) { + int32_t mid = lo + (hi - lo) / 2; + if (missingFids[mid] == fid) return true; + if (missingFids[mid] < fid) + lo = mid + 1; + else + hi = mid - 1; + } + return false; +} + +int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash, + SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids, + int32_t missingFidCount, STsdbSnapRAWReader** reader) { int32_t code = 0; int32_t lino = 0; @@ -58,19 +80,42 @@ int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapR reader[0]->ever = ever; reader[0]->type = type; - code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + // set missing file filter (hash is borrowed, not owned) + reader[0]->missingFileHash = missingFileHash; + reader[0]->fidModeHash = fidModeHash; + reader[0]->missingSttHash = missingSttHash; + + // copy missing fid filter + if (missingFids != NULL && missingFidCount > 0) { + reader[0]->missingFids = taosMemoryMalloc(missingFidCount * sizeof(int32_t)); + if (reader[0]->missingFids == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + memcpy(reader[0]->missingFids, missingFids, missingFidCount * sizeof(int32_t)); + reader[0]->missingFidCount = missingFidCount; + tsdbInfo("vgId:%d, RAW reader opened with %d missing-fid filter", TD_VID(tsdb->pVnode), missingFidCount); + } + + TFileSetRangeArray* pTypedRanges = (TFileSetRangeArray*)pRanges; + if (pTypedRanges != NULL && TARRAY2_SIZE(pTypedRanges) > 0) { + code = tsdbFSCreateRefSnapshotWithRanges(tsdb->pFS, pTypedRanges, &reader[0]->fsetArr); + } else { + code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + } TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, - lino, tstrerror(code), ever, type); + tsdbError("vgId:%d %s failed at line %d since %s, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, lino, + tstrerror(code), ever, type); tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; } else { - tsdbInfo("vgId:%d, tsdb snapshot raw reader opened. sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), ever, - type); + tsdbInfo("vgId:%d, tsdb snapshot raw reader opened. ever:%" PRId64 " type:%d ranged:%d", TD_VID(tsdb->pVnode), ever, + type, (pTypedRanges != NULL && TARRAY2_SIZE(pTypedRanges) > 0)); } return code; } @@ -85,6 +130,8 @@ void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) { TARRAY2_DESTROY(reader[0]->dataReaderArr, tsdbDataFileRAWReaderClose); tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + // missingFileHash is borrowed, not freed here + taosMemoryFree(reader[0]->missingFids); taosMemoryFree(reader[0]); reader[0] = NULL; return; @@ -94,12 +141,35 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { int32_t code = 0; int32_t lino = 0; + // determine sync mode for this fid + int32_t curFid = reader->ctx->fset->fid; + bool fsetLevel = false; + if (reader->fidModeHash != NULL) { + uint8_t* pMode = taosHashGet(reader->fidModeHash, &curFid, sizeof(curFid)); + if (pMode != NULL && *pMode == TSDB_SNAP_SYNC_FSET_LEVEL) { + fsetLevel = true; + tsdbInfo("vgId:%d, RAW fid:%d using FSET_LEVEL sync (send all files)", TD_VID(reader->tsdb->pVnode), curFid); + } else { + tsdbInfo("vgId:%d, RAW fid:%d using FILE_LEVEL sync (send only missing files)", TD_VID(reader->tsdb->pVnode), + curFid); + } + } + // data for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { if (reader->ctx->fset->farr[ftype] == NULL) { continue; } STFileObj* fobj = reader->ctx->fset->farr[ftype]; + // per-file filter: skip files not in missing set (only when FILE_LEVEL mode) + if (!fsetLevel && reader->missingFileHash != NULL) { + int64_t mfKey = tsdbMissingFileKey(reader->ctx->fset->fid, ftype); + if (taosHashGet(reader->missingFileHash, &mfKey, sizeof(mfKey)) == NULL) { + tsdbDebug("vgId:%d, RAW skip file fid:%d ftype:%d not in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid, ftype); + continue; + } + } SDataFileRAWReader* dataReader; SDataFileRAWReaderConfig config = { .tsdb = reader->tsdb, @@ -110,6 +180,8 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + tsdbInfo("vgId:%d, RAW include file fid:%d ftype:%d in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid, ftype); TSDB_CHECK_CODE(code, lino, _exit); } @@ -118,6 +190,17 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) { STFileObj* fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { + // per-file filter: skip stt files not in missing set (only when FILE_LEVEL mode) + if (!fsetLevel && reader->missingSttHash != NULL) { + int64_t sttCid = fobj->f->cid; + char sttKey[TSDB_STT_HASH_KEY_LEN]; + tsdbSttHashKey(reader->ctx->fset->fid, sttCid, sttKey); + if (taosHashGet(reader->missingSttHash, sttKey, sizeof(sttKey)) == NULL) { + tsdbDebug("vgId:%d, RAW skip stt file fid:%d cid:%" PRId64 " not in missing set", + TD_VID(reader->tsdb->pVnode), reader->ctx->fset->fid, sttCid); + continue; + } + } SDataFileRAWReader* dataReader; SDataFileRAWReaderConfig config = { .tsdb = reader->tsdb, @@ -128,6 +211,8 @@ static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + tsdbInfo("vgId:%d, RAW include stt file fid:%d in missing set", TD_VID(reader->tsdb->pVnode), + reader->ctx->fset->fid); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -233,8 +318,18 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { int32_t code = 0; int32_t lino = 0; - if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { + while (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); + + // skip fids not in missing-fid filter + if (reader->missingFids != NULL && + !tsdbFidInMissingSet(reader->ctx->fset->fid, reader->missingFids, reader->missingFidCount)) { + tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(reader->tsdb->pVnode), reader->ctx->fset->fid); + reader->ctx->fset = NULL; + continue; + } + tsdbInfo("vgId:%d, RAW include fid:%d in missing-fid set", TD_VID(reader->tsdb->pVnode), reader->ctx->fset->fid); + reader->ctx->isDataDone = false; code = tsdbSnapRAWReadFileSetOpenReader(reader); @@ -242,6 +337,8 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { code = tsdbSnapRAWReadFileSetOpenIter(reader); TSDB_CHECK_CODE(code, lino, _exit); + + return code; } _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 557622fc3c73..68ab12984991 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -55,6 +55,12 @@ struct SVSnapReader { // tsdb raw int8_t tsdbRAWDone; STsdbSnapRAWReader *pTsdbRAWReader; + // missing file filter + SHashObj *missingFileHash; // key=(fid,ftype), val=dummy — for RAW mode per-file filtering + SHashObj *fidModeHash; // key=fid, val=uint8_t mode (FILE_LEVEL or FSET_LEVEL) + SHashObj *missingSttHash; // key=cid, val=dummy — for RAW mode per-STT filtering + int32_t *missingFids; // FID set extracted from file names — for Normal mode FID filtering + int32_t missingFidCount; // tq int8_t tqHandleDone; @@ -88,6 +94,11 @@ static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, } } +static int32_t vnodeExtractMissingFids(SVSnapReader *pReader) { + return tsdbExtractMissingFids(pReader->pVnode->pTsdb, pReader->missingFileHash, &pReader->missingFids, + &pReader->missingFidCount); +} + static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { int32_t code = 0; SVnode *pVnode = pReader->pVnode; @@ -134,16 +145,47 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP goto _out; } } break; + case SNAP_DATA_MISSING_FIDS: { + void *missingFiles = NULL; + int32_t missingFileCount = 0; + code = tDeserializeMissingFileList(buf, bufLen, &missingFiles, &missingFileCount, &pReader->missingFileHash, + &pReader->missingSttHash); + if (code) { + vError("vgId:%d, failed to deserialize missing file list since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; + } + vInfo("vgId:%d, received %d file infos from follower, missing:%d", TD_VID(pVnode), missingFileCount, + (pReader->missingFileHash ? (int32_t)taosHashGetSize(pReader->missingFileHash) : 0)); + // determine sync mode per fid + if (missingFiles && missingFileCount > 0) { + code = tsdbDetermineFidSyncMode(pVnode->pTsdb, missingFiles, missingFileCount, &pReader->fidModeHash); + if (code) { + taosMemoryFree(missingFiles); + vError("vgId:%d, failed to determine fid sync mode since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; + } + } + taosMemoryFree(missingFiles); + } break; default: - vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); - code = TSDB_CODE_INVALID_DATA_FMT; - goto _out; + vWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), subField->typ); + break; + } + } + + // extract FID set from missing file hash for normal mode filtering + if (pReader->missingFileHash && taosHashGetSize(pReader->missingFileHash) > 0) { + code = vnodeExtractMissingFids(pReader); + if (code) { + vError("vgId:%d, failed to extract missing fids from file hash since %s", TD_VID(pVnode), tstrerror(code)); + goto _out; } } // toggle snap replication mode - vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); - if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { + vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d, sver:%" PRId64, TD_VID(pVnode), tsdbOpts.format, + pReader->sver); + if (tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { pReader->tsdbDone = true; } else { pReader->tsdbRAWDone = true; @@ -178,7 +220,9 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader // open tsdb snapshot raw reader if (!pReader->tsdbRAWDone) { - code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); + code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, pReader->pRanges, pReader->missingFileHash, + pReader->fidModeHash, pReader->missingSttHash, pReader->missingFids, + pReader->missingFidCount, &pReader->pTsdbRAWReader); if (code) goto _exit; } @@ -247,6 +291,16 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { if (pReader->pBseReader) { bseSnapReaderClose(&pReader->pBseReader); } + if (pReader->missingFileHash) { + taosHashCleanup(pReader->missingFileHash); + } + if (pReader->fidModeHash) { + taosHashCleanup(pReader->fidModeHash); + } + if (pReader->missingSttHash) { + taosHashCleanup(pReader->missingSttHash); + } + taosMemoryFree(pReader->missingFids); taosMemoryFree(pReader); } @@ -332,7 +386,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // open if not if (pReader->pTsdbReader == NULL) { code = tsdbSnapReaderOpen(pReader->pVnode->pTsdb, pReader->sver, pReader->ever, SNAP_DATA_TSDB, pReader->pRanges, - &pReader->pTsdbReader); + pReader->missingFids, pReader->missingFidCount, &pReader->pTsdbReader); TSDB_CHECK_CODE(code, lino, _exit); } @@ -349,7 +403,9 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->tsdbRAWDone) { // open if not if (pReader->pTsdbRAWReader == NULL) { - code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); + code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, pReader->pRanges, + pReader->missingFileHash, pReader->fidModeHash, pReader->missingSttHash, + pReader->missingFids, pReader->missingFidCount, &pReader->pTsdbRAWReader); TSDB_CHECK_CODE(code, lino, _exit); } @@ -535,10 +591,12 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP code = tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts); TSDB_CHECK_CODE(code, lino, _exit); } break; + case SNAP_DATA_MISSING_FIDS: { + vInfo("vgId:%d, snap writer received missing fids subfield, skipping (handled by reader)", TD_VID(pVnode)); + } break; default: - vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); - TSDB_CHECK_CODE(code = TSDB_CODE_INVALID_DATA_FMT, lino, _exit); - goto _exit; + vWarn("vgId:%d, unknown subfield type in snap info, skipping. typ:%d", TD_VID(pVnode), subField->typ); + break; } }