diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt index 54420f02..7dcb50b3 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Netchdf.kt @@ -23,6 +23,7 @@ interface Netchdf : AutoCloseable { fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, + wantSection: SectionPartial? = null, nthreads: Int? = null) { TODO() } diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt b/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt index 2e23a4c6..fa46ba4e 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/api/Section.kt @@ -1,6 +1,7 @@ package com.sunya.cdm.api /** A filled section of multidimensional array indices, plus the variable shape. */ +// TODO IndexSpace mo betta? data class Section(val ranges : List, val varShape : LongArray) { val rank = ranges.size val shape : LongArray // or IntArray ?? diff --git a/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt b/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt index 4c4291c5..ff105a91 100644 --- a/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt +++ b/core/src/commonMain/kotlin/com/sunya/cdm/layout/Tiling.kt @@ -1,5 +1,6 @@ package com.sunya.cdm.layout +import com.sunya.cdm.api.computeSize import kotlin.math.max import kotlin.math.min @@ -20,6 +21,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) { val tileShape : LongArray // overall shape of the dataset's tile space val indexShape : LongArray // overall shape of the dataset's index space - may be larger than actual variable shape val tileStrider : LongArray // for computing tile index + val nelems: Int init { // convenient to allow tileSize to have (an) extra dimension at the end @@ -30,10 +32,13 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) { for (i in 0 until rank) { this.indexShape[i] = max(varShape[i], chunk[i]) } + this.tileShape = LongArray(rank) for (i in 0 until rank) { tileShape[i] = (this.indexShape[i] + chunk[i] - 1) / chunk[i] } + nelems = tileShape.computeSize().toInt() + tileStrider = LongArray(rank) var accumStride = 1L for (k in rank - 1 downTo 0) { @@ -88,7 +93,7 @@ class Tiling(varShape: LongArray, chunkShape: LongArray) { tile[k] = rem / tileStrider[k] rem = rem - (tile[k] * tileStrider[k]) } - print("tile $order = ${tile.contentToString()}") + // print("tile $order = ${tile.contentToString()}") // convert to index return index(tile) diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt index 375b12dc..f3882047 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1.kt @@ -49,7 +49,7 @@ internal class BTree1( // here both internal and leaf are the same structure // Btree nodes Level 1A1 - Version 1 B-trees - inner class Node(val address: Long, val parent: Node?) : BTreeNodeIF { + inner class Node(val address: Long, val parent: Node?) { val level: Int val nentries: Int private val leftAddress: Long @@ -93,11 +93,11 @@ internal class BTree1( // but most nodes will point to less than that number of children"" } - override fun isLeaf() = (level == 0) + fun isLeaf() = (level == 0) - override fun nentries() = nentries + fun nentries() = nentries - override fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx] + fun dataChunkEntryAt(idx: Int) = dataChunkEntries[idx] } /** @param key the byte offset into the local heap for the first object name in the subtree which that key describes. */ @@ -129,14 +129,10 @@ internal class BTree1( override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" + ", tile= ${tiling.tile(key.offsets).contentToString()} idx=$idx" } -} -interface BTreeNodeIF { - fun isLeaf(): Boolean - fun nentries(): Int - fun dataChunkEntryAt(idx: Int) : DataChunkIF // only if isLeaf } + interface DataChunkIF { fun childAddress(): Long fun offsets(): LongArray diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt index be88f4d4..0f3f8f49 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/BTree1data.kt @@ -10,58 +10,65 @@ import kotlin.collections.mutableListOf /** a BTree1 that uses OpenFileExtended and tracks its own tiling. */ internal class BTree1data( val raf: OpenFileExtended, - val rootNodeAddress: Long, + rootNodeAddress: Long, varShape: LongArray, chunkShape: LongArray, ) { val tiling = Tiling(varShape, chunkShape) val ndimStorage = chunkShape.size + val rootNode: BTreeNode - fun rootNode(): BTreeNode = BTreeNode(rootNodeAddress, null) + init { + rootNode = BTreeNode(rootNodeAddress, null) + } + + fun asSequence(): Sequence> = sequence { + repeat( tiling.nelems) { + //val startingIndex = tiling.orderToIndex(it.toLong()) + //val indexSpace = IndexSpace(startingIndex, tiling.chunk) + yield(Pair(it.toLong(), findDataChunk(it) ?: missingDataChunk(it))) + } + } + + internal fun findDataChunk(order: Int): DataChunk? { + return rootNode.findDataChunk(order) + } // here both internal and leaf are the same structure // Btree nodes Level 1A1 - Version 1 B-trees inner class BTreeNode(val address: Long, val parent: BTreeNode?) { - val level: Int - val nentries: Int - private val leftAddress: Long - private val rightAddress: Long + var level: Int = 0 + var nentries: Int = 0 - val keys = mutableListOf() - val values = mutableListOf() + val keyValues = mutableListOf>() // tile order to DataChunk val children = mutableListOf() + var lastOrder : Int = 0 + init { - val state = OpenFileState(raf.getFileOffset(address), false) - val magic: String = raf.readString(state, 4) - check(magic == "TREE") { "DataBTree doesnt start with TREE" } - - val type: Int = raf.readByte(state).toInt() - check(type == 1) { "DataBTree must be type 1" } - - level = raf.readByte(state).toInt() // leaf nodes are level 0 - nentries = raf.readShort(state).toInt() // number of children to which this node points - leftAddress = raf.readOffset(state) - rightAddress = raf.readOffset(state) - - if (nentries == 0) { - val chunkSize = raf.readInt(state) - val filterMask = raf.readInt(state) - val inner = LongArray(ndimStorage) { j -> raf.readLong(state) } - val key = DataChunkKey(chunkSize, filterMask, inner) - val childPointer = raf.readAddress(state) - keys.add(inner) - values.add(DataChunkEntry1(this, key, childPointer)) - } else { + if (address > 0) { + val state = OpenFileState(raf.getFileOffset(address), false) + val magic: String = raf.readString(state, 4) + check(magic == "TREE") { "DataBTree doesnt start with TREE" } + + val type: Int = raf.readByte(state).toInt() + check(type == 1) { "DataBTree must be type 1" } + + level = raf.readByte(state).toInt() // leaf nodes are level 0 + nentries = raf.readShort(state).toInt() // number of children to which this node points + val leftAddress = raf.readOffset(state) + val rightAddress = raf.readOffset(state) + repeat(nentries) { val chunkSize = raf.readInt(state) val filterMask = raf.readInt(state) val inner = LongArray(ndimStorage) { j -> raf.readLong(state) } - val key = DataChunkKey(chunkSize, filterMask, inner) + val order = tiling.order(inner).toInt() + val key = DataChunkKey(order, chunkSize, filterMask) val childPointer = raf.readAddress(state) // 4 or 8 bytes, then add fileOffset if (level == 0) { - keys.add(inner) - values.add(DataChunkEntry1( this, key, childPointer)) + keyValues.add(Pair(order, DataChunk(key, childPointer))) + lastOrder = order } else { children.add(BTreeNode(childPointer, this)) } @@ -72,44 +79,52 @@ internal class BTree1data( // but most nodes will point to less than that number of children"" } - // return only the leaf nodes, in any order - fun asSequence(): Sequence> = sequence { + // this does not have missing data. Use iterator on the Btree1data class + // return only the leaf nodes, in depth-first order + fun asSequence(): Sequence> = sequence { // Handle child nodes recursively (in-order traversal) if (children.isNotEmpty()) { children.forEachIndexed { index, childNode -> yieldAll(childNode.asSequence()) // Yield all elements from the child } } else { // If it's a leaf node (no children) - keys.forEachIndexed { index, key -> - yield(tiling.order(key) to values[index]) // Yield all key-value pairs - } + keyValues.forEach { yield(it) } } } - } - data class DataChunkKey(val chunkSize: Int, val filterMask : Int, val offsets: LongArray) { - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (other !is DataChunkKey) return false - if (!offsets.contentEquals(other.offsets)) return false - return true + fun findDataChunk(wantOrder: Int): DataChunk? { + if (children.isNotEmpty()) { // search tree; assumes that chunks are ordered + children.forEach { childNode -> + if (wantOrder <= childNode.lastOrder) + return childNode.findDataChunk(wantOrder) + } + } else { // If it's a leaf node (no children) + val kv = keyValues.find { it.first == wantOrder } + return kv?.second + } + return null } - override fun hashCode(): Int { - return offsets.contentHashCode() - } } + data class DataChunkKey(val order: Int, val chunkSize: Int, val filterMask : Int) + // childAddress = data chunk (level 1) else a child node - data class DataChunkEntry1(val parent : BTreeNode, val key : DataChunkKey, val childAddress : Long) : DataChunkIF { + inner class DataChunk(val key : DataChunkKey, val childAddress : Long) : DataChunkIF { override fun childAddress() = childAddress - override fun offsets() = key.offsets + override fun offsets() = tiling.orderToIndex(key.order.toLong()) override fun isMissing() = (childAddress <= 0L) // may be 0 or -1 override fun chunkSize() = key.chunkSize override fun filterMask() = key.filterMask - override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${key.offsets.contentToString()}" + - ", tile= ${tiling.tile(key.offsets).contentToString()}" + override fun show(tiling : Tiling) : String = "chunkSize=${key.chunkSize}, chunkStart=${offsets().contentToString()}" + + ", tile= ${tiling.tile(offsets() ).contentToString()}" + + fun show() = show(tiling) + } + + fun missingDataChunk(order: Int) : DataChunk { + return DataChunk(DataChunkKey(order, 0, 0), -1L) } } diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt index ea56b658..7a0432e0 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/H5chunkConcurrent.kt @@ -4,11 +4,14 @@ package com.sunya.netchdf.hdf5 import com.sunya.cdm.api.ArraySection import com.sunya.cdm.api.Datatype +import com.sunya.cdm.api.Section +import com.sunya.cdm.api.SectionPartial import com.sunya.cdm.api.Variable import com.sunya.cdm.api.computeSize import com.sunya.cdm.api.toIntArray import com.sunya.cdm.api.toLongArray import com.sunya.cdm.iosp.OpenFileState +import com.sunya.cdm.layout.Chunker import com.sunya.cdm.layout.IndexSpace import com.sunya.cdm.layout.Tiling import com.sunya.cdm.layout.transferMissingNelems @@ -23,18 +26,25 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.yield -class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) { +class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>, wantSection: SectionPartial?) { val h5 = h5file.header val rafext: OpenFileExtended = h5.openFileExtended() + internal val bTree: BTree1data val varShape = v2.shape val chunkShape: IntArray val tiling: Tiling val nchunks: Long - internal val rootNode: BTree1data.BTreeNode - val rootAddress: Long + // internal val rootNode: BTree1data.BTreeNode + // val rootAddress: Long + val wantSpace: IndexSpace + val allData : Boolean init { + val useSection = SectionPartial.fill(wantSection, v2.shape) + wantSpace = IndexSpace(useSection) + allData = (wantSection == null) || (useSection == Section(varShape)) + require(v2.spObject is DataContainerVariable) val vinfo = v2.spObject require(vinfo.mdl is DataLayoutBTreeVer1) @@ -44,17 +54,16 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) { nchunks = tiling.tileShape.computeSize() // its not obvious you actually need a seperate raf - val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray()) - rootNode = bTreeExt.rootNode() - rootAddress = mdl.btreeAddress + bTree = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray()) + // rootAddress = mdl.btreeAddress } - // TODO section: SectionPartial fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) { + runBlocking { val jobs = mutableListOf() val workers = mutableListOf() - val chunkProducer = produceChunks(rootNode.asSequence()) + val chunkProducer = produceChunks(bTree.asSequence()) repeat(nthreads) { val worker = Worker() jobs.add( launchJob(worker, chunkProducer, lamda)) @@ -95,9 +104,6 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) { private inner class Worker() { val rafext: OpenFileExtended = h5.openFileExtended() // here we need a seperate raf - // a thread-safe accessor of the btree - // private val btree1 = BTree1data(rafext, rootAddress, varShape, chunkShape.toLongArray()) - val vinfo: DataContainerVariable = v2.spObject as DataContainerVariable val h5type: H5TypeInfo val elemSize: Int @@ -116,31 +122,41 @@ class H5chunkConcurrent(h5file: Hdf5File, val v2: Variable<*>) { } fun work(dataChunk : DataChunkIF) : ArraySection<*>? { - // TODO check if intersect with wantSection before reading in data - val dataSpace = IndexSpace(v2.rank, dataChunk.offsets(), vinfo.storageDims) + if (!allData && !wantSpace.intersects(dataSpace)) { + return null + } + val useEntireChunk = wantSpace.contains(dataSpace) + val intersectSpace = if (useEntireChunk) dataSpace else wantSpace.intersect(dataSpace) + val ba = if (dataChunk.isMissing()) { if (debugChunking) println(" missing ${dataChunk.show(tiling)}") - val sizeBytes = dataSpace.totalElements * elemSize + val sizeBytes = intersectSpace.totalElements * elemSize val bbmissing = ByteArray(sizeBytes.toInt()) - transferMissingNelems(vinfo.fillValue, dataSpace.totalElements.toInt(), bbmissing, 0) - if (debugChunking) println(" missing transfer ${dataSpace.totalElements} fillValue=${vinfo.fillValue}") + transferMissingNelems(vinfo.fillValue, intersectSpace.totalElements.toInt(), bbmissing, 0) + if (debugChunking) println(" missing transfer ${intersectSpace.totalElements} fillValue=${vinfo.fillValue}") bbmissing } else { if (debugChunking) println(" chunkIterator=${dataChunk.show(tiling)}") state.pos = dataChunk.childAddress() val rawdata = rafext.readByteArray(state, dataChunk.chunkSize()) - if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!) + val filteredData = if (dataChunk.filterMask() == null) rawdata else filters.apply(rawdata, dataChunk.filterMask()!!) + if (useEntireChunk) { + filteredData + } else { + val chunker = Chunker(dataSpace, wantSpace) // each DataChunkEntry has its own Chunker iteration + chunker.copyOut(filteredData, 0, elemSize, intersectSpace.totalElements.toInt()) + } } val array = if (h5type.datatype5 == Datatype5.Vlen) { // internal fun H5builder.processVlenIntoArray(h5type: H5TypeInfo, shape: IntArray, ba: ByteArray, nelems: Int, elemSize : Int): ArrayTyped { - h5.processVlenIntoArray(h5type, dataSpace.shape.toIntArray(), ba, dataSpace.totalElements.toInt(), elemSize) + h5.processVlenIntoArray(h5type, intersectSpace.shape.toIntArray(), ba, intersectSpace.totalElements.toInt(), elemSize) } else { - h5.processDataIntoArray(ba, h5type.isBE, datatype, dataSpace.shape.toIntArray(), h5type, elemSize) + h5.processDataIntoArray(ba, h5type.isBE, datatype, intersectSpace.shape.toIntArray(), h5type, elemSize) } - return ArraySection(array, dataSpace.section(v2.shape)) + return ArraySection(array, intersectSpace.section(v2.shape)) } } val debugChunking = false diff --git a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt index b9a2812c..265477f0 100644 --- a/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt +++ b/core/src/commonMain/kotlin/com/sunya/netchdf/hdf5/Hdf5File.kt @@ -36,7 +36,7 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { return("DataContainerAttribute") } val vinfo = (v.spObject as DataContainerVariable) - return if (vinfo.mdl != null) vinfo.mdl.javaClass.simpleName else "none" + return vinfo.mdl.javaClass.simpleName } override fun readArrayData(v2: Variable, section: SectionPartial?): ArrayTyped { @@ -114,24 +114,27 @@ class Hdf5File(val filename : String, strict : Boolean = false) : Netchdf { return listOf>().iterator() } val wantSection = SectionPartial.fill(section, v2.shape) - val vinfo = v2.spObject as DataContainerVariable - - if (vinfo.onlyFillValue) { // fill value only, no data - val tba = TypedByteArray(v2.datatype, vinfo.fillValue, 0, isBE = vinfo.h5type.isBE) - val single = ArraySection(ArraySingle(wantSection.shape.toIntArray(), v2.datatype, tba.get(0)), wantSection) - return listOf(single).iterator() + if (v2.spObject is DataContainerVariable) { + val vinfo = v2.spObject + if (vinfo.onlyFillValue) { // fill value only, no data + val tba = TypedByteArray(v2.datatype, vinfo.fillValue, 0, isBE = vinfo.h5type.isBE) + val single = + ArraySection(ArraySingle(wantSection.shape.toIntArray(), v2.datatype, tba.get(0)), wantSection) + return listOf(single).iterator() + } } // TODO can we use concurrent reading ?? - return if (vinfo.mdl is DataLayoutBTreeVer1) { + return if (this.layoutName(v2) == "DataLayoutBTreeVer1") { H5chunkIterator(header, v2, wantSection) } else { H5maxIterator(this, v2, wantSection, maxElements ?: 100_000) } } - override fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, nthreads: Int?) { - val reader = H5chunkConcurrent(this, v2) + override fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, + wantSection: SectionPartial?, nthreads: Int?) { + val reader = H5chunkConcurrent(this, v2, wantSection) // TODO default nthreads ?? reader.readChunks(nthreads ?: 20, lamda, done = { done() }) } diff --git a/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1dataTest.kt similarity index 87% rename from core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt rename to core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1dataTest.kt index 6c185395..d0f689c5 100644 --- a/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1extTest.kt +++ b/core/src/commonTest/kotlin/com/sunya/netchdf/hdf5/Btree1dataTest.kt @@ -10,25 +10,25 @@ import com.sunya.netchdf.testutil.testData import kotlin.system.measureNanoTime import kotlin.test.Test -class Btree1extTest { +class Btree1dataTest { // 227322 == /home/all/testdata/cdmUnitTest/formats/netcdf4/ds.mint.nc#Minimum_temperature_surface_12_Hour_Minimum#DataLayoutBTreeVer1 } files @Test - fun testBTree1ext() { + fun testBTree1data() { val filename = testData + "netcdf4/ds.mint.nc" val varname = "Minimum_temperature_surface_12_Hour_Minimum" Hdf5File(filename).use { myfile -> println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") - println(myfile.cdl()) + // println(myfile.cdl()) val h5 = myfile.header val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } ?: throw RuntimeException("cant find $varname") + println(" ${myvar.nameAndShape()}") val rafext: OpenFileExtended = h5.openFileExtended() - val varShape = myvar.shape require(myvar.spObject is DataContainerVariable) @@ -39,9 +39,9 @@ class Btree1extTest { // a thread-safe accessor of the btree val bTreeExt = BTree1data(rafext, mdl.btreeAddress, varShape, chunkShape.toLongArray()) - val rootNode = bTreeExt.rootNode() + // val rootNode = bTreeExt.rootNode() - rootNode.asSequence().forEach { (key, value) -> println("Key: ${key}, Value: $value") } + bTreeExt.asSequence().forEach { (key, value) -> println("Key: ${key}, Value: ${value.show()}") } } } @@ -60,9 +60,9 @@ class Btree1extTest { for (nthreads in listOf(1, 2, 4, 8, 10, 16, 20, 24, 32, 40, 48)) { val time = measureNanoTime { // fun readChunks(nthreads: Int, lamda: (ArraySection<*>) -> Unit, done: () -> Unit) { - val reader = H5chunkConcurrent(myfile, myvar) + val reader = H5chunkConcurrent(myfile, myvar, null) reader.readChunks(nthreads, { asect: ArraySection<*> -> - // println(" section = ${asect.section}") + println(" section = ${asect.section}") }, { }, ) } println("$nthreads, ${time * nano}") diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt index 1a6f6d06..83e68cf1 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/hdf5/H5readConcurrentTest.kt @@ -2,17 +2,9 @@ package com.sunya.netchdf.hdf5 -import com.sunya.cdm.util.nearlyEquals -import com.sunya.cdm.api.Variable -import com.sunya.cdm.array.ArrayTyped import com.sunya.netchdf.testfiles.H5Files -import com.sunya.netchdf.testutils.AtomicDouble -import com.sunya.netchdf.testutils.Stats import com.sunya.netchdf.testutils.compareChunkReading -import com.sunya.netchdf.testutils.compareChunkReadingForVar import com.sunya.netchdf.testutils.testData -import kotlin.collections.iterator -import kotlin.concurrent.atomics.AtomicInt import kotlin.concurrent.atomics.ExperimentalAtomicApi import kotlin.system.measureNanoTime @@ -30,19 +22,13 @@ class H5readConcurrentTest { @Test fun sanity() { - readH5concurrent(testData + "cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc", "salt") - } - - @Test - fun testReadConcurrent() { - files().forEach { filename -> - readH5concurrent(filename, null) - } + compareChunkReading(testData + "cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc", "salt") } + // array reading is failing, btree address == -1 @Test fun compareChunkReadingProblem() { - compareChunkReading(testData + "cdmUnitTest/formats/hdf5/HIRDLS/HIRDLS2-AFGL_b027_na.he5", "/HDFEOS/SWATHS/HIRDLS/Data_Fields/Altitude") + compareChunkReading(testData + "cdmUnitTest/formats/hdf5/HIRDLS/HIRPROF-AFGL_b038_na.he5", "/HDFEOS/SWATHS/HIRDLS/Data_Fields/12.20MicronAerosolExtinction") } @Test @@ -53,7 +39,16 @@ class H5readConcurrentTest { } @Test - fun testH5readConcurrent() { + fun compareChunkReadingTestStartFrom() { + var skip = true + files().forEach { filename -> + if (filename.endsWith("HIRPROF-AFGL_b038_na.he5")) skip = false + if (!skip) compareChunkReading(filename, null) + } + } + + @Test + fun timeH5readConcurrentThreads() { val filename = "/home/all/testdata/cdmUnitTest/formats/netcdf4/hiig_forec_20140208.nc" val varname = "salt" Hdf5File(filename).use { myfile : Hdf5File -> @@ -67,7 +62,7 @@ class H5readConcurrentTest { for (nthreads in listOf(1, 2, 4, 8, 10, 16, 20, 24, 32, 40, 48)) { val time = measureNanoTime { // fun readChunksConcurrent(v2: Variable, lamda : (ArraySection<*>) -> Unit, done : () -> Unit, nthreads: Int?) { - myfile.readChunksConcurrent(myvar, lamda = { it -> println(" section = ${it.section}") }, { }, nthreads) + myfile.readChunksConcurrent(myvar, lamda = { it -> println(" section = ${it.section}") }, { }, wantSection = null, nthreads) } println("$nthreads, ${time * nano}") } @@ -75,87 +70,4 @@ class H5readConcurrentTest { } } -val nano = 1.0e-9 - -fun readH5concurrent(filename: String, varname: String? = null) { - Hdf5File(filename).use { myfile -> - println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes") - println(myfile.cdl()) - var countChunks = 0 - if (varname != null) { - val myvar = myfile.rootGroup().allVariables().find { it.fullname() == varname } - ?: throw RuntimeException("cant find $varname") - countChunks += compareChunkReadingForVar(myfile, myvar) - } else { - myfile.rootGroup().allVariables().forEach { it -> - if (it.datatype.isNumber) { - countChunks += compareChunkReadingForVar(myfile, it) - } - } - } - if (countChunks > 0) { - println("${myfile.type()} $filename ${myfile.size / 1000.0 / 1000.0} Mbytes chunks = $countChunks") - } - } -} - -/* -fun testOneVarConcurrent(hdf5: Hdf5File, myvar: Variable<*>): Int { - val filename = hdf5.location().substringAfterLast('/') - - Stats.clear() - - var sumChunkIterator = 0.0 - var countChunkIterator = 0 - val time1 = measureNanoTime { - val chunkIter = hdf5.chunkIterator(myvar) - for (pair in chunkIter) { - // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumChunkIterator += sumValues(pair.array) - countChunkIterator++ - } - } - Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunkIterator) - - var sumArrayRead = 0.0 - val time3 = measureNanoTime { - val arrayData = hdf5.readArrayData(myvar, null) - sumArrayRead += sumValues(arrayData) - } - Stats.of("readArrayData", filename, "chunk").accum(time3, 1) - assertTrue(nearlyEquals(sumChunkIterator, sumArrayRead), "sumChunkIterator $sumChunkIterator != $sumArrayRead sumArrayRead") - - val counta = AtomicInt(0) - val suma = AtomicDouble(0.0) - val layout = hdf5.layoutName(myvar) - if (layout == "DataLayoutBTreeVer1") { - val time2 = measureNanoTime { - hdf5.readChunksConcurrent(myvar, { it -> - suma.getAndAdd(sumValues(it.array)) - counta.fetchAndAdd(1) - }, done = { }) - } - Stats.of("concurrentSum", filename, "chunk").accum(time2, counta.load()) - val sumConcurrentRead = suma.get() - assertTrue(nearlyEquals(sumConcurrentRead, sumArrayRead), "sumConcurrentRead $sumConcurrentRead != $sumArrayRead sumArrayRead") - } - - Stats.show() - - return countChunkIterator -} - -fun sumValues(array: ArrayTyped<*>): Double { - var result = 0.0 - if (!array.datatype.isNumber) return result - - for (value in array) { - val number = (value as Number) - val numberd: Double = number.toDouble() - if (numberd.isFinite()) { - result += numberd - } - } - return result -} -*/ \ No newline at end of file +val nano = 1.0e-9 \ No newline at end of file diff --git a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt index e182d067..fb178988 100644 --- a/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt +++ b/testfiles/src/test/kotlin/com/sunya/netchdf/testutils/CompareChunkReading.kt @@ -40,7 +40,7 @@ fun compareChunkReading(filename: String, varname : String? = null) { fun compareChunkReadingForVar(myfile: Netchdf, myvar: Variable<*>): Int { val filename = myfile.location().substringAfterLast('/') - + println(" ${myvar.nameAndShape()}") Stats.clear() var sumChunkIterator = 0.0 @@ -49,8 +49,14 @@ fun compareChunkReadingForVar(myfile: Netchdf, myvar: Variable<*>): Int { val chunkIter = myfile.chunkIterator(myvar) for (pair in chunkIter) { // println(" ${pair.section} = ${pair.array.shape.contentToString()}") - sumChunkIterator += sumValues(pair.array) + val sum = sumValues(pair.array) + sumChunkIterator += sum countChunkIterator++ + // println("chunk ${pair.section} sum $sum") + /* if (pair.section.toString().contains("[0:0][0:17][0:97][148:295]")) { + println(pair) + } */ + } } Stats.of("chunkIterator", filename, "chunk").accum(time1, countChunkIterator) @@ -71,11 +77,17 @@ fun compareChunkReadingForVar(myfile: Netchdf, myvar: Variable<*>): Int { if (layout == "DataLayoutBTreeVer1") { val time2 = measureNanoTime { hdf5.readChunksConcurrent(myvar, { it -> - suma.getAndAdd(sumValues(it.array)) + val sum = sumValues(it.array) + suma.getAndAdd(sum) counta.fetchAndAdd(1) + // println(" chunk ${it.section} sum $sum") + /* if (it.section.toString().contains("[0:0][0:17][0:97][148:295]")) { + println(it) + } */ }, done = { }) } - Stats.of("concurrentSum", filename, "chunk").accum(time2, counta.load()) + val countConcurrentRead = counta.load() + Stats.of("concurrentSum", filename, "chunk").accum(time2,countConcurrentRead ) val sumConcurrentRead = suma.get() assertTrue( nearlyEquals(sumConcurrentRead, sumArrayRead), @@ -84,60 +96,40 @@ fun compareChunkReadingForVar(myfile: Netchdf, myvar: Variable<*>): Int { } } - Stats.show() + // Stats.show() return countChunkIterator } -/* compare readArrayData with chunkIterator -private fun compareChunkReadingForVar(myFile: Netchdf, myvar: Variable<*>, compare : Boolean = true): Int { - println(" compareChunkReadingForVar ${myvar.nameAndShape()}") - val filename = myFile.location().substringAfterLast('/') - val varBytes = myvar.nelems - if (varBytes >= maxBytes) { - println(" *** ${myvar.nameAndShape()} skip reading ArrayData too many bytes= $varBytes max = $maxBytes") - return 0 - } +private fun sumValues(array : ArrayTyped<*>): Double { + var result = 0.0 - var sumArrayData = 0.0 - if (compare) { - val time3 = measureNanoTime { - val arrayData = myFile.readArrayData(myvar, null) - sumArrayData = sumValues(arrayData) + if (array.datatype.isNumber) { + for (value in array) { + val number = (value as Number) + val numberd: Double = number.toDouble() + if (numberd.isFinite()) { + result += numberd + } } - Stats.of("readArrayData", filename, "chunk").accum(time3, 1) - } - - var nchunks = 0 - - if (myFile is Hdf5File) { - val layout = myFile.layoutName(myvar) - if (layout == "DataLayoutBTreeVer1") { - val countChunks = AtomicInt(0) - val sumChunkIterator = AtomicDouble(0.0) - val time1 = measureNanoTime { - myFile.readChunksConcurrent( - myvar, - lamda = { - countChunks.fetchAndAdd(1) - sumChunkIterator.getAndAdd(sumValues(it.array)) - }, - done = {}, - 10 - ) + } else if (array.datatype.isIntegral) { + for (value in array) { + val useValue = when (value) { + is UByte -> value.toByte() + is UShort -> value.toShort() + is UInt -> value.toInt() + is ULong -> value.toLong() + else -> value + } + val number = (useValue as Number) + val numberd: Double = number.toDouble() + if (numberd.isFinite()) { + result += numberd } - nchunks = countChunks.load() - val sumChunksConcurrent = sumChunkIterator.get() - Stats.of("chunkIterator", filename, "chunk").accum(time1, nchunks) - - assertTrue( - nearlyEquals(sumChunksConcurrent, sumArrayData), - "sumChunksConcurrent $sumChunksConcurrent != $sumArrayData sumArrayData for variable '${myvar.fullname()}'" - ) } } - return nchunks -} */ + return result +} ////////////////////////////////////////////////////////////////////////////////////// // compare reading data chunkIterate API with Netch and NC @@ -197,39 +189,5 @@ private fun compareOneVarIterate(myvar: Variable<*>, myfile: Netchdf, ncvar : Va } } -/////////////////////////////////////////////////////////// -private fun sumValues(array : ArrayTyped<*>): Double { - var result = 0.0 - if (array is ArraySingle || array is ArrayEmpty) { - return result // test fillValue the same ?? - } - - if (array.datatype.isNumber) { - for (value in array) { - val number = (value as Number) - val numberd: Double = number.toDouble() - if (numberd.isFinite()) { - result += numberd - } - } - } else if (array.datatype.isIntegral) { - for (value in array) { - val useValue = when (value) { - is UByte -> value.toByte() - is UShort -> value.toShort() - is UInt -> value.toInt() - is ULong -> value.toLong() - else -> value - } - val number = (useValue as Number) - val numberd: Double = number.toDouble() - if (numberd.isFinite()) { - result += numberd - } - } - } - return result -} -