|
17 | 17 |
|
18 | 18 | use arrow::array::{Array, ArrayRef, StringArray}; |
19 | 19 | use arrow::util::test_util::seedable_rng; |
| 20 | +use arrow_schema::{DataType, Field, Fields}; |
| 21 | +use chrono::{DateTime, Utc}; |
20 | 22 | use criterion::{Criterion, criterion_group, criterion_main}; |
21 | | -use parquet_variant::{Variant, VariantBuilder}; |
| 23 | +use parquet_variant::{Uuid, Variant, VariantBuilder, VariantBuilderExt}; |
22 | 24 | use parquet_variant_compute::{ |
23 | | - GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get, |
| 25 | + GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, shred_variant, variant_get, |
24 | 26 | }; |
25 | 27 | use rand::Rng; |
26 | 28 | use rand::SeedableRng; |
27 | 29 | use rand::distr::Alphanumeric; |
28 | 30 | use rand::rngs::StdRng; |
29 | 31 | use std::fmt::Write; |
30 | 32 | use std::sync::Arc; |
31 | | -fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { |
| 33 | + |
| 34 | +fn json_to_variant_bench(c: &mut Criterion) { |
32 | 35 | let input_array = StringArray::from_iter_values(json_repeated_struct(8000)); |
33 | 36 | let array_ref: ArrayRef = Arc::new(input_array); |
34 | 37 | c.bench_function( |
@@ -94,14 +97,58 @@ pub fn variant_get_bench(c: &mut Criterion) { |
94 | 97 | }; |
95 | 98 |
|
96 | 99 | c.bench_function("variant_get_primitive", |b| { |
97 | | - b.iter(|| variant_get(&input.clone(), options.clone())) |
| 100 | + b.iter(|| variant_get(&input, options.clone())) |
| 101 | + }); |
| 102 | +} |
| 103 | + |
| 104 | +pub fn shred_variant_bench(c: &mut Criterion) { |
| 105 | + // This benchmark models shredding semi-structured log entries |
| 106 | + // where each entry has a common set of fields, some optional fields, |
| 107 | + // and some random extra fields. |
| 108 | + // |
| 109 | + let mut generator = VariantLogGenerator { |
| 110 | + rows_per_batch: 8192, |
| 111 | + optional_field_prob: 0.0, |
| 112 | + extra_field_prob: 0.0, |
| 113 | + rng: StdRng::seed_from_u64(42), |
| 114 | + }; |
| 115 | + |
| 116 | + // shred out the common and optional fields |
| 117 | + // leaving the rest in the value field |
| 118 | + let shredding_schema = DataType::Struct(Fields::from(vec![ |
| 119 | + Field::new("timestamp", DataType::Utf8, true), |
| 120 | + Field::new("level", DataType::Utf8, true), |
| 121 | + Field::new("message", DataType::Utf8, true), |
| 122 | + Field::new("user_id", DataType::Int64, true), |
| 123 | + Field::new("session_id", DataType::Utf8, true), |
| 124 | + ])); |
| 125 | + |
| 126 | + // Variants have only required fields |
| 127 | + let variant_array = generator.next().unwrap(); |
| 128 | + c.bench_function("shred_variant common fields", |b| { |
| 129 | + b.iter(|| shred_variant(&variant_array, &shredding_schema)) |
| 130 | + }); |
| 131 | + |
| 132 | + // Variants with some optional fields |
| 133 | + generator.optional_field_prob = 0.5; |
| 134 | + let variant_array = generator.next().unwrap(); |
| 135 | + c.bench_function("shred_variant optional fields", |b| { |
| 136 | + b.iter(|| shred_variant(&variant_array, &shredding_schema)) |
| 137 | + }); |
| 138 | + |
| 139 | + // Variants with optional fields and random extra fields |
| 140 | + generator.extra_field_prob = 0.7; |
| 141 | + let variant_array = generator.next().unwrap(); |
| 142 | + c.bench_function("shred_variant extra fields", |b| { |
| 143 | + b.iter(|| shred_variant(&variant_array, &shredding_schema)) |
98 | 144 | }); |
99 | 145 | } |
100 | 146 |
|
101 | 147 | criterion_group!( |
102 | 148 | benches, |
103 | 149 | variant_get_bench, |
104 | | - benchmark_batch_json_string_to_variant |
| 150 | + shred_variant_bench, |
| 151 | + json_to_variant_bench |
105 | 152 | ); |
106 | 153 | criterion_main!(benches); |
107 | 154 |
|
@@ -362,3 +409,116 @@ impl RandomJsonGenerator { |
362 | 409 | panic!("Random value did not match any type"); |
363 | 410 | } |
364 | 411 | } |
| 412 | + |
| 413 | +/// Data generator for VariantArrays that simulate structured log entries. |
| 414 | +/// |
| 415 | +/// Each entry is an object with |
| 416 | +/// 1. fields like "timestamp", "level", "message", that always appear |
| 417 | +/// 2. Fields like "user_id", "session_id", that appear in some entries |
| 418 | +/// 3. Arbitrary extra fields that should be preserved in the value field |
| 419 | +/// |
| 420 | +/// |
| 421 | +/// Example entries: |
| 422 | +/// ```json |
| 423 | +/// { |
| 424 | +/// "timestamp": "2024-10-01T12:00:00", -- always present |
| 425 | +/// "level": "INFO", -- always present |
| 426 | +/// "message": "User logged in", -- always present |
| 427 | +/// "user_id": 12345, -- optional |
| 428 | +/// "session_id": "abcde", -- optional |
| 429 | +/// "extra_field_123": "extra_value_456" -- arbitrary extra field |
| 430 | +/// } |
| 431 | +/// ``` |
| 432 | +struct VariantLogGenerator { |
| 433 | + /// How many rows per batch |
| 434 | + rows_per_batch: usize, |
| 435 | + /// Probability of including optional fields (0 to 1) |
| 436 | + optional_field_prob: f64, |
| 437 | + /// Probability of including extra arbitrary fields (0 to 1) |
| 438 | + extra_field_prob: f64, |
| 439 | + /// Random number generator |
| 440 | + rng: StdRng, |
| 441 | +} |
| 442 | + |
| 443 | +impl Iterator for VariantLogGenerator { |
| 444 | + type Item = VariantArray; |
| 445 | + |
| 446 | + fn next(&mut self) -> Option<Self::Item> { |
| 447 | + Some(self.next_array()) |
| 448 | + } |
| 449 | +} |
| 450 | + |
| 451 | +impl VariantLogGenerator { |
| 452 | + fn next_array(&mut self) -> VariantArray { |
| 453 | + let mut builder = VariantArrayBuilder::new(1000); |
| 454 | + for _ in 0..self.rows_per_batch { |
| 455 | + let mut obj_builder = builder.new_object(); |
| 456 | + |
| 457 | + obj_builder = obj_builder |
| 458 | + .with_field("timestamp", self.random_timestamp()) |
| 459 | + .with_field("level", self.random_level()) |
| 460 | + .with_field("message", self.random_message()); |
| 461 | + |
| 462 | + // Optional fields |
| 463 | + if self.rng.random::<f64>() < self.optional_field_prob { |
| 464 | + obj_builder = obj_builder.with_field("user_id", self.rng.random_range(1000..9999)); |
| 465 | + } |
| 466 | + if self.rng.random::<f64>() < self.optional_field_prob { |
| 467 | + obj_builder = obj_builder.with_field("session_id", Uuid::new_v4()); |
| 468 | + } |
| 469 | + |
| 470 | + // Random extra fields |
| 471 | + if self.rng.random::<f64>() < self.extra_field_prob { |
| 472 | + let num_extra_fields = self.rng.random_range(1..4); |
| 473 | + for _ in 0..num_extra_fields { |
| 474 | + // totally random field name and value (modeling random logging payloads) |
| 475 | + let field_name = format!("extra_field_{}", self.rng.random_range(1..100000000)); |
| 476 | + let field_value = |
| 477 | + format!("extra_value_{}", self.rng.random_range(1..100000000)); |
| 478 | + obj_builder = obj_builder.with_field(field_name.as_str(), field_value.as_str()); |
| 479 | + } |
| 480 | + } |
| 481 | + |
| 482 | + obj_builder.finish(); |
| 483 | + } |
| 484 | + builder.build() |
| 485 | + } |
| 486 | + |
| 487 | + fn random_timestamp(&mut self) -> DateTime<Utc> { |
| 488 | + // random timestamp |
| 489 | + let hour = self.rng.random_range(0..24); |
| 490 | + let minute = self.rng.random_range(0..60); |
| 491 | + let second = self.rng.random_range(0..60); |
| 492 | + let day = self.rng.random_range(1..28); |
| 493 | + let month = self.rng.random_range(1..=12); |
| 494 | + let year = self.rng.random_range(2020..=2024); |
| 495 | + let naive = chrono::NaiveDate::from_ymd_opt(year, month, day) |
| 496 | + .unwrap() |
| 497 | + .and_hms_opt(hour, minute, second) |
| 498 | + .unwrap(); |
| 499 | + DateTime::from_naive_utc_and_offset(naive, Utc) |
| 500 | + } |
| 501 | + |
| 502 | + /// Random level from ["DEBUG", "INFO", "WARN", "ERROR"] |
| 503 | + fn random_level(&mut self) -> &'static str { |
| 504 | + let levels = ["DEBUG", "INFO", "WARN", "ERROR"]; |
| 505 | + levels[self.rng.random_range(0..levels.len())] |
| 506 | + } |
| 507 | + |
| 508 | + /// Generate a random log message |
| 509 | + fn random_message(&mut self) -> &str { |
| 510 | + let messages = [ |
| 511 | + "User logged in", |
| 512 | + "User logged out", |
| 513 | + "File not found", |
| 514 | + "Connection established", |
| 515 | + "Error processing request", |
| 516 | + // a few longer messages |
| 517 | + "Database connection timed out after multiple attempts", |
| 518 | + "User attempted to access restricted resource without proper authorization", |
| 519 | + "Scheduled maintenance will occur at midnight UTC", |
| 520 | + ]; |
| 521 | + |
| 522 | + messages[self.rng.random_range(0..messages.len())] |
| 523 | + } |
| 524 | +} |
0 commit comments