diff --git a/tests/test_step_rewards.py b/tests/test_step_rewards.py new file mode 100644 index 0000000000..4dc8a44c67 --- /dev/null +++ b/tests/test_step_rewards.py @@ -0,0 +1,352 @@ +"""Tests for step-level dense rewards.""" + +import pytest + +from verifiers import MultiTurnEnv, Parser, Rubric, State, step_reward +from verifiers.rubrics.step_reward_rubric import StepRewardRubric +from verifiers.utils.step_reward_utils import ( + apply_step_advantages, + compute_discounted_returns, +) + + +class TestComputeDiscountedReturns: + """Test cases for compute_discounted_returns.""" + + def test_empty(self): + assert compute_discounted_returns([]) == [] + + def test_single_step(self): + assert compute_discounted_returns([5.0]) == [5.0] + + def test_no_discounting(self): + result = compute_discounted_returns([1.0, 2.0, 3.0], gamma=1.0) + assert result == [6.0, 5.0, 3.0] + + def test_full_discounting(self): + result = compute_discounted_returns([1.0, 2.0, 3.0], gamma=0.0) + assert result == [1.0, 2.0, 3.0] + + def test_partial_discounting(self): + result = compute_discounted_returns([1.0, 1.0, 1.0], gamma=0.5) + # R_2 = 1.0 + # R_1 = 1.0 + 0.5*1.0 = 1.5 + # R_0 = 1.0 + 0.5*1.5 = 1.75 + assert result[2] == pytest.approx(1.0) + assert result[1] == pytest.approx(1.5) + assert result[0] == pytest.approx(1.75) + + +class TestApplyStepAdvantages: + """Test cases for apply_step_advantages.""" + + def _make_state(self, step_rewards): + trajectory = [ + {"reward": r, "advantage": None, "prompt": [], "completion": []} + for r in step_rewards + ] + return State(trajectory=trajectory, reward=None, advantage=None) + + def test_single_state_uniform_rewards(self): + states = [self._make_state([1.0, 1.0, 1.0])] + apply_step_advantages(states, gamma=1.0) + # All returns equal (3.0, 2.0, 1.0) -> normalized advantages + for step in states[0]["trajectory"]: + assert step["advantage"] is not None + + def test_two_states_advantage_normalization(self): + states = [ + self._make_state([1.0, 0.0]), + self._make_state([0.0, 1.0]), + ] + apply_step_advantages(states, gamma=1.0) + # All steps should have advantages that sum to ~0 + all_advantages = [] + for state in states: + for step in state["trajectory"]: + all_advantages.append(step["advantage"]) + assert sum(all_advantages) == pytest.approx(0.0, abs=1e-6) + + def test_sets_rollout_level_advantage(self): + states = [ + self._make_state([2.0, 1.0]), + self._make_state([0.0, 0.0]), + ] + apply_step_advantages(states, gamma=1.0) + assert states[0]["advantage"] is not None + assert states[1]["advantage"] is not None + # State with higher rewards should have higher advantage + assert states[0]["advantage"] > states[1]["advantage"] + + def test_empty_trajectory(self): + states = [State(trajectory=[], reward=None, advantage=None)] + apply_step_advantages(states, gamma=1.0) + assert states[0].get("advantage") is None + + def test_gamma_affects_returns(self): + states = [self._make_state([0.0, 0.0, 10.0])] + apply_step_advantages(states, gamma=0.5) + traj = states[0]["trajectory"] + # With gamma=0.5, earlier steps get less credit for future reward + assert traj[0]["reward"] < traj[1]["reward"] < traj[2]["reward"] + + +class TestStepRewardDecorator: + """Test cases for the @step_reward decorator.""" + + def test_bare_decorator(self): + @step_reward + def my_func(state): + return 1.0 + + assert getattr(my_func, "step_reward") is True + assert getattr(my_func, "step_reward_priority") == 0 + assert getattr(my_func, "step_reward_weight") == 1.0 + + def test_parameterized_decorator(self): + @step_reward(weight=0.5, priority=10) + def my_func(state): + return 1.0 + + assert getattr(my_func, "step_reward") is True + assert getattr(my_func, "step_reward_priority") == 10 + assert getattr(my_func, "step_reward_weight") == 0.5 + + +class TestStepRewardInRollout: + """Test cases for step rewards during rollout.""" + + @pytest.mark.asyncio + async def test_step_reward_called_per_turn( + self, mock_client, sample_chat_dataset, make_input + ): + class StepRewardEnv(MultiTurnEnv): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.step_reward_calls = 0 + + @step_reward + async def count_reward(self, state: State) -> float: + self.step_reward_calls += 1 + return 1.0 + + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "Continue"}] + + mock_client.set_default_response("response") + env = StepRewardEnv( + client=mock_client, + model="test-model", + dataset=sample_chat_dataset, + max_turns=3, + parser=Parser(), + rubric=Rubric(), + ) + + state = await env.rollout( + input=make_input( + prompt=[{"role": "user", "content": "Start"}], + answer="answer", + ), + client=mock_client, + model="test-model", + ) + + assert env.step_reward_calls == 3 + for step in state["trajectory"]: + assert step["reward"] == 1.0 + + @pytest.mark.asyncio + async def test_weighted_step_rewards( + self, mock_client, sample_chat_dataset, make_input + ): + class WeightedEnv(MultiTurnEnv): + @step_reward(weight=2.0) + async def double_reward(self, state: State) -> float: + return 1.0 + + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "Continue"}] + + mock_client.set_default_response("response") + env = WeightedEnv( + client=mock_client, + model="test-model", + dataset=sample_chat_dataset, + max_turns=2, + parser=Parser(), + rubric=Rubric(), + ) + + state = await env.rollout( + input=make_input( + prompt=[{"role": "user", "content": "Start"}], + answer="answer", + ), + client=mock_client, + model="test-model", + ) + + for step in state["trajectory"]: + assert step["reward"] == 2.0 + + @pytest.mark.asyncio + async def test_multiple_step_rewards( + self, mock_client, sample_chat_dataset, make_input + ): + class MultiRewardEnv(MultiTurnEnv): + @step_reward(priority=10) + async def reward_a(self, state: State) -> float: + return 1.0 + + @step_reward(weight=0.5) + async def reward_b(self, state: State) -> float: + return 2.0 + + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "Continue"}] + + mock_client.set_default_response("response") + env = MultiRewardEnv( + client=mock_client, + model="test-model", + dataset=sample_chat_dataset, + max_turns=1, + parser=Parser(), + rubric=Rubric(), + ) + + state = await env.rollout( + input=make_input( + prompt=[{"role": "user", "content": "Start"}], + answer="answer", + ), + client=mock_client, + model="test-model", + ) + + # 1.0 * 1.0 + 2.0 * 0.5 = 2.0 + assert state["trajectory"][0]["reward"] == 2.0 + + @pytest.mark.asyncio + async def test_no_step_reward_handlers( + self, mock_client, sample_chat_dataset, make_input + ): + class PlainEnv(MultiTurnEnv): + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "Continue"}] + + mock_client.set_default_response("response") + env = PlainEnv( + client=mock_client, + model="test-model", + dataset=sample_chat_dataset, + max_turns=2, + parser=Parser(), + rubric=Rubric(), + ) + + state = await env.rollout( + input=make_input( + prompt=[{"role": "user", "content": "Start"}], + answer="answer", + ), + client=mock_client, + model="test-model", + ) + + for step in state["trajectory"]: + assert step["reward"] is None + + @pytest.mark.asyncio + async def test_sync_step_reward_handler( + self, mock_client, sample_chat_dataset, make_input + ): + """Synchronous step_reward handlers should work without raising TypeError.""" + + class SyncEnv(MultiTurnEnv): + @step_reward + def sync_reward(self, state: State) -> float: + return 0.5 + + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "Continue"}] + + mock_client.set_default_response("response") + env = SyncEnv( + client=mock_client, + model="test-model", + dataset=sample_chat_dataset, + max_turns=2, + parser=Parser(), + rubric=Rubric(), + ) + + state = await env.rollout( + input=make_input( + prompt=[{"role": "user", "content": "Start"}], + answer="answer", + ), + client=mock_client, + model="test-model", + ) + + for step in state["trajectory"]: + assert step["reward"] == 0.5 + + +class TestStepRewardRubric: + """Test cases for StepRewardRubric.""" + + @pytest.mark.asyncio + async def test_score_group(self): + rubric = StepRewardRubric(gamma=1.0) + states = [ + State( + trajectory=[ + {"reward": 1.0, "advantage": None}, + {"reward": 2.0, "advantage": None}, + ], + reward=None, + advantage=None, + ), + State( + trajectory=[ + {"reward": 0.0, "advantage": None}, + {"reward": 0.0, "advantage": None}, + ], + reward=None, + advantage=None, + ), + ] + + await rubric.score_group(states) + + assert states[0]["reward"] == 3.0 + assert states[1]["reward"] == 0.0 + assert states[0]["advantage"] > states[1]["advantage"] + for state in states: + for step in state["trajectory"]: + assert step["advantage"] is not None + + @pytest.mark.asyncio + async def test_score_group_with_discount(self): + rubric = StepRewardRubric(gamma=0.5) + states = [ + State( + trajectory=[ + {"reward": 0.0, "advantage": None}, + {"reward": 10.0, "advantage": None}, + ], + reward=None, + advantage=None, + ), + ] + + await rubric.score_group(states) + + traj = states[0]["trajectory"] + # step 0 return: 0.0 + 0.5*10.0 = 5.0 + # step 1 return: 10.0 + assert traj[0]["reward"] == pytest.approx(5.0) + assert traj[1]["reward"] == pytest.approx(10.0) diff --git a/verifiers/__init__.py b/verifiers/__init__.py index 35fbec78f0..db435a5655 100644 --- a/verifiers/__init__.py +++ b/verifiers/__init__.py @@ -19,6 +19,7 @@ metric, reward, setup, + step_reward, stop, teardown, update, @@ -75,6 +76,7 @@ "JudgeRubric", "RubricGroup", "MathRubric", + "StepRewardRubric", "TextArenaEnv", "ReasoningGymEnv", "GymEnv", @@ -139,6 +141,7 @@ "cleanup", "metric", "reward", + "step_reward", "advantage", "setup", "stop", @@ -185,6 +188,7 @@ "ToolEnv": "verifiers.envs.tool_env:ToolEnv", "EnvGroup": "verifiers.envs.env_group:EnvGroup", "JudgeRubric": "verifiers.rubrics.judge_rubric:JudgeRubric", + "StepRewardRubric": "verifiers.rubrics.step_reward_rubric:StepRewardRubric", "load_environment": "verifiers.utils.env_utils:load_environment", "load_harness": "verifiers.utils.env_utils:load_harness", "load_taskset": "verifiers.utils.env_utils:load_taskset", @@ -318,6 +322,7 @@ def __getattr__(name: str): from .envs.tool_env import ToolEnv # noqa: F401 from .rubrics.judge_rubric import JudgeRubric # noqa: F401 from .rubrics.math_rubric import MathRubric # noqa: F401 + from .rubrics.step_reward_rubric import StepRewardRubric # noqa: F401 from .utils.env_utils import ( # noqa: F401 load_environment, load_harness, diff --git a/verifiers/decorators.py b/verifiers/decorators.py index 9b0da8edc3..e83670f826 100644 --- a/verifiers/decorators.py +++ b/verifiers/decorators.py @@ -250,6 +250,40 @@ def decorator(f: F) -> F: return decorator(func) +@overload +def step_reward( + func: F, + weight: float = 1.0, + priority: int = 0, +) -> F: ... + + +@overload +def step_reward( + func: None = None, + weight: float = 1.0, + priority: int = 0, +) -> Callable[[F], F]: ... + + +def step_reward( + func: F | None = None, + weight: float = 1.0, + priority: int = 0, +) -> F | Callable[[F], F]: + """Decorator to mark a per-step reward function called after each turn.""" + + def decorator(f: F) -> F: + setattr(f, "step_reward", True) + setattr(f, "step_reward_priority", priority) + setattr(f, "step_reward_weight", weight) + return f + + if func is None: + return decorator + return decorator(func) + + @overload def teardown(func: F, priority: int = 0) -> F: ... diff --git a/verifiers/envs/multiturn_env.py b/verifiers/envs/multiturn_env.py index 40d1aa3c9a..e340ce9036 100644 --- a/verifiers/envs/multiturn_env.py +++ b/verifiers/envs/multiturn_env.py @@ -6,6 +6,7 @@ import verifiers as vf from verifiers.clients import Client +from verifiers.decorators import discover_decorated from verifiers.types import ( Messages, Response, @@ -15,6 +16,7 @@ TimeSpan, TrajectoryStep, ) +from verifiers.utils.async_utils import maybe_await from verifiers.utils.message_utils import ( concat_messages, maybe_normalize_messages, @@ -48,6 +50,8 @@ def __init__( self.timeout_seconds = timeout_seconds self.max_total_completion_tokens: int = -1 + self._step_reward_handlers = discover_decorated(self, "step_reward") + self.add_rubric(MultiTurnMonitorRubric()) def set_max_total_completion_tokens(self, max_total_completion_tokens: int) -> None: @@ -136,6 +140,16 @@ async def add_trajectory_step(self, state: State, trajectory_step: TrajectorySte """Override to set intermediate rewards, advantages, or extra metadata.""" state["trajectory"].append(trajectory_step) + async def _apply_step_rewards(self, state: State) -> None: + """Invoke all @vf.step_reward handlers and accumulate into the latest step.""" + if not self._step_reward_handlers: + return + for handler in self._step_reward_handlers: + reward = await maybe_await(handler, state) + if reward is not None: + weight = getattr(handler, "step_reward_weight", 1.0) + state.add_step_reward(float(reward) * weight) + async def add_model_response( self, state: State, @@ -205,7 +219,10 @@ async def rollout_loop() -> None: response = await self.get_model_response(state, prompt_messages) end_time = time.time() timing.model.spans.append(TimeSpan(start=start_time, end=end_time)) + prev_len = len(state["trajectory"]) await self.add_model_response(state, prompt_messages, response) + if len(state["trajectory"]) > prev_len: + await self._apply_step_rewards(state) except vf.Error as e: if isinstance(e, vf.OverlongPromptError): state["prompt_too_long"] = True diff --git a/verifiers/rubrics/step_reward_rubric.py b/verifiers/rubrics/step_reward_rubric.py new file mode 100644 index 0000000000..49a75026b0 --- /dev/null +++ b/verifiers/rubrics/step_reward_rubric.py @@ -0,0 +1,33 @@ +from typing import cast + +import verifiers as vf +from verifiers.types import State +from verifiers.utils.step_reward_utils import apply_step_advantages + + +def _sum_step_rewards(state: State) -> float: + return state.total_step_reward() + + +class StepRewardRubric(vf.Rubric): + """Rubric that aggregates step-level rewards with discounted advantages.""" + + def __init__(self, gamma: float = 1.0, weight: float = 1.0, **kwargs): + super().__init__(funcs=[_sum_step_rewards], weights=[weight], **kwargs) + self.gamma = gamma + + async def score_rollout(self, state: State): + """Score a single rollout by summing step rewards.""" + state["reward"] = _sum_step_rewards(state) + state["metrics"] = {"step_reward_sum": cast(float, state["reward"])} + + async def score_group(self, states: list[State]): + num_states = len(states) + if num_states == 0: + return + + for state in states: + state["reward"] = _sum_step_rewards(state) + state["metrics"] = {"step_reward_sum": cast(float, state["reward"])} + + apply_step_advantages(states, gamma=self.gamma) diff --git a/verifiers/utils/step_reward_utils.py b/verifiers/utils/step_reward_utils.py new file mode 100644 index 0000000000..153c3c1077 --- /dev/null +++ b/verifiers/utils/step_reward_utils.py @@ -0,0 +1,72 @@ +from verifiers.types import State + + +def compute_discounted_returns( + step_rewards: list[float], + gamma: float = 1.0, +) -> list[float]: + """Compute γ-discounted future returns for each step in a trajectory.""" + n = len(step_rewards) + if n == 0: + return [] + returns = [0.0] * n + returns[n - 1] = step_rewards[n - 1] + for t in range(n - 2, -1, -1): + returns[t] = step_rewards[t] + gamma * returns[t + 1] + return returns + + +def apply_step_advantages( + states: list[State], + gamma: float = 1.0, +) -> None: + """Assign per-step discounted advantages normalised across a group.""" + all_returns: list[float] = [] + index_map: list[tuple[int, int]] = [] + + for state_idx, state in enumerate(states): + trajectory = state.get("trajectory", []) + if not isinstance(trajectory, list): + continue + step_rewards = [] + for step in trajectory: + if not isinstance(step, dict): + continue + r = step.get("reward") + step_rewards.append(float(r) if r is not None else 0.0) + + if not step_rewards: + continue + + returns = compute_discounted_returns(step_rewards, gamma) + for step_idx, ret in enumerate(returns): + all_returns.append(ret) + index_map.append((state_idx, step_idx)) + + if not all_returns: + return + + n = len(all_returns) + mean = sum(all_returns) / n + variance = sum((r - mean) ** 2 for r in all_returns) / n + std = variance**0.5 + eps = 1e-8 + + for pos, (state_idx, step_idx) in enumerate(index_map): + advantage = (all_returns[pos] - mean) / (std + eps) + state = states[state_idx] + trajectory = state["trajectory"] + trajectory[step_idx]["advantage"] = advantage + trajectory[step_idx]["reward"] = all_returns[pos] + + for state_idx, state in enumerate(states): + trajectory = state.get("trajectory", []) + if not isinstance(trajectory, list) or not trajectory: + continue + step_advantages = [ + s["advantage"] + for s in trajectory + if isinstance(s, dict) and s.get("advantage") is not None + ] + if step_advantages: + state["advantage"] = sum(step_advantages) / len(step_advantages)