105 changes: 53 additions & 52 deletions aggregator/src/aggregator.rs

Large diffs are not rendered by default.

118 changes: 54 additions & 64 deletions aggregator/src/aggregator/aggregation_job_continue.rs

Large diffs are not rendered by default.

166 changes: 84 additions & 82 deletions aggregator/src/aggregator/aggregation_job_driver.rs

Large diffs are not rendered by default.

210 changes: 105 additions & 105 deletions aggregator/src/aggregator/aggregation_job_driver/tests.rs

Large diffs are not rendered by default.

252 changes: 118 additions & 134 deletions aggregator/src/aggregator/aggregation_job_init.rs

Large diffs are not rendered by default.

50 changes: 25 additions & 25 deletions aggregator/src/aggregator/aggregation_job_writer.rs
@@ -21,8 +21,8 @@ use janus_aggregator_core::{
};
use janus_core::{report_id::ReportIdChecksumExt as _, time::Clock, vdaf::VdafInstance};
use janus_messages::{
AggregationJobId, Interval, PrepareResp, PrepareStepResult, ReportError, ReportId,
ReportIdChecksum, Time,
AggregationJobId, Interval, ReportError, ReportId, ReportIdChecksum, Time, VerifyResp,
VerifyStepResult,
};
use opentelemetry::{
KeyValue,
@@ -166,10 +166,10 @@ where
/// operation (aggregation into a collected batch is not allowed). These report aggregations
/// will be written with a `Failed(BatchCollected)` state.
///
/// A map from aggregation job ID to the associated preparation responses (if any) will be
/// A map from aggregation job ID to the associated verification responses (if any) will be
/// returned, along with aggregation counters indicating occurrences of aggregation-related
/// events. In the case that a report aggregation was unaggregatable, these preparation
/// responses will be updated from the preparation responses originally included in the given
/// events. In the case that a report aggregation was unaggregatable, these verification
/// responses will be updated from the verification responses originally included in the given
/// report aggregations.
#[tracing::instrument(
name = "AggregationJobWriter::write",
@@ -180,7 +180,7 @@
&self,
tx: &Transaction<'_, C>,
vdaf: Arc<A>,
) -> Result<HashMap<AggregationJobId, Vec<PrepareResp>>, Error> {
) -> Result<HashMap<AggregationJobId, Vec<VerifyResp>>, Error> {
// Read & update state based on the aggregation jobs to be written. We will read batch
// aggregations, then update aggregation jobs/report aggregations/batch aggregations based
// on the state we read.
@@ -241,7 +241,7 @@ where
.report_aggregations
.iter()
.map(AsRef::as_ref)
.filter_map(RA::Borrowed::last_prep_resp)
.filter_map(RA::Borrowed::last_verify_resp)
.cloned()
.collect(),
)
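To make the collection step in the hunk above concrete (building the per-job list of verification responses that `write` returns), here is a minimal, self-contained sketch. The types are simplified stand-ins rather than the actual `janus_messages` / `janus_aggregator_core` definitions; only the `filter_map(last_verify_resp)` / `cloned()` / `collect()` pattern is taken from this diff, and every name and field below is an assumption made for illustration.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for illustration only; the real types carry far more state.
#[derive(Clone, Debug)]
struct VerifyResp(&'static str);

#[derive(Clone, Debug)]
struct ReportAggregation {
    last_verify_resp: Option<VerifyResp>,
}

impl ReportAggregation {
    // Mirrors the accessor this diff renames from `last_prep_resp`.
    fn last_verify_resp(&self) -> Option<&VerifyResp> {
        self.last_verify_resp.as_ref()
    }
}

type AggregationJobId = u64;

// Builds the map returned by `write`: for each aggregation job, keep only the
// report aggregations that actually recorded a verification response.
fn collect_responses(
    jobs: &HashMap<AggregationJobId, Vec<ReportAggregation>>,
) -> HashMap<AggregationJobId, Vec<VerifyResp>> {
    jobs.iter()
        .map(|(job_id, report_aggregations)| {
            (
                *job_id,
                report_aggregations
                    .iter()
                    .filter_map(ReportAggregation::last_verify_resp)
                    .cloned()
                    .collect(),
            )
        })
        .collect()
}

fn main() {
    let mut jobs = HashMap::new();
    jobs.insert(
        1,
        vec![
            ReportAggregation { last_verify_resp: Some(VerifyResp("continue")) },
            ReportAggregation { last_verify_resp: None },
        ],
    );
    let responses = collect_responses(&jobs);
    // Report aggregations without a recorded response are simply omitted.
    assert_eq!(responses[&1].len(), 1);
    println!("{responses:?}");
}
```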
@@ -732,8 +732,8 @@ where

#[cfg(feature = "test-util")]
Fake { rounds: _ }
| FakeFailsPrepInit
| FakeFailsPrepStep => metrics
| FakeFailsVerifyInit
| FakeFailsVerifyStep => metrics
.aggregated_report_share_dimension_histogram
.record(0, &[KeyValue::new("type", "Fake")]),
_ => metrics
@@ -757,11 +757,11 @@ where
});
self.writer
.task_aggregation_counters
.increment_with_report_error(ReportError::VdafPrepError);
.increment_with_report_error(ReportError::VdafVerifyError);
*report_aggregation.to_mut() = report_aggregation
.as_ref()
.clone()
.with_failure(ReportError::VdafPrepError);
.with_failure(ReportError::VdafVerifyError);
}
}
}
@@ -917,8 +917,8 @@ pub trait ReportAggregationUpdate<const SEED_SIZE: usize, A: AsyncAggregator<SEE
/// the "Failed" state, with the given [`ReportError`].
fn with_failure(self, report_error: ReportError) -> Self;

/// Returns the last preparation response from this report aggregation, if any.
fn last_prep_resp(&self) -> Option<&PrepareResp>;
/// Returns the last verification response from this report aggregation, if any.
fn last_verify_resp(&self) -> Option<&VerifyResp>;

/// Write this report aggregation to the datastore. This must be used only with newly-created
/// report aggregations.
@@ -972,13 +972,13 @@ impl<const SEED_SIZE: usize, A: AsyncAggregator<SEED_SIZE>> ReportAggregationUpd
.with_state(ReportAggregationState::Failed { report_error });

// This check effectively checks if we are the Helper. (The Helper will always set
// last_prep_resp for all non-failed report aggregations, and most failed report
// last_verify_resp for all non-failed report aggregations, and most failed report
// aggregations [everything but ReportDropped].)
if report_aggregation.last_prep_resp().is_some() {
if report_aggregation.last_verify_resp().is_some() {
let report_id = *report_aggregation.report_id();
report_aggregation = report_aggregation.with_last_prep_resp(Some(PrepareResp::new(
report_aggregation = report_aggregation.with_last_verify_resp(Some(VerifyResp::new(
report_id,
PrepareStepResult::Reject(report_error),
VerifyStepResult::Reject(report_error),
)));
}

@@ -988,9 +988,9 @@
}
}

/// Returns the last preparation response from this report aggregation, if any.
fn last_prep_resp(&self) -> Option<&PrepareResp> {
self.report_aggregation.last_prep_resp()
/// Returns the last verification response from this report aggregation, if any.
fn last_verify_resp(&self) -> Option<&VerifyResp> {
self.report_aggregation.last_verify_resp()
}

async fn write_new(&self, tx: &Transaction<impl Clock>) -> Result<(), Error> {
@@ -1037,8 +1037,8 @@ where
self.with_state(ReportAggregationMetadataState::Failed { report_error })
}

/// Returns the last preparation response from this report aggregation, if any.
fn last_prep_resp(&self) -> Option<&PrepareResp> {
/// Returns the last verification response from this report aggregation, if any.
fn last_verify_resp(&self) -> Option<&VerifyResp> {
None
}

@@ -1089,9 +1089,9 @@ where
Self::Owned(self.into_owned().with_failure(report_error))
}

/// Returns the last preparation response from this report aggregation, if any.
fn last_prep_resp(&self) -> Option<&PrepareResp> {
self.as_ref().last_prep_resp()
/// Returns the last verification response from this report aggregation, if any.
fn last_verify_resp(&self) -> Option<&VerifyResp> {
self.as_ref().last_verify_resp()
}

async fn write_new(&self, tx: &Transaction<impl Clock>) -> Result<(), Error> {
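As a rough model of the failure path this file's `with_failure` implements, the sketch below rewrites a report aggregation's last verification response to a `Reject` carrying the `ReportError`, and, per the comment in the diff, only does so when a response was already recorded (i.e. when acting as the Helper). All definitions here are hypothetical stand-ins for illustration; the real `VerifyResp`, `VerifyStepResult`, and `ReportError` live in `janus_messages` and carry more data.

```rust
// Simplified stand-ins for the renamed message types; assumptions for illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ReportError {
    VdafVerifyError,
    BatchCollected,
}

#[derive(Clone, Debug, PartialEq)]
enum VerifyStepResult {
    Continue,
    Reject(ReportError),
}

#[derive(Clone, Debug, PartialEq)]
struct VerifyResp {
    report_id: u128,
    result: VerifyStepResult,
}

#[derive(Clone, Debug)]
struct ReportAggregation {
    report_id: u128,
    last_verify_resp: Option<VerifyResp>,
}

impl ReportAggregation {
    // If a verification response was already recorded (the Helper case),
    // replace it with a rejection carrying the same error so the peer sees
    // a consistent failure.
    fn with_failure(mut self, report_error: ReportError) -> Self {
        if self.last_verify_resp.is_some() {
            self.last_verify_resp = Some(VerifyResp {
                report_id: self.report_id,
                result: VerifyStepResult::Reject(report_error),
            });
        }
        self
    }
}

fn main() {
    let ra = ReportAggregation {
        report_id: 7,
        last_verify_resp: Some(VerifyResp {
            report_id: 7,
            result: VerifyStepResult::Continue,
        }),
    };
    let failed = ra.with_failure(ReportError::VdafVerifyError);
    assert!(matches!(
        failed.last_verify_resp,
        Some(VerifyResp {
            result: VerifyStepResult::Reject(ReportError::VdafVerifyError),
            ..
        })
    ));
    println!("failed aggregation: {failed:?}");
    // BatchCollected is unused here; it only illustrates that other report
    // errors exist alongside VdafVerifyError.
    let _ = ReportError::BatchCollected;
}
```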
12 changes: 6 additions & 6 deletions aggregator/src/aggregator/error.rs
@@ -406,15 +406,15 @@ pub(crate) fn handle_ping_pong_error(
let (error_desc, value) = match ping_pong_error {
PingPongError::VdafVerifyInit(_) => (
"Couldn't helper_initialize report share".to_string(),
"prepare_init_failure".to_string(),
"verify_init_failure".to_string(),
),
PingPongError::VdafVerifierSharesToMessage(_) => (
"Couldn't compute prepare message".to_string(),
"prepare_message_failure".to_string(),
"verify_message_failure".to_string(),
),
PingPongError::VdafVerifyNext(_) => (
"Prepare next failed".to_string(),
"prepare_next_failure".to_string(),
"Verify next failed".to_string(),
"verify_next_failure".to_string(),
),
PingPongError::CodecVerifierShare(_) => (
format!("Couldn't decode {peer_role} prepare share"),
@@ -442,6 +442,6 @@ pub(crate) fn handle_ping_pong_error(
aggregate_step_failure_counter.add(1, &[KeyValue::new("type", value)]);

// Per DAP, any occurrence of state Rejected() from a ping-pong routine is translated to
// VdafPrepError
ReportError::VdafPrepError
// VdafVerifyError
ReportError::VdafVerifyError
}
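For context on the hunks above, here is a hedged, self-contained sketch of the error-to-metric-label mapping. The `PingPongError` variant names, the description strings, and the label strings are taken from this diff; the function signature and the stand-in enums are simplified assumptions (the real `handle_ping_pong_error` also records the label on the OpenTelemetry `aggregate_step_failure_counter` and takes the peer role).

```rust
// Hypothetical stand-in for the ping-pong errors named in the diff.
#[derive(Debug)]
enum PingPongError {
    VdafVerifyInit(String),
    VdafVerifierSharesToMessage(String),
    VdafVerifyNext(String),
}

#[derive(Debug, PartialEq)]
enum ReportError {
    VdafVerifyError,
}

// Maps a ping-pong failure to a description and the metric label recorded on
// the failure counter, then collapses every such failure to VdafVerifyError.
fn handle_ping_pong_error(error: &PingPongError) -> (String, &'static str, ReportError) {
    let (description, label) = match error {
        PingPongError::VdafVerifyInit(_) => (
            "Couldn't helper_initialize report share".to_string(),
            "verify_init_failure",
        ),
        PingPongError::VdafVerifierSharesToMessage(_) => (
            "Couldn't compute prepare message".to_string(),
            "verify_message_failure",
        ),
        PingPongError::VdafVerifyNext(_) => {
            ("Verify next failed".to_string(), "verify_next_failure")
        }
    };
    (description, label, ReportError::VdafVerifyError)
}

fn main() {
    let (desc, label, report_error) =
        handle_ping_pong_error(&PingPongError::VdafVerifyNext("boom".into()));
    assert_eq!(label, "verify_next_failure");
    assert_eq!(report_error, ReportError::VdafVerifyError);
    println!("{desc}: recorded metric label {label}");
}
```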
@@ -67,7 +67,7 @@ async fn aggregate_share_request_to_leader() {
..
} = HttpHandlerTest::new().await;

// Prepare parameters.
// Set up parameters.
let task = TaskBuilder::new(
BatchMode::TimeInterval,
AggregationMode::Synchronous,
@@ -112,7 +112,7 @@ async fn aggregate_share_request_invalid_batch_interval() {
..
} = HttpHandlerTest::new().await;

// Prepare parameters.
// Set up parameters.
let time_precision = TimePrecision::from_hours(8);
let task = TaskBuilder::new(
BatchMode::TimeInterval,