Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion source/dnode/vnode/src/inc/tsdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSe
typedef enum ETsdbRepFmt {
TSDB_SNAP_REP_FMT_DEFAULT = 0,
TSDB_SNAP_REP_FMT_RAW,
TSDB_SNAP_REP_FMT_HYBRID,
} ETsdbRepFmt;

typedef struct STsdbRepOpts {
Expand All @@ -734,6 +733,12 @@ typedef struct STsdbRepOpts {
int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);

int32_t tMissingFileListDataLenCalc(int32_t fileCount);
int32_t tDeserializeMissingFileList(void *buf, int32_t bufLen, SHashObj **ppHash);
int32_t tsdbExtractMissingFids(STsdb *pTsdb, SHashObj *missingFileHash, int32_t **ppFids, int32_t *pFidCount);

static inline int64_t tsdbMissingFileKey(int32_t fid, int32_t ftype) { return ((int64_t)fid << 32) | (uint32_t)ftype; }

// snap read
struct STsdbReadSnap {
SMemTable *pMem;
Expand Down
6 changes: 4 additions & 2 deletions source/dnode/vnode/src/inc/vnodeInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
STsdbSnapReader** ppReader);
const int32_t* missingFids, int32_t missingFidCount, STsdbSnapReader** ppReader);
void tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ========================================
Expand All @@ -344,7 +344,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapRAWReader ========================================
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, void* pRanges, SHashObj* missingFileHash,
const int32_t* missingFids, int32_t missingFidCount, STsdbSnapRAWReader** ppReader);
void tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
// STsdbSnapRAWWriter ========================================
Expand Down Expand Up @@ -615,6 +616,7 @@ enum {
// SNAP_DATA_TQ_CHECKINFO = 13,
SNAP_DATA_RAW = 14,
SNAP_DATA_BSE = 15,
SNAP_DATA_MISSING_FIDS = 16,
};

struct SSnapDataHdr {
Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/sma/smaSnapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, (i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2), NULL,
&pReader->pDataReader[i]);
NULL, 0, &pReader->pDataReader[i]);
TAOS_CHECK_GOTO(code, &lino, _exit);
}
}
Expand Down
68 changes: 67 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,52 @@ static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
return pHash;
}

int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr) {
int32_t code = 0;
STFileSet *fset, *fset1;
SHashObj *pHash = NULL;

fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
if (fsetArr[0] == NULL) return terrno;

pHash = tsdbFSetRangeArrayToHash(pRanges);
if (pHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}

(void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
int32_t fid = fset->fid;
if (taosHashGet(pHash, &fid, sizeof(fid)) == NULL) {
tsdbDebug("skip fid:%d, not in ranges", fid);
continue;
}

code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
if (code) break;

code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) {
tsdbTFileSetClear(&fset1);
break;
}
}
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);

if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
taosMemoryFree(fsetArr[0]);
fsetArr[0] = NULL;
}

_out:
if (pHash) {
taosHashCleanup(pHash);
}
return code;
}

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
TFileOpArray *fopArr) {
int32_t code = 0;
Expand Down Expand Up @@ -1152,7 +1198,21 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa

void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }

static bool tsdbFSFidInMissingSet(int32_t fid, const int32_t *missingFids, int32_t missingFidCount) {
int32_t lo = 0, hi = missingFidCount - 1;
while (lo <= hi) {
int32_t mid = lo + (hi - lo) / 2;
if (missingFids[mid] == fid) return true;
if (missingFids[mid] < fid)
lo = mid + 1;
else
hi = mid - 1;
}
return false;
}

int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
const int32_t *missingFids, int32_t missingFidCount,
TFileSetRangeArray **fsrArr) {
int32_t code = 0;
STFileSet *fset;
Expand All @@ -1179,6 +1239,12 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
int64_t sver1 = sver;
int64_t ever1 = ever;

// skip fids not in missing-fid filter
if (missingFids != NULL && !tsdbFSFidInMissingSet(fset->fid, missingFids, missingFidCount)) {
tsdbDebug("vgId:%d, skip fid:%d not in missing-fid set", TD_VID(fs->tsdb->pVnode), fset->fid);
continue;
}

if (pHash) {
int32_t fid = fset->fid;
STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
Expand All @@ -1193,7 +1259,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
continue;
}

tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
if (code) break;
Expand Down
3 changes: 2 additions & 1 deletion source/dnode/vnode/src/tsdb/tsdbFS2.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefSnapshotWithRanges(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr);
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr);

int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr,
TFileOpArray *fopArr);
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr);
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
TFileSetRangeArray **fsrArr);
const int32_t *missingFids, int32_t missingFidCount, TFileSetRangeArray **fsrArr);
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr);
// txn
int64_t tsdbFSAllocEid(STFileSystem *fs);
Expand Down
Loading
Loading