-
Notifications
You must be signed in to change notification settings - Fork 93
Expand file tree
/
Copy pathbench_batch_arrow_vs_dataframe.py
More file actions
226 lines (195 loc) · 6.68 KB
/
bench_batch_arrow_vs_dataframe.py
File metadata and controls
226 lines (195 loc) · 6.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Benchmark: query_table_batch + read_arrow_batch vs query_table + read_data_frame.
Compares throughput and elapsed time when reading the same table data via
- Arrow path: query_table_batch(batch_size=N) then read_arrow_batch() in a loop
- DataFrame path: query_table() then result.next() + read_data_frame(N) in a loop
Run from project root or python/tests, e.g.:
python -m pytest tests/bench_batch_arrow_vs_dataframe.py -v -s
python tests/bench_batch_arrow_vs_dataframe.py # if run as script
"""
import os
import sys
import time
from os import remove
import pandas as pd
import pyarrow as pa
import pytest
from tsfile import (
ColumnSchema,
ColumnCategory,
TSDataType,
TableSchema,
TsFileReader,
TsFileTableWriter,
)
# Default benchmark size
# Row count used by run_benchmark() and the script entry point when none is given.
DEFAULT_ROW_COUNT = 50_000
# Batch size passed to both read paths when none is given.
DEFAULT_BATCH_SIZE = 4096
# Number of timed repetitions per read path.
DEFAULT_TIMED_ROUNDS = 3
# Path of the generated benchmark file (relative to the current working directory).
BENCH_FILE = "bench_arrow_vs_dataframe.tsfile"
# Table name used both for the written schema and for the read queries.
TABLE_NAME = "bench_table"
# Columns requested by both read paths ("time" is not listed here).
COLUMNS = ["device", "value1", "value2"]
def _ensure_bench_tsfile(file_path: str, row_count: int) -> None:
    """Write a fresh benchmark tsfile with ``row_count`` rows at ``file_path``.

    Any existing file at ``file_path`` is deleted first, so the file is always
    regenerated with exactly ``row_count`` rows (it is NOT reused when already
    present). The data is built as a pandas DataFrame and written with a single
    ``write_dataframe`` call.

    Args:
        file_path: Destination path of the tsfile.
        row_count: Number of rows to generate.
    """
    if os.path.exists(file_path):
        remove(file_path)
    # Build data with pandas/numpy (vectorized, much faster than row-by-row Tablet)
    import numpy as np

    df = pd.DataFrame(
        {
            # Timestamps are 0 .. row_count - 1.
            "time": np.arange(row_count, dtype=np.int64),
            # Every row carries the same constant tag value.
            "device": pd.Series(["device"] * row_count),
            "value1": np.arange(0, row_count * 10, 10, dtype=np.int64),
            "value2": np.arange(row_count, dtype=np.float64) * 1.5,
        }
    )
    table = TableSchema(
        TABLE_NAME,
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD),
            ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    with TsFileTableWriter(file_path, table) as writer:
        writer.write_dataframe(df)
def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int:
    """Read all rows through the Arrow batch path and return the total row count.

    Opens ``file_path`` with ``TsFileReader``, queries ``TABLE_NAME`` for
    ``COLUMNS`` with ``start_time=0``, the given ``end_time`` and
    ``batch_size``, then calls ``read_arrow_batch()`` until it returns ``None``.

    NOTE(review): the module docstring and the benchmark label refer to
    ``query_table_batch``, while this function calls ``query_table`` with a
    ``batch_size`` keyword -- confirm which name the reader API exposes.

    Args:
        file_path: Path of the tsfile to read.
        batch_size: Batch size forwarded to the query.
        end_time: Upper time bound forwarded to the query.

    Returns:
        Sum of ``len(batch)`` over all batches read.
    """
    reader = TsFileReader(file_path)
    # Nested try/finally so the reader is closed even if the query itself
    # fails, and even if closing the result set raises.
    try:
        result_set = reader.query_table(
            table_name=TABLE_NAME,
            column_names=COLUMNS,
            start_time=0,
            end_time=end_time,
            batch_size=batch_size,
        )
        try:
            total_rows = 0
            while True:
                batch = result_set.read_arrow_batch()
                if batch is None:
                    break
                total_rows += len(batch)
            return total_rows
        finally:
            result_set.close()
    finally:
        reader.close()
def _read_via_dataframe(file_path: str, batch_size: int, end_time: int) -> int:
    """Read all rows through the DataFrame path and return the total row count.

    Opens ``file_path`` with ``TsFileReader``, queries ``TABLE_NAME`` for
    ``COLUMNS`` with ``start_time=0`` and the given ``end_time``, then loops
    on ``next()`` + ``read_data_frame(max_row_num=batch_size)`` until either
    ``next()`` is falsy or an empty/``None`` frame comes back.

    NOTE(review): how ``next()`` and ``read_data_frame()`` share the cursor is
    not visible here; the row-count check done by the caller is what guards
    against rows being skipped -- confirm against the result-set API.

    Args:
        file_path: Path of the tsfile to read.
        batch_size: Maximum number of rows requested per ``read_data_frame``.
        end_time: Upper time bound forwarded to the query.

    Returns:
        Sum of ``len(df)`` over all frames read.
    """
    reader = TsFileReader(file_path)
    # Nested try/finally so the reader is closed even if the query itself
    # fails, and even if closing the result set raises.
    try:
        result_set = reader.query_table(
            TABLE_NAME,
            COLUMNS,
            start_time=0,
            end_time=end_time,
        )
        try:
            total_rows = 0
            while result_set.next():
                df = result_set.read_data_frame(max_row_num=batch_size)
                if df is None or len(df) == 0:
                    break
                total_rows += len(df)
            return total_rows
        finally:
            result_set.close()
    finally:
        reader.close()
def _run_timed(name: str, func, *args, rounds: int = DEFAULT_TIMED_ROUNDS):
    """Time ``func(*args)`` over ``rounds`` runs and print a one-line summary.

    Args:
        name: Label printed in front of the timing summary.
        func: Callable to time; its return value is treated as a row count.
        *args: Positional arguments forwarded to ``func`` on every run.
        rounds: Number of timed runs; must be at least 1.

    Returns:
        ``(avg_seconds, total_rows)`` where ``avg_seconds`` is the mean
        wall-clock time per run and ``total_rows`` is the value returned by
        the last run.

    Raises:
        ValueError: If ``rounds`` is less than 1 (there would be nothing to
            average and no row count to report).
    """
    if rounds < 1:
        raise ValueError(f"rounds must be >= 1, got {rounds}")
    times = []
    total_rows = 0
    for _ in range(rounds):
        start = time.perf_counter()
        total_rows = func(*args)
        times.append(time.perf_counter() - start)
    avg = sum(times) / len(times)
    # Guard against a zero average on very coarse clocks.
    rows_per_sec = total_rows / avg if avg > 0 else 0
    print(
        f" {name}: {avg:.3f}s avg ({min(times):.3f}s min) rows={total_rows} {rows_per_sec:.0f} rows/s"
    )
    return avg, total_rows
def run_benchmark(
    row_count: int = DEFAULT_ROW_COUNT,
    batch_size: int = DEFAULT_BATCH_SIZE,
    timed_rounds: int = DEFAULT_TIMED_ROUNDS,
    file_path: str = BENCH_FILE,
):
    """Generate the bench file, time both read paths, and compare them.

    The DataFrame path is timed first, then the Arrow path. Both must return
    exactly ``row_count`` rows, otherwise an ``AssertionError`` is raised
    (after the timing lines have been printed).

    Args:
        row_count: Number of rows written to the bench file.
        batch_size: Batch size handed to both read paths.
        timed_rounds: Number of timed runs per read path.
        file_path: Where the bench file is (re)created.

    Returns:
        ``(df_avg, arrow_avg)``: average seconds per run for the DataFrame
        path and for the Arrow path.
    """
    _ensure_bench_tsfile(file_path, row_count)
    # Generated timestamps are 0 .. row_count - 1, so this bound lies past the last row.
    end_time = row_count + 1
    print(
        f"Benchmark: {row_count} rows, batch_size={batch_size}, timed_rounds={timed_rounds}"
    )

    # Both read paths take the same positional arguments.
    reader_args = (file_path, batch_size, end_time)
    df_avg, df_rows = _run_timed(
        "query_table + read_data_frame",
        _read_via_dataframe,
        *reader_args,
        rounds=timed_rounds,
    )
    arrow_avg, arrow_rows = _run_timed(
        "query_table_batch + read_arrow_batch",
        _read_via_arrow,
        *reader_args,
        rounds=timed_rounds,
    )
    print()

    if df_avg > 0:
        # Ratio of elapsed times: below 1 means the Arrow path took less time.
        ratio = arrow_avg / df_avg
        verdict = "Arrow faster" if ratio < 1 else "DataFrame faster"
        print(f" Arrow vs DataFrame time ratio: {ratio:.2f}x ({verdict})")

    assert df_rows == row_count, f"DataFrame path row count {df_rows} != {row_count}"
    assert arrow_rows == row_count, f"Arrow path row count {arrow_rows} != {row_count}"
    print()
    return df_avg, arrow_avg
def test_bench_arrow_vs_dataframe_default():
    """Quick sanity run: 5,000 rows, batches of 1,024, two timed rounds."""
    small_case = {"row_count": 5_000, "batch_size": 1024, "timed_rounds": 2}
    run_benchmark(**small_case)
def test_bench_arrow_vs_dataframe_medium():
    """Run the benchmark with the module-level default sizes."""
    medium_case = {
        "row_count": DEFAULT_ROW_COUNT,
        "batch_size": DEFAULT_BATCH_SIZE,
        "timed_rounds": DEFAULT_TIMED_ROUNDS,
    }
    run_benchmark(**medium_case)
def test_bench_arrow_vs_dataframe_large():
    """Run the benchmark on two million rows with batches of 8,192, three rounds."""
    large_case = {"row_count": 2_000_000, "batch_size": 8192, "timed_rounds": 3}
    run_benchmark(**large_case)
if __name__ == "__main__":
    # Optional CLI overrides: argv[1] = row count, argv[2] = batch size.
    row_count = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_ROW_COUNT
    batch_size = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BATCH_SIZE
    try:
        run_benchmark(row_count=row_count, batch_size=batch_size)
    finally:
        # Clean up bench file when run as script, including when the
        # benchmark raises (e.g. a row-count assertion fails).
        if os.path.exists(BENCH_FILE):
            os.remove(BENCH_FILE)