From e3ed0b8ee57d6a485af0977c11d73fcbd7e81230 Mon Sep 17 00:00:00 2001 From: ehsk Date: Wed, 26 Nov 2025 20:39:27 +0000 Subject: [PATCH 01/18] argument for freezing vision encoder parameters added --- conf/chartqa.yaml | 1 + conf/finetune/base.yaml | 6 +++++ pipelinerl/finetune_loop.py | 51 +++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 154db7ca..4b9c3875 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -8,6 +8,7 @@ finetune: seq_length: 8000 gradient_accumulation_passes: 512 seq_packing: false + freeze_vision_tower: true llm: parameters: diff --git a/conf/finetune/base.yaml b/conf/finetune/base.yaml index 4998a8bf..c80c3423 100644 --- a/conf/finetune/base.yaml +++ b/conf/finetune/base.yaml @@ -3,6 +3,12 @@ data: null model_class: causal-language-modeling # Model name or path of model to be trained. config_name: ${..model_path} +# Freeze vision tower for vision-language models (only applicable for vision2seq-language-modeling) +# Auto-detects common patterns: visual., vision_tower., vision_model., vision_embed_tokens., vit., qformer. +freeze_vision_tower: false +# Optional: Manually specify parameter prefixes to freeze (e.g., ["visual.", "qformer."]) +# If null, auto-detection will be used when freeze_vision_tower=true +vision_encoder_prefixes: null # Optimizer type, supported: adamw_torch, adafactor, cpuadam, lion optim: adamw_torch # use half precision training, full bf16 without mixed precision copies at all diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index da39938e..75018932 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -352,6 +352,57 @@ def run_finetuning_loop( model = load_model(args, args.model_class, current_dir) logger.info(f"Model loaded in dtype {model.dtype}") + # Freeze vision tower if specified + freeze_vision_tower = getattr(args, "freeze_vision_tower", False) + vision_encoder_prefixes = getattr(args, "vision_encoder_prefixes", None) + + if freeze_vision_tower: + # Auto-detect common vision encoder patterns if not specified + if vision_encoder_prefixes is None: + common_prefixes = [ + "visual.", # Qwen-VL, Qwen2-VL + "vision_tower.", # LLaVA + "vision_model.", # InstructBLIP, BLIP-2 + "vision_embed_tokens.", # Phi-3-Vision + "vit.", # CogVLM + "qformer.", # BLIP-2 Q-Former + ] + vision_encoder_prefixes = common_prefixes + + vision_encoder_parameters = set() + + # Check which prefixes exist in the model + for prefix in common_prefixes: + if any(name.startswith(prefix) for name, _ in model.named_parameters()): + vision_encoder_parameters.add(prefix) + + if not vision_encoder_parameters: + logger.warning( + "freeze_vision_tower=True but could not auto-detect vision encoder. " + "No parameters matching common patterns: " + ", ".join(common_prefixes) + ". " + "Set 'vision_encoder_prefixes' in config to specify manually." + ) + else: + logger.debug(f"Freezing vision encoder with prefixes: {vision_encoder_prefixes}") + total_params = 0 + frozen_params = 0 + frozen_param_names = [] + + for name, param in model.named_parameters(): + total_params += param.numel() + if name in vision_encoder_parameters: + param.requires_grad = False + frozen_params += param.numel() + frozen_param_names.append(name) + + trainable_params = total_params - frozen_params + logger.info( + f"Frozen vision encoder: {frozen_params:,} params | " + f"Trainable: {trainable_params:,} params | " + f"Total: {total_params:,} params | " + f"Trainable%: {100 * trainable_params / total_params:.2f}%" + ) + dt = log_time(dt, time_stats, "finetune/model_load") data_stream = SingleStreamSpec( From 97943e21ba6fb92cf58efdbb68c7e25be4bf453b Mon Sep 17 00:00:00 2001 From: ehsk Date: Wed, 26 Nov 2025 21:12:33 +0000 Subject: [PATCH 02/18] freezing vision tower code simplified --- conf/finetune/base.yaml | 4 --- pipelinerl/finetune/checkpoints.py | 33 +++++++++++++++++++ pipelinerl/finetune_loop.py | 52 ------------------------------ 3 files changed, 33 insertions(+), 56 deletions(-) diff --git a/conf/finetune/base.yaml b/conf/finetune/base.yaml index c80c3423..0741249d 100644 --- a/conf/finetune/base.yaml +++ b/conf/finetune/base.yaml @@ -4,11 +4,7 @@ model_class: causal-language-modeling # Model name or path of model to be trained. config_name: ${..model_path} # Freeze vision tower for vision-language models (only applicable for vision2seq-language-modeling) -# Auto-detects common patterns: visual., vision_tower., vision_model., vision_embed_tokens., vit., qformer. freeze_vision_tower: false -# Optional: Manually specify parameter prefixes to freeze (e.g., ["visual.", "qformer."]) -# If null, auto-detection will be used when freeze_vision_tower=true -vision_encoder_prefixes: null # Optimizer type, supported: adamw_torch, adafactor, cpuadam, lion optim: adamw_torch # use half precision training, full bf16 without mixed precision copies at all diff --git a/pipelinerl/finetune/checkpoints.py b/pipelinerl/finetune/checkpoints.py index 9d949e17..f6ddc521 100644 --- a/pipelinerl/finetune/checkpoints.py +++ b/pipelinerl/finetune/checkpoints.py @@ -129,6 +129,39 @@ def load_model(args, model_class, current_dir): gradient_checkpointing_kwargs={"use_reentrant": args.reentrant_checkpointing} ) + # Freeze vision tower if specified + freeze_vision_tower = getattr(args, "freeze_vision_tower", False) + if freeze_vision_tower: + # Try to get vision tower module from the model + vision_tower = None + if hasattr(model, "visual"): + vision_tower = model.visual # Qwen-VL, Qwen2-VL, Qwen2.5-VL, Qwen3-VL + elif hasattr(model, "vision_tower"): + vision_tower = model.vision_tower # LLaVA + elif hasattr(model, "vision_model"): + vision_tower = model.vision_model # BLIP-2, InstructBLIP + + if vision_tower is not None: + vision_tower.requires_grad_(False) + + # Count frozen parameters + total_params = sum(p.numel() for p in model.parameters()) + frozen_params = sum(p.numel() for p in vision_tower.parameters()) + trainable_params = total_params - frozen_params + + logger.info( + f"Vision tower frozen: {frozen_params:,} params | " + f"Trainable: {trainable_params:,} params | " + f"Total: {total_params:,} params | " + f"Trainable%: {trainable_params / total_params:.2%}" + ) + else: + logger.warning( + "freeze_vision_tower=True but could not find vision tower. " + "Checked attributes: model.visual (Qwen*-VL), model.vision_tower (LlaVA), model.vision_model (BLIP-2, InstructBLIP). " + "So setting this parameter does not have any effect." + ) + get_accelerator().wait_for_everyone() return model diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 75018932..1cc9d2b1 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -351,58 +351,6 @@ def run_finetuning_loop( logger.info("About to load model") model = load_model(args, args.model_class, current_dir) logger.info(f"Model loaded in dtype {model.dtype}") - - # Freeze vision tower if specified - freeze_vision_tower = getattr(args, "freeze_vision_tower", False) - vision_encoder_prefixes = getattr(args, "vision_encoder_prefixes", None) - - if freeze_vision_tower: - # Auto-detect common vision encoder patterns if not specified - if vision_encoder_prefixes is None: - common_prefixes = [ - "visual.", # Qwen-VL, Qwen2-VL - "vision_tower.", # LLaVA - "vision_model.", # InstructBLIP, BLIP-2 - "vision_embed_tokens.", # Phi-3-Vision - "vit.", # CogVLM - "qformer.", # BLIP-2 Q-Former - ] - vision_encoder_prefixes = common_prefixes - - vision_encoder_parameters = set() - - # Check which prefixes exist in the model - for prefix in common_prefixes: - if any(name.startswith(prefix) for name, _ in model.named_parameters()): - vision_encoder_parameters.add(prefix) - - if not vision_encoder_parameters: - logger.warning( - "freeze_vision_tower=True but could not auto-detect vision encoder. " - "No parameters matching common patterns: " + ", ".join(common_prefixes) + ". " - "Set 'vision_encoder_prefixes' in config to specify manually." - ) - else: - logger.debug(f"Freezing vision encoder with prefixes: {vision_encoder_prefixes}") - total_params = 0 - frozen_params = 0 - frozen_param_names = [] - - for name, param in model.named_parameters(): - total_params += param.numel() - if name in vision_encoder_parameters: - param.requires_grad = False - frozen_params += param.numel() - frozen_param_names.append(name) - - trainable_params = total_params - frozen_params - logger.info( - f"Frozen vision encoder: {frozen_params:,} params | " - f"Trainable: {trainable_params:,} params | " - f"Total: {total_params:,} params | " - f"Trainable%: {100 * trainable_params / total_params:.2f}%" - ) - dt = log_time(dt, time_stats, "finetune/model_load") data_stream = SingleStreamSpec( From 0e336be664dbe23b32f242a6776a5fda102dab3d Mon Sep 17 00:00:00 2001 From: ehsk Date: Wed, 26 Nov 2025 21:18:14 +0000 Subject: [PATCH 03/18] minor issue fixed --- pipelinerl/finetune/checkpoints.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pipelinerl/finetune/checkpoints.py b/pipelinerl/finetune/checkpoints.py index f6ddc521..d7a5d040 100644 --- a/pipelinerl/finetune/checkpoints.py +++ b/pipelinerl/finetune/checkpoints.py @@ -149,12 +149,18 @@ def load_model(args, model_class, current_dir): frozen_params = sum(p.numel() for p in vision_tower.parameters()) trainable_params = total_params - frozen_params - logger.info( - f"Vision tower frozen: {frozen_params:,} params | " - f"Trainable: {trainable_params:,} params | " - f"Total: {total_params:,} params | " - f"Trainable%: {trainable_params / total_params:.2%}" - ) + if total_params > 0: + logger.info( + f"Vision tower frozen: {frozen_params:,} params | " + f"Trainable: {trainable_params:,} params | " + f"Total: {total_params:,} params | " + f"Trainable%: {trainable_params / total_params:.2%}" + ) + else: + logger.warning( + "Total parameters is 0, cannot compute trainable percentage. " + "This indicates freeze_vision_tower may not have been applied correctly or the model has a different structure than expected." + ) else: logger.warning( "freeze_vision_tower=True but could not find vision tower. " From 7d17b5ac98c2332c971c4ef2dd3f2a99a1ee167b Mon Sep 17 00:00:00 2001 From: ehsk Date: Thu, 27 Nov 2025 19:35:22 +0000 Subject: [PATCH 04/18] removed unnecessary logs --- pipelinerl/finetune/checkpoints.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pipelinerl/finetune/checkpoints.py b/pipelinerl/finetune/checkpoints.py index d7a5d040..210d1029 100644 --- a/pipelinerl/finetune/checkpoints.py +++ b/pipelinerl/finetune/checkpoints.py @@ -143,24 +143,7 @@ def load_model(args, model_class, current_dir): if vision_tower is not None: vision_tower.requires_grad_(False) - - # Count frozen parameters - total_params = sum(p.numel() for p in model.parameters()) - frozen_params = sum(p.numel() for p in vision_tower.parameters()) - trainable_params = total_params - frozen_params - - if total_params > 0: - logger.info( - f"Vision tower frozen: {frozen_params:,} params | " - f"Trainable: {trainable_params:,} params | " - f"Total: {total_params:,} params | " - f"Trainable%: {trainable_params / total_params:.2%}" - ) - else: - logger.warning( - "Total parameters is 0, cannot compute trainable percentage. " - "This indicates freeze_vision_tower may not have been applied correctly or the model has a different structure than expected." - ) + logger.info("Vision tower parameters frozen successfully (i.e. its parameters will be excluded from optimizer)") else: logger.warning( "freeze_vision_tower=True but could not find vision tower. " From 691073dcfbffbc52cf6601f4d85eb0eb6bfa681f Mon Sep 17 00:00:00 2001 From: ehsk Date: Thu, 27 Nov 2025 19:35:54 +0000 Subject: [PATCH 05/18] non-trainable parameters excluded from grouped_parameters --- pipelinerl/finetune/optim.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pipelinerl/finetune/optim.py b/pipelinerl/finetune/optim.py index 268e88cf..b1a7f738 100644 --- a/pipelinerl/finetune/optim.py +++ b/pipelinerl/finetune/optim.py @@ -12,6 +12,9 @@ def get_grouped_params( ): params_with_wd, params_without_wd = [], [] for n, p in model.named_parameters(): + # Skip frozen parameters + if not p.requires_grad: + continue if any(nd in n for nd in no_decay): params_without_wd.append(p) else: From 32d8985c376594bc5cba63923de020a025de1b2d Mon Sep 17 00:00:00 2001 From: ehsk Date: Fri, 28 Nov 2025 15:42:06 +0000 Subject: [PATCH 06/18] replace "python" with current executable python depending on current env --- pipelinerl/launch.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pipelinerl/launch.py b/pipelinerl/launch.py index 77017602..a7973856 100644 --- a/pipelinerl/launch.py +++ b/pipelinerl/launch.py @@ -88,7 +88,7 @@ def run_ref_llm(cfg: DictConfig, preprocessor_llm_idx: int, local_idx: int, gpus os.makedirs(log_dir, exist_ok=True) cmd = [ - "python", + sys.executable, "-m", "vllm.entrypoints.openai.api_server", "--model", @@ -140,7 +140,7 @@ def run_actor_llm( "pipelinerl.entrypoints.run_vllm0" ) cmd = [ - "python", + sys.executable, "-m", entrypoint, "--model", @@ -190,7 +190,7 @@ def run_actor(world_map: WorldMap, actor_idx: int, exp_dir: Path): raise NotImplementedError("Can only do 1 actor yet") llm_urls = "+".join(world_map.get_actor_urls()) cmd = [ - "python", + sys.executable, "-m", "pipelinerl.entrypoints.run_actor", "--config-dir", @@ -215,7 +215,7 @@ def run_environment(cfg: DictConfig, job: Job): # run in a subprocess like in the rest of the code run_dir = Path(cfg.output_dir) / f"environment_{job.replica_idx}" cmd = [ - "python", + sys.executable, "-m", "pipelinerl.entrypoints.run_environment", "--config-dir", @@ -246,7 +246,7 @@ def run_finetune(cfg: DictConfig, world_map: WorldMap, gpus: list[int], exp_dir: if cfg.use_fsdp and cfg.use_deepspeed: raise ValueError("Cannot use both FSDP and DeepSpeed") cmd = [ - "python", + sys.executable, "-m", "accelerate.commands.launch", ] @@ -343,7 +343,7 @@ def run_preprocess(world_map: WorldMap, preprocessor_idx: int, exp_dir: Path): raise NotImplementedError("Can only do 1 preprocessor yet") llm_urls = "+".join(world_map.get_preprocessor_urls()) cmd = [ - "python", + sys.executable, "-m", "pipelinerl.entrypoints.run_preprocess", "--config-dir", From 71db4a883049dd01209314321854a84e9c615533 Mon Sep 17 00:00:00 2001 From: ehsk Date: Fri, 28 Nov 2025 15:42:20 +0000 Subject: [PATCH 07/18] add processor args to vllm --- conf/chartqa.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 4b9c3875..95d5e23c 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -52,3 +52,4 @@ vllm_config: vllm_kwargs: max-num-seqs: 64 max-num-batched-tokens: 32768 + mm-processor-kwargs: '{"min_pixels": 784, "max_pixels": 1003520, "use_fast": true}' # 28*28 to 1280*28*28 From 14b017b2c6ebdebee00978ab83b07d2f29bb1e2b Mon Sep 17 00:00:00 2001 From: ehsk Date: Tue, 2 Dec 2025 20:52:43 +0000 Subject: [PATCH 08/18] epsilons for chartqa added --- conf/chartqa.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 95d5e23c..c839dbf5 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -8,7 +8,8 @@ finetune: seq_length: 8000 gradient_accumulation_passes: 512 seq_packing: false - freeze_vision_tower: true + epsilon_high: 4.0 + epsilon_low: 0.0 llm: parameters: @@ -25,13 +26,15 @@ actor: system_prompt: You are an expert at analyzing charts and graphs. Please examine the chart carefully and answer the question accurately. Remember to provide your final answer in a boxed format, like \\boxed{{your answer}}. task_template: |- Question: {question} - + Please analyze the chart step by step and put your final answer within \\boxed{{}}. llm_max_rollouts: 16 shared_memory_entry_size: 2000000000 + max_stream_size: 1000 preprocess: shared_memory_entry_size: 2000000000 + max_stream_size: 1000 environment: null From fe00beab5b83d35b75f7365369eec9b39a895a51 Mon Sep 17 00:00:00 2001 From: ehsk Date: Tue, 2 Dec 2025 20:57:30 +0000 Subject: [PATCH 09/18] max_stream_size added for redis to avoid OOM --- conf/base.yaml | 8 ++++++-- pipelinerl/actor.py | 2 +- pipelinerl/preprocess.py | 18 ++++++++++++++++-- pipelinerl/streams.py | 37 +++++++++++++++++++++++++++++-------- 4 files changed, 52 insertions(+), 13 deletions(-) diff --git a/conf/base.yaml b/conf/base.yaml index e3122f5a..b99b8c07 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -18,6 +18,8 @@ actor: result_queue_size: 64 throughput_window_size: 50 shared_memory_entry_size: 10000000 + # Maximum number of entries to retain in the actor data stream (Redis only for now) + max_stream_size: 1000000 environment: null preprocess: input: actor @@ -26,7 +28,7 @@ preprocess: chunk_n_groups: 2 # queue for loaded raw groups raw_queue_size: 8 - # queue for processed chunks of multiple groups + # queue for processed chunks of multiple groups input_queue_size: 32 # queue for ready chunks for multiple groups output_queue_size: 32 @@ -36,9 +38,11 @@ preprocess: ring_buffer_size: 128 # "virtual" sample queue per lead trainer max_ready_samples_per_lead: 64 - pop_old_data: ${..pop_old_data} + pop_old_data: ${..pop_old_data} shared_memory_entry_size: 100000000 log_every_n_samples: 128 + # Maximum number of entries to retain in the training data stream (Redis only for now) + max_stream_size: 1000000 llm: parameters: # changed diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 1c238ff9..1c0e5af3 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -438,7 +438,7 @@ def run(self, dataset: list[tuple[str, dict]]): logger.info(f"Start {'train' if self.is_training else 'test'} actor loop") with ( - write_to_streams(self.data_stream, "a") as data_stream_writer, + write_to_streams(self.data_stream, "a", max_stream_size=self.cfg.actor.max_stream_size) as data_stream_writer, write_to_streams(self.stats_stream, "a") as stats_writer, ): while True: diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index 65fcee47..1c5d666a 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -157,7 +157,20 @@ def preprocess_dataset( entry["step_index"] = entry["metadata"]["step_index"] if not isinstance(tokenizer.eos_token_id, int): raise ValueError(f"Tokenizer {tokenizer} does not have an eos_token_id") - dataset = populate_rl_data(dataset=dataset, eos_token_id=tokenizer.eos_token_id, config=rl_config) + try: + dataset = populate_rl_data(dataset=dataset, eos_token_id=tokenizer.eos_token_id, config=rl_config) + except Exception as e: + logger.exception( + "Error in populate_rl_data: {}".format({ + "Data": data, + "Dataset": dataset, + "Tokenizer eos_token_id": tokenizer.eos_token_id, + "RL config": rl_config, + "LLM": llm, + "Seq length": seq_length, + }), + ) + raise e return dataset @@ -450,7 +463,8 @@ def run_preprocessing_loop( # Per-trainer sample tracking (similar to finetune_loop.py) total_filtered_out = 0 # Track total filtered samples across all batches - with write_to_streams(output_stream) as data_writer, write_to_streams(stats_streams) as stats_writer: + max_stream_size = cfg.preprocess.max_stream_size + with write_to_streams(output_stream, max_stream_size=max_stream_size) as data_writer, write_to_streams(stats_streams) as stats_writer: with SharedMemoryManager() as smm: # Create shared memory queues without the manager parameter input_queue = SharedMemoryQueue(smm, cfg.preprocess.input_queue_size, cfg.preprocess.shared_memory_entry_size) diff --git a/pipelinerl/streams.py b/pipelinerl/streams.py index 632b760e..b4ac7851 100644 --- a/pipelinerl/streams.py +++ b/pipelinerl/streams.py @@ -110,7 +110,7 @@ def connect_to_redis(config: RedisConfig): logger.debug(f"Trying to connect to Redis server at {config.host}:{config.port}") client = redis.Redis(host=config.host, port=config.port) client.ping() - logger.info(f"Connected to Redis server") + logger.debug("Connected to Redis server") return client except (redis.exceptions.TimeoutError, redis.ConnectionError) as e: logger.warning(f"Waiting for Redis server ({type(e)}). Retrying in 5 seconds.") @@ -118,8 +118,15 @@ def connect_to_redis(config: RedisConfig): class RedisStreamWriter(StreamWriter): - def __init__(self, stream: SingleStreamSpec, mode: Literal["w", "a"] = "a"): + def __init__(self, stream: SingleStreamSpec, mode: Literal["w", "a"] = "a", max_stream_size: int = 1000000): + """ + Args: + stream: The stream specification + mode: Write mode - 'w' for write (new stream) or 'a' for append + max_stream_size: Maximum number of entries to retain in the stream (Redis only) + """ self.stream = stream + self.max_stream_size = max_stream_size assert isinstance(_backend, RedisConfig) self._stream_name = str(self.stream) self._redis = connect_to_redis(_backend) @@ -155,7 +162,7 @@ def write(self, data, partition: int | None = None): if isinstance(data, BaseModel): data = data.model_dump() data = pickle.dumps(data) - self._redis.xadd(self._stream_name, {"index": self._index, "data": data}, maxlen=1000000, approximate=True) + self._redis.xadd(self._stream_name, {"index": self._index, "data": data}, maxlen=self.max_stream_size, approximate=True) self._index += 1 @@ -195,7 +202,13 @@ def read(self): class RoundRobinRedisStreamWriter(StreamWriter): # TODO: share the connection across writers - def __init__(self, streams: StreamRangeSpec, mode: Literal["w", "a"] = "a"): + def __init__(self, streams: StreamRangeSpec, mode: Literal["w", "a"] = "a", max_stream_size: int = 1000000): + """ + Args: + streams: The stream range specification + mode: Write mode - 'w' for write (new stream) or 'a' for append + max_stream_size: Maximum number of entries to retain in the stream (Redis only) + """ self.streams = streams self._next_stream = 0 self._writers = [ @@ -207,6 +220,7 @@ def __init__(self, streams: StreamRangeSpec, mode: Literal["w", "a"] = "a"): partition=i, ), mode=mode, + max_stream_size=max_stream_size, ) for i in range(*self.streams.partition_range) ] @@ -400,16 +414,23 @@ def read_stream(stream: SingleStreamSpec) -> StreamReader: assert False -def write_to_streams(streams: StreamSpec, mode: Literal["w", "a"] = "a") -> StreamWriter: - """Append to the end of the stream.""" +def write_to_streams(streams: StreamSpec, mode: Literal["w", "a"] = "a", max_stream_size: int = 1000000) -> StreamWriter: + """ + Append to the end of the stream. + + Args: + streams: The stream specification + mode: Write mode - 'w' for write (new stream) or 'a' for append + max_stream_size: Maximum number of entries to retain in the stream (Redis only) + """ raise_if_backend_not_set() if not isinstance(streams, (SingleStreamSpec, StreamRangeSpec)): raise ValueError(f"Invalid stream spec: {streams}") if isinstance(_backend, RedisConfig): if isinstance(streams, SingleStreamSpec): - return RedisStreamWriter(streams, mode) + return RedisStreamWriter(streams, mode, max_stream_size) elif isinstance(streams, StreamRangeSpec): - return RoundRobinRedisStreamWriter(streams, mode) + return RoundRobinRedisStreamWriter(streams, mode, max_stream_size) else: assert False elif _backend == "files": From 84c43494c4487dcf190965ee6c4d521f797c9889 Mon Sep 17 00:00:00 2001 From: ehsk Date: Fri, 5 Dec 2025 03:17:15 +0000 Subject: [PATCH 10/18] mini-batch size can be greater than 1 --- pipelinerl/finetune/data.py | 46 ++++++++++++++++++++++++++++--------- pipelinerl/launch.py | 3 +-- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/pipelinerl/finetune/data.py b/pipelinerl/finetune/data.py index 4e395e3b..e12b2821 100644 --- a/pipelinerl/finetune/data.py +++ b/pipelinerl/finetune/data.py @@ -172,17 +172,41 @@ def collate( if seq_length % pad_to_multiple_of: seq_length += pad_to_multiple_of - (seq_length % pad_to_multiple_of) result = {} - - # Visual feature fields that should be stacked, not padded - if "visual_features" in example_dict and isinstance(example_dict["visual_features"][0], dict): - for k, seq_list in example_dict["visual_features"][0].items(): - if k == "image_grid_thw": - # image_grid_thw should remain as a list - result[k] = seq_list - else: - # Other visual fields like pixel_values can be stacked as tensors - valid_tensors = [torch.tensor(seq) for seq in seq_list] - result[k] = torch.stack(valid_tensors) + + # Handle visual features with dynamic batching + if "visual_features" in example_dict: + visual_features_list = example_dict["visual_features"] + + if visual_features_list and visual_features_list[0] is not None: + first_vf = visual_features_list[0] + + for key in first_vf.keys(): + if key == "image_grid_thw": + # Concatenate all image_grid_thw arrays into a single tensor + # Each sample has shape (num_images, 3), concatenate along image dimension + all_grids = [torch.as_tensor(vf[key]) for vf in visual_features_list] + result[key] = torch.cat(all_grids, dim=0) + else: + # Convert to torch tensors (zero-copy for numpy arrays) + all_tensors = [torch.as_tensor(vf[key]) for vf in visual_features_list] + + # Find max number of images in this batch + max_num_images = max(t.shape[0] for t in all_tensors) + + # Get shape of single image: (C, H, W) + single_shape = all_tensors[0].shape[1:] + dtype = all_tensors[0].dtype + + # Pre-allocate batch tensor: (batch_size, max_num_images, C, H, W) + batch_shape = (len(all_tensors), max_num_images) + single_shape + batched = torch.zeros(batch_shape, dtype=dtype) + + # Fill in actual data (padding is already zeros) + for i, tensor in enumerate(all_tensors): + num_images = tensor.shape[0] + batched[i, :num_images] = tensor + + result[key] = batched for k, seq_list in example_dict.items(): if k == "model_version": diff --git a/pipelinerl/launch.py b/pipelinerl/launch.py index a7973856..19c05999 100644 --- a/pipelinerl/launch.py +++ b/pipelinerl/launch.py @@ -60,8 +60,6 @@ def validate_config(cfg: DictConfig): raise ValueError("Only Qwen2.5-VL models are supported for vision language modeling") if cfg.finetune.seq_packing: raise ValueError("Vision language models cannot use sequence packing (seq_packing must be false)") - if cfg.finetune.train_batch_size > 1: - raise ValueError("Vision language models cannot use batch size > 1 (train_batch_size must be 1)") if cfg.finetune.seq_parallel > 1: if not cfg.finetune.seq_packing: @@ -494,6 +492,7 @@ def debug_link_streams(cfg: DictConfig, topics: list[str]): if not cfg.debug.streams_from: raise ValueError("Need to specify streams_from for debug mode") stream_dir = Path(cfg.output_dir) / "streams" + stream_dir.mkdir(parents=True, exist_ok=True) for topic in topics: source_topic_dir = Path(cfg.debug.streams_from) / "streams" / topic target_topic_dir = stream_dir / topic From 3b5e2a06e77cb6e956c3dd659c856e6d03c0ce81 Mon Sep 17 00:00:00 2001 From: ehsk Date: Thu, 18 Dec 2025 16:51:17 +0000 Subject: [PATCH 11/18] a fix for configs in VLMs like Qwen3-VL or Apriel where there's a text_config inside config that creates problems for deepspeed integration --- pipelinerl/finetune_loop.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index 1cc9d2b1..282cae90 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -353,6 +353,24 @@ def run_finetuning_loop( logger.info(f"Model loaded in dtype {model.dtype}") dt = log_time(dt, time_stats, "finetune/model_load") + # Fix for multimodal models (e.g., Apriel, Qwen3-VL) with Accelerate+DeepSpeed + # Accelerate's _prepare_deepspeed() doesn't check text_config.hidden_size + if not hasattr(model.config, "hidden_size") and not hasattr(model.config, "hidden_sizes"): + if hasattr(model.config, "text_config"): + hidden_size = None + if hasattr(model.config.text_config, "hidden_size"): + hidden_size = model.config.text_config.hidden_size + elif hasattr(model.config.text_config, "hidden_sizes"): + hidden_size = max(model.config.text_config.hidden_sizes) + + if hidden_size is not None: + if get_accelerator().is_main_process: + logger.info( + f"Detected multimodal model with text_config.hidden_size={hidden_size}. " + f"Setting config.hidden_size to enable DeepSpeed auto-configuration." + ) + model.config.hidden_size = hidden_size + data_stream = SingleStreamSpec( exp_path=exp_root_dir, topic=args.input, From c3a30398c4dadfcd29bf14fb258f55a5b853e019 Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 26 Jan 2026 17:52:45 +0000 Subject: [PATCH 12/18] refactorings and improvements --- conf/base.yaml | 3 + conf/chartqa.yaml | 13 ++- pipelinerl/actor.py | 2 + pipelinerl/async_llm.py | 11 +-- pipelinerl/finetune/data.py | 35 +------- pipelinerl/finetune/rl/__init__.py | 11 ++- pipelinerl/finetune/types.py | 24 +++--- pipelinerl/launch.py | 2 - pipelinerl/llm.py | 1 + pipelinerl/preprocess.py | 1 + pipelinerl/processor_factory.py | 19 ----- pipelinerl/vision_processor_utils.py | 122 +++++++++++++++++++++++++++ 12 files changed, 169 insertions(+), 75 deletions(-) delete mode 100644 pipelinerl/processor_factory.py create mode 100644 pipelinerl/vision_processor_utils.py diff --git a/conf/base.yaml b/conf/base.yaml index b99b8c07..c80eb08a 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -91,6 +91,9 @@ eval_every_n_versions: 78000 # changed model_path: Qwen/Qwen2.5-7B +# Processor configuration for vision-language models (multimodal) +mm_processor_kwargs: {} + # will use default based on the chosen backend accelerate_config: null use_deepspeed: true diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index c839dbf5..2ed7b4cb 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -8,8 +8,9 @@ finetune: seq_length: 8000 gradient_accumulation_passes: 512 seq_packing: false - epsilon_high: 4.0 - epsilon_low: 0.0 + rl: + epsilon_high: 4.0 + epsilon_low: 0.0 llm: parameters: @@ -49,6 +50,14 @@ test_dataset_names: # Use vision-language model for multimodal support model_path: Qwen/Qwen2.5-VL-3B-Instruct +eval_every_n_versions: 12500 + +# Processor configuration for vision-language models (shared between training and inference) +mm_processor_kwargs: + min_pixels: 784 # 28*28 + max_pixels: 1003520 # 1280*28*28 + use_fast: true + # Override vLLM config for multimodal support vllm_config: use_v1: true diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 1c0e5af3..8c321b82 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -622,6 +622,7 @@ def run_actor_loop(cfg: DictConfig): tokenizer_name=str(actor_model_path), parameters=cfg.llm.parameters, collect_logprobs=True, + mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), ) for url in llm_urls ] @@ -632,6 +633,7 @@ def run_actor_loop(cfg: DictConfig): tokenizer_name=str(actor_model_path), parameters=cfg.test_llm.parameters, collect_logprobs=True, + mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), ) for url in llm_urls ] diff --git a/pipelinerl/async_llm.py b/pipelinerl/async_llm.py index 4e78ebf9..a8b45b26 100644 --- a/pipelinerl/async_llm.py +++ b/pipelinerl/async_llm.py @@ -4,12 +4,13 @@ import aiohttp import numpy as np +import torch from PIL import Image from pipelinerl.llm import LLMCall, LLMOutput, Prompt, TokenLogprob, TrainableLLM from pipelinerl.finetune.data import MASKED_TOKEN_ID from pipelinerl.rollouts import TrainingText -from pipelinerl.processor_factory import get_processor +from pipelinerl.vision_processor_utils import get_mm_processor from omegaconf import DictConfig, ListConfig, OmegaConf logger = logging.getLogger(__name__) @@ -157,7 +158,7 @@ def make_training_text(llm: TrainableLLM, llm_call: LLMCall) -> TrainingText: if use_processor: # Use processor for vision-language models - processor = get_processor(llm.model_name) + processor = get_mm_processor(llm.model_name, mm_processor_kwargs=llm.mm_processor_kwargs) try: # Apply chat template using processor for proper image token handling @@ -189,11 +190,11 @@ def make_training_text(llm: TrainableLLM, llm_call: LLMCall) -> TrainingText: processed = processor( text=[prompt_text], images=images, padding=True, return_tensors=None ) + # Convert PyTorch tensors to numpy arrays visual_features = { - key: value + key: value.cpu().numpy() if torch.is_tensor(value) else value for key, value in processed.items() - if isinstance(value, np.ndarray) - and key not in ["input_ids", "attention_mask"] + if key not in ["input_ids", "attention_mask"] } except Exception as e: diff --git a/pipelinerl/finetune/data.py b/pipelinerl/finetune/data.py index e12b2821..833f6490 100644 --- a/pipelinerl/finetune/data.py +++ b/pipelinerl/finetune/data.py @@ -15,6 +15,7 @@ from pipelinerl.finetune.utils import create_sentinel_example from pipelinerl.rollouts import TrainingText +from pipelinerl.vision_processor_utils import collate_visual_features from .context import get_accelerator, logger from .rl import RL_DATA_COLUMNS, prepare_rl_fields @@ -176,37 +177,9 @@ def collate( # Handle visual features with dynamic batching if "visual_features" in example_dict: visual_features_list = example_dict["visual_features"] - - if visual_features_list and visual_features_list[0] is not None: - first_vf = visual_features_list[0] - - for key in first_vf.keys(): - if key == "image_grid_thw": - # Concatenate all image_grid_thw arrays into a single tensor - # Each sample has shape (num_images, 3), concatenate along image dimension - all_grids = [torch.as_tensor(vf[key]) for vf in visual_features_list] - result[key] = torch.cat(all_grids, dim=0) - else: - # Convert to torch tensors (zero-copy for numpy arrays) - all_tensors = [torch.as_tensor(vf[key]) for vf in visual_features_list] - - # Find max number of images in this batch - max_num_images = max(t.shape[0] for t in all_tensors) - - # Get shape of single image: (C, H, W) - single_shape = all_tensors[0].shape[1:] - dtype = all_tensors[0].dtype - - # Pre-allocate batch tensor: (batch_size, max_num_images, C, H, W) - batch_shape = (len(all_tensors), max_num_images) + single_shape - batched = torch.zeros(batch_shape, dtype=dtype) - - # Fill in actual data (padding is already zeros) - for i, tensor in enumerate(all_tensors): - num_images = tensor.shape[0] - batched[i, :num_images] = tensor - - result[key] = batched + batched_visual_features = collate_visual_features(visual_features_list) + if batched_visual_features: + result["visual_features"] = batched_visual_features for k, seq_list in example_dict.items(): if k == "model_version": diff --git a/pipelinerl/finetune/rl/__init__.py b/pipelinerl/finetune/rl/__init__.py index d33e1961..adcba384 100644 --- a/pipelinerl/finetune/rl/__init__.py +++ b/pipelinerl/finetune/rl/__init__.py @@ -190,13 +190,12 @@ def rl_step( } if batch.is_packed: model_inputs["position_ids"] = batch.position_ids - + # Add visual features if present (for multimodal models) - if hasattr(batch, 'pixel_values') and batch.pixel_values is not None: - model_inputs["pixel_values"] = batch.pixel_values - if hasattr(batch, 'image_grid_thw') and batch.image_grid_thw is not None: - model_inputs["image_grid_thw"] = batch.image_grid_thw #torch.tensor(.reshape((1, 3)) - + # Unpack all visual features from the dict (e.g., pixel_values, image_grid_thw, image_sizes) + if hasattr(batch, 'visual_features') and batch.visual_features is not None: + model_inputs.update(batch.visual_features) + outputs = model(**model_inputs) # compute log probs and entropy diff --git a/pipelinerl/finetune/types.py b/pipelinerl/finetune/types.py index a3c16f2e..eda751f7 100644 --- a/pipelinerl/finetune/types.py +++ b/pipelinerl/finetune/types.py @@ -70,11 +70,11 @@ class PipelineBatchEncoding(BaseModel): is_packed: bool = False seq_boundaries: torch.IntTensor | None = None # Required when seq_packing=True - # Visual feature fields (optional, for multimodal models) - pixel_values: torch.FloatTensor | None = None - image_grid_thw: torch.LongTensor | None = None + # Visual features (optional, for multimodal models) + # Dict containing model-specific visual features (e.g., pixel_values, image_grid_thw, image_sizes) + visual_features: dict[str, torch.Tensor] | None = None - @field_validator('input_ids', 'attention_mask', 'labels', 'position_ids', 'image_grid_thw', 'segment_ids', mode='before') + @field_validator('input_ids', 'attention_mask', 'labels', 'position_ids', 'segment_ids', mode='before') @classmethod def convert_to_long_tensor(cls, v: List[int] | torch.Tensor | None) -> torch.LongTensor | None: """Handle initialization of long tensors from different types.""" @@ -95,9 +95,8 @@ def convert_to_int_tensor(cls, v: List[int] | torch.Tensor | None) -> torch.IntT if isinstance(v, torch.Tensor): return v.int() # type: ignore return torch.tensor(v, dtype=torch.int) - - # TODO: am i needed? - @field_validator('rewards', 'advantages', 'ref_logprobs', 'old_logprobs', 'group_tokens', 'num_labels', 'overflow', 'pixel_values', mode='before') + + @field_validator('rewards', 'advantages', 'ref_logprobs', 'old_logprobs', 'group_tokens', 'num_labels', 'overflow', mode='before') @classmethod def convert_to_float_tensor(cls, v: List[float] | torch.Tensor | None) -> torch.FloatTensor | None: """Handle initialization of float tensors from different types.""" @@ -111,10 +110,16 @@ def convert_to_float_tensor(cls, v: List[float] | torch.Tensor | None) -> torch. def to_device(self, device: Union[str, torch.device]) -> 'PipelineBatchEncoding': """Move all tensors to the specified device and return updated instance.""" - for field_name in self.model_fields: + for field_name in type(self).model_fields: field_value = getattr(self, field_name) if isinstance(field_value, torch.Tensor): setattr(self, field_name, field_value.to(device)) + elif isinstance(field_value, dict): + setattr( + self, + field_name, + {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in field_value.items()} + ) return self @classmethod @@ -173,8 +178,7 @@ def make_slices(self, num_slices: int) -> list['PipelineBatchEncoding']: "is_packed": self.is_packed, "padding": self.padding, "seq_boundaries": self.seq_boundaries, - "pixel_values": self.pixel_values, - "image_grid_thw": self.image_grid_thw + "visual_features": self.visual_features } slices.append(PipelineBatchEncoding(**result)) return slices diff --git a/pipelinerl/launch.py b/pipelinerl/launch.py index 19c05999..a0df771b 100644 --- a/pipelinerl/launch.py +++ b/pipelinerl/launch.py @@ -56,8 +56,6 @@ def validate_config(cfg: DictConfig): # Check for vision language model constraints if cfg.finetune.model_class == "vision2seq-language-modeling": - if "Qwen2.5-VL" not in cfg.model_path: - raise ValueError("Only Qwen2.5-VL models are supported for vision language modeling") if cfg.finetune.seq_packing: raise ValueError("Vision language models cannot use sequence packing (seq_packing must be false)") diff --git a/pipelinerl/llm.py b/pipelinerl/llm.py index cc099c15..9d76a848 100644 --- a/pipelinerl/llm.py +++ b/pipelinerl/llm.py @@ -403,6 +403,7 @@ class TrainableLLM(LLM): max_parallel_requests: int = 32 max_retries: int = 5 base_delay: float = 0.5 + mm_processor_kwargs: dict = Field(default_factory=dict) _semaphore: asyncio.Semaphore def model_post_init(self, __context): diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index 1c5d666a..3d402049 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -420,6 +420,7 @@ def run_preprocessing_loop( model_name=cfg.finetune.config_name, tokenizer_name=cfg.finetune.config_name, parameters=cfg.llm.parameters, + mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), ) for url in llm_urls ] diff --git a/pipelinerl/processor_factory.py b/pipelinerl/processor_factory.py deleted file mode 100644 index 06f0fc2b..00000000 --- a/pipelinerl/processor_factory.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Simple cache for AutoProcessor instances.""" -from typing import Dict -from transformers import AutoProcessor -import logging -logger = logging.getLogger(__name__) - -_processors: Dict[str, AutoProcessor] = {} - -def get_processor(model_name: str) -> AutoProcessor: - """Get or create an AutoProcessor for the given model.""" - if model_name not in _processors: - logger.info(f"Loading processor for model: {model_name}") - #TODO: should be args - _processors[model_name] = AutoProcessor.from_pretrained(model_name, min_pixels=28*28, max_pixels=1280*28*28) - return _processors[model_name] - -def clear_cache() -> None: - """Clear all cached processors.""" - _processors.clear() \ No newline at end of file diff --git a/pipelinerl/vision_processor_utils.py b/pipelinerl/vision_processor_utils.py new file mode 100644 index 00000000..ee2e8635 --- /dev/null +++ b/pipelinerl/vision_processor_utils.py @@ -0,0 +1,122 @@ +""" +Vision processor utilities for multimodal models. + +This module provides processor caching and management for vision-language models. + +Supported models: +- Qwen2.5-VL: Uses image_grid_thw (B, 3) and flattened pixel_values +- Pixtral/Apriel: Uses image_sizes (B, 2) and standard pixel_values (B, C, H, W) +""" +import logging +from typing import Dict +import torch +from transformers import AutoProcessor + +logger = logging.getLogger(__name__) + +# Processor cache +_processors: Dict[str, AutoProcessor] = {} + + +def get_mm_processor(model_name: str, mm_processor_kwargs: dict | None = None) -> AutoProcessor: + """ + Get or create an AutoProcessor for multimodal models. + + Args: + model_name: HuggingFace model identifier + mm_processor_kwargs: Optional kwargs to pass to AutoProcessor.from_pretrained() + + Returns: + AutoProcessor instance + """ + if model_name not in _processors: + if mm_processor_kwargs is None: + mm_processor_kwargs = {} + + logger.info(f"Loading processor for model: {model_name} with kwargs: {mm_processor_kwargs}") + _processors[model_name] = AutoProcessor.from_pretrained( + model_name, **mm_processor_kwargs + ) + return _processors[model_name] + + +def clear_cache() -> None: + """Clear all cached processors.""" + _processors.clear() + + +def collate_visual_features(visual_features_list: list[dict]) -> dict[str, torch.Tensor]: + """ + Collate visual features from multiple samples into batched tensors. + + Handles different formats: + - Metadata (image_grid_thw, image_sizes): Concatenate along image dimension + - Qwen pixel_values (2D): Concatenate flattened features + - Pixtral pixel_values (4D): Pad to max_num_images + - Other features: Pad to max_num_images + + Args: + visual_features_list: List of visual feature dicts from individual samples + + Returns: + Dict mapping feature names to batched tensors + """ + if not visual_features_list or visual_features_list[0] is None: + return {} + + first_vf = visual_features_list[0] + batched_visual_features = {} + + for key in first_vf.keys(): + if key in ("image_grid_thw", "image_sizes"): + # Concatenate metadata arrays (image_grid_thw or image_sizes) + # Each sample has shape (num_images, 2 or 3), concatenate along image dimension + all_metadata = [torch.as_tensor(vf[key]) for vf in visual_features_list] + batched_visual_features[key] = torch.cat(all_metadata, dim=0) + + elif key == "pixel_values": + # Handle pixel_values - format differs by model: + # - Qwen: (total_pixels, hidden_dim) - flattened, concatenate along pixel dimension + # - Pixtral: (num_images, C, H, W) - standard, needs padding to max_num_images + all_tensors = [torch.as_tensor(vf[key]) for vf in visual_features_list] + + # Check if this is flattened format (2D) or image format (4D) + if all_tensors[0].ndim == 2: + # Qwen format: (total_pixels, hidden_dim) - just concatenate + batched_visual_features[key] = torch.cat(all_tensors, dim=0) + elif all_tensors[0].ndim == 4: + # Pixtral format: (num_images, C, H, W) - pad to max_num_images + max_num_images = max(t.shape[0] for t in all_tensors) + single_shape = all_tensors[0].shape[1:] # (C, H, W) + dtype = all_tensors[0].dtype + + # Pre-allocate: (batch_size, max_num_images, C, H, W) + batch_shape = (len(all_tensors), max_num_images) + single_shape + batched = torch.zeros(batch_shape, dtype=dtype) + + # Fill in actual data + for i, tensor in enumerate(all_tensors): + num_images = tensor.shape[0] + batched[i, :num_images] = tensor + + batched_visual_features[key] = batched + else: + raise ValueError(f"Unexpected pixel_values shape: {all_tensors[0].shape}") + + else: + # Other visual features - assume they need padding like Pixtral pixel_values + all_tensors = [torch.as_tensor(vf[key]) for vf in visual_features_list] + max_num_images = max(t.shape[0] for t in all_tensors) + single_shape = all_tensors[0].shape[1:] + dtype = all_tensors[0].dtype + + batch_shape = (len(all_tensors), max_num_images) + single_shape + batched = torch.zeros(batch_shape, dtype=dtype) + + for i, tensor in enumerate(all_tensors): + num_images = tensor.shape[0] + batched[i, :num_images] = tensor + + batched_visual_features[key] = batched + + return batched_visual_features From a10ccd44a78639672691d3cefb2de00e173497a5 Mon Sep 17 00:00:00 2001 From: ehsk Date: Fri, 8 May 2026 20:11:31 +0000 Subject: [PATCH 13/18] mm_processor_kwargs moved inside llm in configs --- conf/base.yaml | 5 +---- conf/chartqa.yaml | 15 ++++++++------- pipelinerl/actor.py | 4 ++-- pipelinerl/llm.py | 4 ++-- pipelinerl/preprocess.py | 2 +- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/conf/base.yaml b/conf/base.yaml index af17ba45..b973b840 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -54,7 +54,7 @@ llm: # changed temperature: 1.0 test_llm: - parameters: + parameters: max_tokens: 8192 temperature: 1.0 top_p: 0.95 @@ -94,9 +94,6 @@ eval_every_n_versions: 78000 model_path: Qwen/Qwen2.5-7B -# Processor configuration for vision-language models (multimodal) -mm_processor_kwargs: {} - # will use default based on the chosen backend accelerate_config: null use_deepspeed: true diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 86bc5c3a..97bae69f 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -16,11 +16,17 @@ llm: parameters: max_tokens: 2048 temperature: 0.7 + # Processor configuration for vision-language models (shared between training and inference) + mm_processor_kwargs: + min_pixels: 784 # 28*28 + max_pixels: 1003520 # 1280*28*28 + use_fast: true test_llm: parameters: max_tokens: 2048 temperature: 0.7 + mm_processor_kwargs: ${llm.mm_processor_kwargs} actor: rollout_policy: pipelinerl.domains.chartqa.generate_chartqa_rollout @@ -48,16 +54,11 @@ test_dataset_names: - chartqa_test # Use vision-language model for multimodal support -model_path: Qwen/Qwen2.5-VL-3B-Instruct +model_path: Qwen/Qwen3-VL-4B-Instruct +# model_path: Qwen/Qwen2.5-VL-3B-Instruct eval_every_n_versions: 12500 -# Processor configuration for vision-language models (shared between training and inference) -mm_processor_kwargs: - min_pixels: 784 # 28*28 - max_pixels: 1003520 # 1280*28*28 - use_fast: true - # Override vLLM config for multimodal support vllm_config: vllm_kwargs: diff --git a/pipelinerl/actor.py b/pipelinerl/actor.py index 21a85837..259b1a1e 100644 --- a/pipelinerl/actor.py +++ b/pipelinerl/actor.py @@ -825,7 +825,7 @@ def run_actor_loop(cfg: DictConfig): parameters=cfg.llm.parameters, collect_logprobs=True, chat_template_kwargs=cfg.llm.get("chat_template_kwargs", {}), - mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), + mm_processor_kwargs=cfg.llm.get("mm_processor_kwargs", {}), ) for url in llm_urls ] @@ -837,7 +837,7 @@ def run_actor_loop(cfg: DictConfig): parameters=cfg.test_llm.parameters, collect_logprobs=True, chat_template_kwargs=cfg.test_llm.get("chat_template_kwargs", {}), - mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), + mm_processor_kwargs=cfg.test_llm.get("mm_processor_kwargs", {}), ) for url in llm_urls ] diff --git a/pipelinerl/llm.py b/pipelinerl/llm.py index c7e24732..ab35608d 100644 --- a/pipelinerl/llm.py +++ b/pipelinerl/llm.py @@ -18,7 +18,7 @@ import requests import transformers from omegaconf import DictConfig, OmegaConf -from pydantic import BaseModel, Field, TypeAdapter +from pydantic import BaseModel, Field, PrivateAttr, TypeAdapter from tenacity import retry, stop_after_attempt, wait_exponential from pipelinerl.rollouts import TrainingText @@ -372,7 +372,7 @@ class TrainableLLM(LLM): base_delay: float = 0.5 chat_template_kwargs: dict = {} mm_processor_kwargs: dict = Field(default_factory=dict) - _semaphore: asyncio.Semaphore + _semaphore: asyncio.Semaphore = PrivateAttr() def model_post_init(self, __context): super().model_post_init(__context) diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index de0cd065..a7c39d31 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -419,7 +419,7 @@ def run_preprocessing_loop( model_name=cfg.finetune.config_name, tokenizer_name=cfg.finetune.config_name, parameters=cfg.llm.parameters, - mm_processor_kwargs=cfg.get("mm_processor_kwargs", {}), + mm_processor_kwargs=cfg.llm.get("mm_processor_kwargs", {}), ) for url in llm_urls ] From e3e2ecf1fc006eaafc533c977daca646d941856d Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 11 May 2026 18:54:01 +0000 Subject: [PATCH 14/18] improved error message for samples discarded while reading from redis --- pipelinerl/streams.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pipelinerl/streams.py b/pipelinerl/streams.py index b4ac7851..7e62b32f 100644 --- a/pipelinerl/streams.py +++ b/pipelinerl/streams.py @@ -192,8 +192,13 @@ def read(self): assert stream_name.decode("utf-8") == self._stream_name assert isinstance(result, list) and len(result) == 1 entry_id, entry = result[0] - if int(entry[b"index"].decode("utf-8")) != self._index: - raise ValueError(f"Index mismatch: expected {self._index}, got {entry['index']}") + entry_index = int(entry[b"index"].decode("utf-8")) + if entry_index != self._index: + raise ValueError( + f"Index mismatch on stream {self._stream_name}: expected {self._index}, got {entry_index} " + f"(gap of {entry_index - self._index} entries — likely Redis trimmed unread entries; " + f"raise actor.max_stream_size or set max_lag to throttle the producer)" + ) self._last_id = entry_id self._index += 1 yield pickle.loads(entry[b"data"]) From 8299e4aa9d84d73d1838bfde4dab91f76fc0d1b9 Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 11 May 2026 18:54:57 +0000 Subject: [PATCH 15/18] AutoModel for vision models updated to match latest transformers --- pipelinerl/finetune/checkpoints.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelinerl/finetune/checkpoints.py b/pipelinerl/finetune/checkpoints.py index 5f49a867..0f8e0d84 100644 --- a/pipelinerl/finetune/checkpoints.py +++ b/pipelinerl/finetune/checkpoints.py @@ -13,7 +13,7 @@ from transformers import ( AutoModelForCausalLM, AutoModelForSeq2SeqLM, - AutoModelForVision2Seq, + AutoModelForImageTextToText, AutoTokenizer, AutoProcessor, BitsAndBytesConfig, @@ -117,7 +117,7 @@ def get_auto_model_class( case "seq2seq-language-modeling": return AutoModelForSeq2SeqLM case "vision2seq-language-modeling": - return AutoModelForVision2Seq + return AutoModelForImageTextToText case _: raise ValueError(f"Unsupported model class: {model_class}") From 5ea03d1545f17107d3d763f476526148f0091be0 Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 11 May 2026 18:56:21 +0000 Subject: [PATCH 16/18] use queue.Queue instead of multiprocessing.Queue for raw_chunk_queue --- conf/chartqa.yaml | 5 +++-- pipelinerl/preprocess.py | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 97bae69f..6fc11eb4 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -37,11 +37,11 @@ actor: Please analyze the chart step by step and put your final answer within \\boxed{{}}. llm_max_rollouts: 16 shared_memory_entry_size: 2000000000 - max_stream_size: 1000 + max_stream_size: 10000 preprocess: shared_memory_entry_size: 2000000000 - max_stream_size: 1000 + max_stream_size: 10000 environment: null @@ -64,4 +64,5 @@ vllm_config: vllm_kwargs: max-num-seqs: 64 max-num-batched-tokens: 32768 + max_model_len: 8000 mm-processor-kwargs: '{"min_pixels": 784, "max_pixels": 1003520, "use_fast": true}' # 28*28 to 1280*28*28 diff --git a/pipelinerl/preprocess.py b/pipelinerl/preprocess.py index a7c39d31..177887b9 100644 --- a/pipelinerl/preprocess.py +++ b/pipelinerl/preprocess.py @@ -8,10 +8,10 @@ import threading import time from functools import partial -from multiprocessing import Process, Queue +from multiprocessing import Process from multiprocessing.managers import SharedMemoryManager from pathlib import Path -from queue import Empty +from queue import Empty, Queue from typing import List import datasets @@ -150,7 +150,7 @@ def preprocess_dataset( entry = dict(data[i]) for k, v in preprocess(data[i]).items(): entry[k] = v - dataset.append(entry) + dataset.append(entry) for entry in dataset: entry["model_version"] = entry["metadata"]["model_version"] entry["rollout_index"] = entry["metadata"]["rollout_index"] @@ -510,7 +510,7 @@ def run_preprocessing_loop( raw_chunk = raw_chunk_queue.get(timeout=0.001) if isinstance(raw_chunk, Exception): raise raw_chunk - + # Put chunk in the input queue for workers input_queue.put(raw_chunk) submitted_chunks += 1 From d185900183f8d55ac06eb4ebcb94f28bbee56d32 Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 11 May 2026 19:40:35 +0000 Subject: [PATCH 17/18] stream size reduced and processor args updated for Qwen3-VL --- conf/chartqa.yaml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/conf/chartqa.yaml b/conf/chartqa.yaml index 6fc11eb4..f2d02ead 100644 --- a/conf/chartqa.yaml +++ b/conf/chartqa.yaml @@ -17,9 +17,11 @@ llm: max_tokens: 2048 temperature: 0.7 # Processor configuration for vision-language models (shared between training and inference) + # Defaults are for Qwen3-VL (patch=16, merge=2 -> effective 32x32 per visual token). + # For Qwen2.5-VL (effective 28x28 per visual token), use: min_pixels=784, max_pixels=1003520. mm_processor_kwargs: - min_pixels: 784 # 28*28 - max_pixels: 1003520 # 1280*28*28 + min_pixels: 1024 # 32*32 + max_pixels: 1310720 # 1280*32*32 use_fast: true test_llm: @@ -37,11 +39,11 @@ actor: Please analyze the chart step by step and put your final answer within \\boxed{{}}. llm_max_rollouts: 16 shared_memory_entry_size: 2000000000 - max_stream_size: 10000 + max_stream_size: 1000 preprocess: shared_memory_entry_size: 2000000000 - max_stream_size: 10000 + max_stream_size: 1000 environment: null @@ -65,4 +67,5 @@ vllm_config: max-num-seqs: 64 max-num-batched-tokens: 32768 max_model_len: 8000 - mm-processor-kwargs: '{"min_pixels": 784, "max_pixels": 1003520, "use_fast": true}' # 28*28 to 1280*28*28 + # Qwen3-VL defaults (effective 32x32 per visual token); for Qwen2.5-VL use min_pixels=784, max_pixels=1003520 + mm-processor-kwargs: '{"min_pixels": 1024, "max_pixels": 1310720, "use_fast": true}' # 32*32 to 1280*32*32 From acfc3fb1fe03d255c797100ce3796bae1184665a Mon Sep 17 00:00:00 2001 From: ehsk Date: Mon, 11 May 2026 20:07:49 +0000 Subject: [PATCH 18/18] new task, visual math, added --- conf/mathv.yaml | 71 +++++++++++ pipelinerl/domains/mathv/README.md | 12 ++ pipelinerl/domains/mathv/__init__.py | 6 + pipelinerl/domains/mathv/evaluation.py | 56 +++++++++ pipelinerl/domains/mathv/load_datasets.py | 117 ++++++++++++++++++ pipelinerl/domains/mathv/mathv.py | 143 ++++++++++++++++++++++ 6 files changed, 405 insertions(+) create mode 100644 conf/mathv.yaml create mode 100644 pipelinerl/domains/mathv/README.md create mode 100644 pipelinerl/domains/mathv/__init__.py create mode 100644 pipelinerl/domains/mathv/evaluation.py create mode 100644 pipelinerl/domains/mathv/load_datasets.py create mode 100644 pipelinerl/domains/mathv/mathv.py diff --git a/conf/mathv.yaml b/conf/mathv.yaml new file mode 100644 index 00000000..0e0c0c41 --- /dev/null +++ b/conf/mathv.yaml @@ -0,0 +1,71 @@ +defaults: + - base + - override streams: redis + - _self_ + +finetune: + model_class: vision2seq-language-modeling + seq_length: 12000 + gradient_accumulation_passes: 512 + seq_packing: false + rl: + epsilon_high: 4.0 + epsilon_low: 0.0 + +llm: + parameters: + max_tokens: 4096 + temperature: 0.7 + # Processor configuration for vision-language models (shared between training and inference) + # Defaults are for Qwen3-VL (patch=16, merge=2 -> effective 32x32 per visual token). + # For Qwen2.5-VL (effective 28x28 per visual token), use: min_pixels=784, max_pixels=1003520. + mm_processor_kwargs: + min_pixels: 1024 # 32*32 + max_pixels: 1310720 # 1280*32*32 + use_fast: true + +test_llm: + parameters: + max_tokens: 4096 + temperature: 0.7 + mm_processor_kwargs: ${llm.mm_processor_kwargs} + +actor: + rollout_policy: pipelinerl.domains.mathv.generate_mathv_rollout + system_prompt: You are an expert at solving math problems involving geometric figures, charts, and diagrams. Examine the image carefully and reason step by step. Provide your final answer in a boxed format, like \\boxed{{your answer}}. + task_template: |- + Question: {question} + + Solve step by step and put your final answer within \\boxed{{}}. + llm_max_rollouts: 16 + shared_memory_entry_size: 2000000000 + max_stream_size: 1000 + +preprocess: + shared_memory_entry_size: 2000000000 + max_stream_size: 1000 + +environment: null + +dataset_loader: pipelinerl.domains.mathv.load_problems + +train_dataset_names: + - geometry3k_train + +test_dataset_names: + - mathvista_testmini + +# Use vision-language model for multimodal support +model_path: Qwen/Qwen3-VL-4B-Instruct +# model_path: Qwen/Qwen2.5-VL-3B-Instruct + +eval_every_n_versions: 12500 + +# Override vLLM config for multimodal support +vllm_config: + vllm_kwargs: + max-num-seqs: 64 + max_model_len: 12000 + max-num-batched-tokens: 32768 + # Qwen3-VL defaults (effective 32x32 per visual token); for Qwen2.5-VL use min_pixels=784, max_pixels=1003520 + mm-processor-kwargs: '{"min_pixels": 1024, "max_pixels": 1310720, "use_fast": true}' # 32*32 to 1280*32*32 diff --git a/pipelinerl/domains/mathv/README.md b/pipelinerl/domains/mathv/README.md new file mode 100644 index 00000000..62aa8db8 --- /dev/null +++ b/pipelinerl/domains/mathv/README.md @@ -0,0 +1,12 @@ +# Math Visual Reasoning (mathv) + +A Vision Language Model (VLM) RL example for math reasoning over images. +Trains on [Geometry3K](https://huggingface.co/datasets/hiyouga/geometry3k) +and evaluates on [MathVista](https://huggingface.co/datasets/AI4Math/MathVista) +(`testmini` split). + +## Usage + +```bash +python -m pipelinerl.launch output_dir=results/mathv --config-name mathv +``` diff --git a/pipelinerl/domains/mathv/__init__.py b/pipelinerl/domains/mathv/__init__.py new file mode 100644 index 00000000..827f339b --- /dev/null +++ b/pipelinerl/domains/mathv/__init__.py @@ -0,0 +1,6 @@ +"""Math visual reasoning domain (Geometry3K for training, MathVista for eval).""" + +from .mathv import generate_mathv_rollout +from .load_datasets import load_problems + +__all__ = ["generate_mathv_rollout", "load_problems"] diff --git a/pipelinerl/domains/mathv/evaluation.py b/pipelinerl/domains/mathv/evaluation.py new file mode 100644 index 00000000..8f2a6f82 --- /dev/null +++ b/pipelinerl/domains/mathv/evaluation.py @@ -0,0 +1,56 @@ +import re +from typing import Optional + + +def relaxed_correctness(target: str, + prediction: str, + max_relative_change: float = 0.05) -> bool: + """Numeric answers within ``max_relative_change`` are correct; otherwise + fall back to case-insensitive exact match (handles letter answers like + "A"/"B"/"C"/"D" and short strings). + """ + + def _to_float(text: str) -> Optional[float]: + try: + if text.endswith("%"): + return float(text.rstrip("%")) / 100.0 + return float(text) + except ValueError: + return None + + prediction_float = _to_float(prediction) + target_float = _to_float(target) + if prediction_float is not None and target_float: + relative_change = abs(prediction_float - target_float) / abs(target_float) + return relative_change <= max_relative_change + return prediction.strip().lower() == target.strip().lower() + + +def extract_boxed_answer(text: str) -> str | None: + """Extract answer from \\boxed{} format.""" + boxed_pattern = r'\\boxed\{([^}]*)\}' + matches = re.findall(boxed_pattern, text, re.IGNORECASE) + if matches: + return matches[-1].strip() + return None + + +def evaluate_answer(predicted: str, ground_truth: str) -> str: + """ + Evaluate math-visual answer and return status. + + Returns: + - "correct": Answer is correct + - "wrong": Answer is incorrect + - "no_answer": No \\boxed{} found + - "unparsable": Could not parse answer + """ + try: + boxed_answer = extract_boxed_answer(predicted) + if not boxed_answer: + return "no_answer" + if relaxed_correctness(ground_truth, boxed_answer): + return "correct" + return "wrong" + except Exception: + return "unparsable" diff --git a/pipelinerl/domains/mathv/load_datasets.py b/pipelinerl/domains/mathv/load_datasets.py new file mode 100644 index 00000000..ac0e75a9 --- /dev/null +++ b/pipelinerl/domains/mathv/load_datasets.py @@ -0,0 +1,117 @@ +import logging +from typing import List + +from datasets import load_dataset + +logger = logging.getLogger(__name__) + +DOMAIN = "mathv" + + +def _format_choices(choices) -> str: + if not choices: + return "" + letters = ["A", "B", "C", "D", "E", "F", "G", "H"] + return "\n".join(f"{letters[i]}) {c}" for i, c in enumerate(choices)) + + +def _first_image(item: dict): + # MathVista ships both `image` (string path) and `decoded_image` (PIL Image); + # geometry3k ships `images` (list of PIL Images). Prefer fields that hold the + # actual decoded image, and skip string paths since downstream code expects PIL. + # NOTE: geometry3k and MathVista testmini are single-image-per-item, so taking + # [0] of `images` loses nothing here. If a multi-image dataset is added (e.g. + # MMMU, MathVista full test), this needs to return a list and the rollout's + # create_multimodal_message must emit multiple image_url content blocks. + img = item.get("decoded_image") + if img is not None and not isinstance(img, str): + return img + if "images" in item and item["images"]: + first = item["images"][0] + if not isinstance(first, str): + return first + img = item.get("image") + if img is not None and not isinstance(img, str): + return img + return None + + +def process_geometry3k(dataset, dataset_name: str): + """hiyouga/geometry3k schema: {images, problem, answer}. The `problem` + field already contains the choices inline.""" + for item in dataset: + image = _first_image(item) + if image is None or "problem" not in item or "answer" not in item: + continue + try: + yield { + "dataset": dataset_name, + "image": image, + "question": item["problem"], + "answer": str(item["answer"]).strip(), + } + except Exception as e: + logger.error(f"Error processing geometry3k item: {e}") + continue + + +def process_mathvista(dataset, dataset_name: str): + """AI4Math/MathVista schema: {pid, question, choices, answer, + question_type, decoded_image, ...}.""" + for item in dataset: + image = _first_image(item) + if image is None or "question" not in item or item.get("answer") is None: + continue + try: + question = item["question"] + choices_block = _format_choices(item.get("choices")) + if choices_block: + question = f"{question}\n\nChoices:\n{choices_block}" + yield { + "dataset": dataset_name, + "image": image, + "question": question, + "answer": str(item["answer"]).strip(), + } + except Exception as e: + logger.error(f"Error processing mathvista item: {e}") + continue + + +def add_ids(dataset: list[dict]): + for i, entry in enumerate(dataset): + entry["id"] = i + entry.setdefault("domain", DOMAIN) + return dataset + + +def load_problems(dataset_names: List[str] | str | None) -> List[dict]: + """Load math-visual datasets and return a list of standardized problems.""" + if dataset_names is None: + return [] + + if isinstance(dataset_names, str): + dataset_names = [dataset_names] + + out: list[dict] = [] + + if "geometry3k_train" in dataset_names: + ds = load_dataset("hiyouga/geometry3k", split="train", trust_remote_code=True) + out += add_ids(list(process_geometry3k(ds, "geometry3k_train"))) + + if "geometry3k_test" in dataset_names: + ds = load_dataset("hiyouga/geometry3k", split="test", trust_remote_code=True) + out += add_ids(list(process_geometry3k(ds, "geometry3k_test"))) + + if "geometry3k_val" in dataset_names: + ds = load_dataset("hiyouga/geometry3k", split="validation", trust_remote_code=True) + out += add_ids(list(process_geometry3k(ds, "geometry3k_val"))) + + if "mathvista_testmini" in dataset_names: + ds = load_dataset("AI4Math/MathVista", split="testmini", trust_remote_code=True) + out += add_ids(list(process_mathvista(ds, "mathvista_testmini"))) + + if not out: + raise ValueError(f"No mathv datasets loaded from {dataset_names!r}") + + return out diff --git a/pipelinerl/domains/mathv/mathv.py b/pipelinerl/domains/mathv/mathv.py new file mode 100644 index 00000000..a696d782 --- /dev/null +++ b/pipelinerl/domains/mathv/mathv.py @@ -0,0 +1,143 @@ +import base64 +import io +import logging +import time +from typing import Any, Dict + +import aiohttp +from omegaconf import DictConfig +from PIL import Image +from pydantic import BaseModel + +from pipelinerl.async_llm import llm_async_generate, make_training_text +from pipelinerl.llm import Prompt, TrainableLLM +from pipelinerl.rollouts import BaseMetrics, RolloutResult + +from .evaluation import evaluate_answer + +logger = logging.getLogger(__name__) + + +class MathVRewardTable(BaseModel): + wrong_answer_not_finished: float + wrong_answer_finished: float + no_answer_not_finished: float + no_answer_finished: float + unparsable_not_finished: float + unparsable_finished: float + correct_answer_not_finished: float + correct_answer_finished: float + + +def encode_image_to_base64(image: Image.Image) -> str: + """Convert PIL Image to base64 string.""" + buffered = io.BytesIO() + image.save(buffered, format="PNG") + img_str = base64.b64encode(buffered.getvalue()).decode() + return f"data:image;base64,{img_str}" + + +def create_multimodal_message(image: Image.Image, question: str) -> Dict[str, Any]: + """Create a multimodal message with image and text.""" + image_base64 = encode_image_to_base64(image) + + return { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": image_base64 + } + }, + { + "type": "text", + "text": question + } + ] + } + + +async def generate_mathv_rollout( + cfg: DictConfig, + llm: TrainableLLM, + problem: dict, + session: aiohttp.ClientSession, +) -> RolloutResult: + """Generate a rollout for the math visual reasoning domain.""" + messages = [] + + if cfg.actor.system_prompt: + messages.append({"role": "system", "content": cfg.actor.system_prompt}) + + question_text = cfg.actor.task_template.format(question=problem["question"]) + multimodal_message = create_multimodal_message(problem["image"], question_text) + messages.append(multimodal_message) + + prompt = Prompt(messages=messages) + + time_start = time.time() + llm_call = await llm_async_generate(llm, prompt, session) + latency = time.time() - time_start + + assert llm_call.output.content is not None + rewards = MathVRewardTable(**dict(cfg.rewards)) + discount_factor = cfg.actor.discount_factor + + if llm.tokenizer.eos_token is not None and llm_call.output.content.endswith(llm.tokenizer.eos_token): + content = llm_call.output.content[:-len(llm.tokenizer.eos_token)] + else: + content = llm_call.output.content + try: + answer_status = evaluate_answer(content, problem["answer"]) + except Exception as e: + logger.error(f"Error evaluating answer: {e}") + answer_status = "unparsable" + + try: + trace = make_training_text(llm, llm_call) + except Exception as e: + logger.error(f"Error creating training text: {e}") + raise + + try: + match (answer_status, trace.finished): + case ("wrong", False): + reward = rewards.wrong_answer_not_finished + case ("wrong", True): + reward = rewards.wrong_answer_finished + case ("no_answer", False): + reward = rewards.no_answer_not_finished + case ("no_answer", True): + reward = rewards.no_answer_finished + case ("unparsable", False): + reward = rewards.unparsable_not_finished + case ("unparsable", True): + reward = rewards.unparsable_finished + case ("correct", False): + reward = rewards.correct_answer_not_finished + case ("correct", True): + reward = rewards.correct_answer_finished + case _: + raise ValueError(f"Invalid answer_status/finished combination: {answer_status}/{trace.finished}") + + reward *= discount_factor**llm_call.output_length_tokens + trace.reward = reward + except Exception as e: + logger.error(f"Error calculating reward: {e}") + raise + + metrics = BaseMetrics( + reward=reward, + success=answer_status == "correct", + no_error=answer_status != "unparsable", + no_answer=answer_status == "no_answer", + ) + + return RolloutResult( + training_texts=[trace], + metrics=metrics, + dataset_name=problem.get("dataset"), + latency=latency, + domain="mathv", + )