Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions source/dnode/vnode/src/inc/tsdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,24 @@ typedef struct STsdbRepOpts {
int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);

int32_t tMissingFileListDataLenCalc(int32_t fileCount);
int32_t tDeserializeMissingFileList(void *buf, int32_t bufLen, void **ppFiles, int32_t *pFileCount, SHashObj **ppHash,
SHashObj **ppSttHash);
int32_t tsdbExtractMissingFids(STsdb *pTsdb, SHashObj *missingFileHash, int32_t **ppFids, int32_t *pFidCount);
int32_t tsdbDetermineFidSyncMode(STsdb *pTsdb, const void *files, int32_t fileCount, SHashObj **ppFidModeHash);

#define TSDB_SNAP_SYNC_FILE_LEVEL 0
#define TSDB_SNAP_SYNC_FSET_LEVEL 1

static inline int64_t tsdbMissingFileKey(int32_t fid, int32_t ftype) { return ((int64_t)fid << 32) | (uint32_t)ftype; }

// stt hash key: (fid, cid) composite key to avoid collision across different file sets
#define TSDB_STT_HASH_KEY_LEN (sizeof(int32_t) + sizeof(int64_t))
static inline void tsdbSttHashKey(int32_t fid, int64_t cid, char key[TSDB_STT_HASH_KEY_LEN]) {
memcpy(key, &fid, sizeof(fid));
memcpy(key + sizeof(fid), &cid, sizeof(cid));
}

// snap read
struct STsdbReadSnap {
SMemTable *pMem;
Expand Down
7 changes: 5 additions & 2 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
STsdbSnapReader** ppReader);
const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** ppReader);
void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ========================================
Expand All @@ -344,7 +344,9 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapRAWReader ========================================
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash,
SHashObj* fidModeHash, SHashObj* missingSttHash, const int32_t* missingFids,
int32_t missingFidCount, STsdbSnapRAWReader** ppReader);
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
// STsdbSnapRAWWriter ========================================
Expand Down Expand Up @@ -615,6 +617,7 @@ enum {
// SNAP_DATA_TQ_CHECKINFO = 13,
SNAP_DATA_RAW = 14,
SNAP_DATA_BSE = 15,
SNAP_DATA_MISSING_FIDS = 16,
};

struct SSnapDataHdr {
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/sma/smaSnapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL,
&pReader->pDataReader[i]);
NULL, 0, &pReader->pDataReader[i]);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
}
Expand Down
68 changes: 67 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,52 @@ static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
return pHash;
}

int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr) {
int32_t code = 0;
STFileSet *fset, *fset1;
SHashObj *pHash = NULL;

fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
if (fsetArr[0] == NULL) return terrno;

pHash = tsdbFSetRangeArrayToHash(pRanges);
if (pHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}

(void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
int32_t fid = fset->fid;
if (taosHashGet(pHash, &fid, sizeof(fid)) == NULL) {
tsdbDebug("skip fid:%d, not in ranges", fid);
continue;
}

code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
if (code) break;

code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) {
tsdbTFileSetClear(&fset1);
break;
}
}
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);

if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
taosMemoryFree(fsetArr[0]);
fsetArr[0] = NULL;
}

_out:
if (pHash) {
taosHashCleanup(pHash);
}
return code;
}

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
TFileOpArray *fopArr) {
int32_t code = 0;
Expand Down Expand Up @@ -1152,7 +1198,21 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa

void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }

static bool tsdbFSFidInMissingSet(int32_t fid, const int32_t *missingFids, int32_t missingFidCount) {
int32_t lo = 0, hi = missingFidCount - 1;
while (lo <= hi) {
int32_t mid = lo + (hi - lo) / 2;
if (missingFids[mid] == fid) return true;
if (missingFids[mid] < fid)
lo = mid + 1;
else
hi = mid - 1;
}
return false;
}

int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
const int32_t *missingFids, int32_t missingFidCount,
TFileSetRangeArray **fsrArr) {
int32_t code = 0;
STFileSet *fset;
Expand All @@ -1179,6 +1239,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
int64_t sver1 = sver;
int64_t ever1 = ever;

// skip fids not in missing-fid filter
if (missingFids != NULL && !tsdbFSFidInMissingSet(fset->fid, missingFids, missingFidCount)) {
tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(fs->tsdb->pVnode), fset->fid);
continue;
}

if (pHash) {
int32_t fid = fset->fid;
STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
Expand All @@ -1193,7 +1259,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
continue;
}

tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
if (code) break;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr);
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr,
TFileOpArray *fopArr);
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
TFileSetRangeArray **fsrArr);
const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr);
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
// txn
int64_t tsdbFSAllocEid(STFileSystem *fs);
Expand Down
Loading
Loading