Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/autopilot/src/domain/auction/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct Interaction {
}

/// Source from which the sellAmount should be drawn upon order fulfillment
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SellTokenSource {
/// Direct ERC20 allowances to the Vault relayer contract
Erc20,
Expand All @@ -90,7 +90,7 @@ pub enum SellTokenSource {
Internal,
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BuyTokenDestination {
/// Pay trade proceeds as an ERC20 token transfer
Erc20,
Expand Down
13 changes: 5 additions & 8 deletions crates/autopilot/src/infra/persistence/dto/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,20 @@ use {
std::collections::BTreeMap,
};

pub fn from_domain(auction: domain::RawAuctionData) -> RawAuctionData {
pub fn from_domain(auction: &domain::RawAuctionData) -> RawAuctionData {
RawAuctionData {
block: auction.block,
orders: auction
.orders
.into_iter()
.iter()
.map(super::order::from_domain)
.collect(),
prices: auction
.prices
.into_iter()
.map(|(key, value)| (*key, value.get().0))
.collect(),
surplus_capturing_jit_order_owners: auction
.surplus_capturing_jit_order_owners
.into_iter()
.iter()
.map(|(key, value)| (**key, value.get().0))
.collect(),
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners.to_vec(),
}
}

Expand Down
23 changes: 15 additions & 8 deletions crates/autopilot/src/infra/persistence/dto/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Order {
pub quote: Option<Quote>,
}

pub fn from_domain(order: domain::Order) -> Order {
pub fn from_domain(order: &domain::Order) -> Order {
Order {
uid: order.uid.into(),
sell_token: order.sell.token.into(),
Expand All @@ -53,7 +53,8 @@ pub fn from_domain(order: domain::Order) -> Order {
buy_amount: order.buy.amount.into(),
protocol_fees: order
.protocol_fees
.into_iter()
.iter()
.cloned()
.map(FeePolicy::from_domain)
.collect(),
created: order.created,
Expand All @@ -63,18 +64,24 @@ pub fn from_domain(order: domain::Order) -> Order {
owner: order.owner,
partially_fillable: order.partially_fillable,
executed: order.executed.into(),
pre_interactions: order.pre_interactions.into_iter().map(Into::into).collect(),
pre_interactions: order
.pre_interactions
.iter()
.cloned()
.map(Into::into)
.collect(),
post_interactions: order
.post_interactions
.into_iter()
.iter()
.cloned()
.map(Into::into)
.collect(),
sell_token_balance: order.sell_token_balance.into(),
buy_token_balance: order.buy_token_balance.into(),
class: boundary::OrderClass::Limit,
app_data: order.app_data.into(),
signature: order.signature.into(),
quote: order.quote.map(Quote::from_domain),
app_data: order.app_data.clone().into(),
signature: order.signature.clone().into(),
quote: order.quote.as_ref().map(Quote::from_domain),
}
}

Expand Down Expand Up @@ -346,7 +353,7 @@ pub struct Quote {
}

impl Quote {
fn from_domain(quote: domain::Quote) -> Self {
fn from_domain(quote: &domain::Quote) -> Self {
Quote {
sell_amount: quote.sell_amount.0,
buy_amount: quote.buy_amount.0,
Expand Down
11 changes: 7 additions & 4 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,25 @@ impl Persistence {
self.upload_queue
.send(AuctionUpload {
auction_id: new_auction_id,
auction_data: dto::auction::from_domain(new_auction_data.clone()),
auction_data: dto::auction::from_domain(new_auction_data),
})
.expect("upload queue should be alive at all times");
}

/// Spawns a background task that uploads the auction to S3.
pub fn upload_auction_to_s3(&self, id: domain::auction::Id, auction: &domain::RawAuctionData) {
pub fn upload_auction_to_s3(
&self,
id: domain::auction::Id,
auction: Arc<domain::RawAuctionData>,
) {
if auction.orders.is_empty() {
return;
}
let Some(s3) = self.s3.clone() else {
return;
};
let auction = auction.clone();
tokio::task::spawn(async move {
let auction_dto = dto::auction::from_domain(auction);
let auction_dto = dto::auction::from_domain(auction.as_ref());
match s3.upload(id.to_string(), auction_dto).await {
Ok(key) => tracing::info!(?key, "uploaded auction to s3"),
Err(err) => tracing::warn!(?err, "failed to upload auction to s3"),
Expand Down
7 changes: 1 addition & 6 deletions crates/autopilot/src/infra/solvers/dto/solve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ impl Request {
observe::metrics::metrics().on_auction_overhead_start("autopilot", "serialize_request");
let helper = RequestHelper {
id: auction.id,
orders: auction
.orders
.clone()
.into_iter()
.map(dto::order::from_domain)
.collect(),
orders: auction.orders.iter().map(dto::order::from_domain).collect(),
tokens: auction
.prices
.iter()
Expand Down
109 changes: 71 additions & 38 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ pub struct Probes {
pub startup: Arc<Option<AtomicBool>>,
}

#[derive(Debug)]
struct CutAuction {
id: Id,
auction: Arc<domain::RawAuctionData>,
}

pub struct RunLoop {
config: Config,
eth: infra::Ethereum,
Expand Down Expand Up @@ -156,7 +162,7 @@ impl RunLoop {
}

pub async fn run_forever(self, mut control: ShutdownController) {
let mut last_auction = None;
let mut last_auction: Option<Arc<domain::RawAuctionData>> = None;
let mut last_block = None;

let self_arc = Arc::new(self);
Expand Down Expand Up @@ -281,28 +287,35 @@ impl RunLoop {
async fn next_auction(
&self,
start_block: BlockInfo,
prev_auction: &mut Option<domain::Auction>,
prev_auction: &mut Option<Arc<domain::RawAuctionData>>,
prev_block: &mut Option<B256>,
) -> Option<domain::Auction> {
// wait for appropriate time to start building the auction
let auction = self.cut_auction().await?;
tracing::trace!(auction_id = ?auction.id, "auction cut");
let CutAuction { id, auction } = self.cut_auction().await?;
tracing::trace!(auction_id = ?id, "auction cut");

// Only run the solvers if the auction or block has changed.
let previous = prev_auction.replace(auction.clone());
if previous.as_ref() == Some(&auction)
let previous = prev_auction.replace(Arc::clone(&auction));
if previous.as_deref() == Some(auction.as_ref())
&& prev_block.replace(start_block.hash) == Some(start_block.hash)
{
return None;
}
Comment thread
ashleychandy marked this conversation as resolved.

observe::log_auction_delta(&previous, &auction, &start_block);
observe::log_raw_auction_delta(id, previous.as_deref(), auction.as_ref(), &start_block);
self.probes.liveness.auction();
Metrics::auction_ready(start_block.observed_at);
Some(auction)

Some(domain::Auction {
id,
block: auction.block,
orders: auction.orders.clone(),
prices: auction.prices.clone(),
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners.clone(),
})
}

async fn cut_auction(&self) -> Option<domain::Auction> {
async fn cut_auction(&self) -> Option<CutAuction> {
let Some(auction) = self.solvable_orders_cache.current_auction().await else {
tracing::debug!("no current auction");
return None;
Expand All @@ -316,22 +329,18 @@ impl RunLoop {
Metrics::auction(id);

// always update the auction because the tests use this as a readiness probe
self.persistence.replace_current_auction_in_db(id, &auction);
self.persistence.upload_auction_to_s3(id, &auction);
self.persistence
.replace_current_auction_in_db(id, auction.as_ref());
self.persistence
.upload_auction_to_s3(id, Arc::clone(&auction));

if auction.orders.is_empty() {
// Updating liveness probe to not report unhealthy due to this optimization
self.probes.liveness.auction();
tracing::debug!("skipping empty auction");
return None;
}
Some(domain::Auction {
id,
block: auction.block,
orders: auction.orders,
prices: auction.prices,
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners,
})
Some(CutAuction { id, auction })
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -1169,37 +1178,61 @@ pub mod observe {
std::collections::HashSet,
};

pub fn log_auction_delta(
previous: &Option<domain::Auction>,
current: &domain::Auction,
fn log_order_delta<I, J>(
id: domain::auction::Id,
previous: I,
current: J,
start_block: &BlockInfo,
) {
let previous_uids = match previous {
Some(previous) => previous
.orders
.iter()
.map(|order| order.uid)
.collect::<HashSet<_>>(),
None => HashSet::new(),
};
let current_uids = current
.orders
.iter()
.map(|order| order.uid)
.collect::<HashSet<_>>();
) where
I: IntoIterator<Item = domain::OrderUid>,
J: IntoIterator<Item = domain::OrderUid>,
{
let previous_uids = previous.into_iter().collect::<HashSet<_>>();
let current_uids = current.into_iter().collect::<HashSet<_>>();
let added = current_uids.difference(&previous_uids);
let removed = previous_uids.difference(&current_uids);
tracing::debug!(
id = current.id,
id,
added = ?added,
"New orders in auction"
);
tracing::debug!(
id = current.id,
id,
removed = ?removed,
"Orders no longer in auction"
);
tracing::debug!(auction_id = current.id, ?start_block);
tracing::debug!(auction_id = id, ?start_block);
}

pub fn log_raw_auction_delta(
id: domain::auction::Id,
previous: Option<&domain::RawAuctionData>,
current: &domain::RawAuctionData,
start_block: &BlockInfo,
) {
log_order_delta(
id,
previous
.into_iter()
.flat_map(|auction| auction.orders.iter().map(|order| order.uid)),
current.orders.iter().map(|order| order.uid),
start_block,
);
}

pub fn log_auction_delta(
previous: &Option<domain::Auction>,
current: &domain::Auction,
start_block: &BlockInfo,
) {
log_order_delta(
current.id,
previous
.iter()
.flat_map(|auction| auction.orders.iter().map(|order| order.uid)),
current.orders.iter().map(|order| order.uid),
start_block,
);
}

pub fn bids(bids: &[domain::competition::Bid<Unscored>]) {
Expand Down
8 changes: 4 additions & 4 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct SolvableOrdersCache {
type Balances = HashMap<Query, U256>;

struct Inner {
auction: domain::RawAuctionData,
auction: Arc<domain::RawAuctionData>,
solvable_orders: boundary::SolvableOrders,
}

Expand Down Expand Up @@ -186,12 +186,12 @@ impl SolvableOrdersCache {
})
}

pub async fn current_auction(&self) -> Option<domain::RawAuctionData> {
pub async fn current_auction(&self) -> Option<Arc<domain::RawAuctionData>> {
self.cache
.lock()
.await
.as_ref()
.map(|inner| inner.auction.clone())
.map(|inner| Arc::clone(&inner.auction))
}

/// Manually update solvable orders. Usually called by the background
Expand Down Expand Up @@ -358,7 +358,7 @@ impl SolvableOrdersCache {
};

*self.cache.lock().await = Some(Inner {
auction,
auction: Arc::new(auction),
solvable_orders: db_solvable_orders,
});

Expand Down
Loading