Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions db/api_role.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Read-only API role for query connections.
-- Defense-in-depth: even if the query validator is bypassed,
-- this role cannot modify data, execute arbitrary functions,
-- or exhaust server resources.
--
-- NOTE(review): the password is hard-coded for local bootstrap only;
-- rotate it with ALTER ROLE in any non-dev deployment. Also note that
-- if the role already exists, its password is left untouched.
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tidx_api') THEN
CREATE ROLE tidx_api WITH LOGIN PASSWORD 'tidx_api' NOSUPERUSER NOCREATEDB NOCREATEROLE;
END IF;
END $$;

-- Revoke all privileges first (deny-by-default)
REVOKE ALL ON ALL TABLES IN SCHEMA public FROM tidx_api;
REVOKE ALL ON ALL SEQUENCES IN SCHEMA public FROM tidx_api;
-- PostgreSQL grants EXECUTE on every function to PUBLIC by default, so a
-- revoke that targets only tidx_api is a no-op: the role would still be
-- able to call every function through the implicit PUBLIC grant. The
-- revoke must target PUBLIC for the allow-list below to mean anything.
REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA public FROM PUBLIC;
REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA public FROM tidx_api;

-- Keep deny-by-default for functions created after this script runs:
-- without this, every new function silently becomes callable by
-- tidx_api via the default PUBLIC EXECUTE grant.
-- NOTE(review): ALTER DEFAULT PRIVILEGES applies to objects created by
-- the role executing this statement (the migration role) — confirm that
-- is the role that owns future migrations.
ALTER DEFAULT PRIVILEGES IN SCHEMA public REVOKE EXECUTE ON FUNCTIONS FROM PUBLIC;

-- Grant read-only access to indexed tables only
GRANT USAGE ON SCHEMA public TO tidx_api;
GRANT SELECT ON blocks, txs, logs, receipts, token_holders, token_balances TO tidx_api;

-- Grant execute only on ABI decode helper functions
GRANT EXECUTE ON FUNCTION abi_uint(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_int(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_address(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_bool(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_bytes(bytea, int) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_string(bytea, int) TO tidx_api;
GRANT EXECUTE ON FUNCTION format_address(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION format_uint(bytea) TO tidx_api;

-- Resource limits (prevent DoS)
ALTER ROLE tidx_api CONNECTION LIMIT 64;
ALTER ROLE tidx_api SET statement_timeout = '30s';
ALTER ROLE tidx_api SET work_mem = '64MB';
ALTER ROLE tidx_api SET temp_file_limit = '256MB';
147 changes: 102 additions & 45 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ fn default_timeout() -> u64 {
5000
}
fn default_limit() -> i64 {
10000
crate::query::HARD_LIMIT_MAX
}

#[derive(Serialize)]
Expand Down Expand Up @@ -299,7 +299,7 @@ async fn handle_query_once(

let options = QueryOptions {
timeout_ms: params.timeout_ms.clamp(100, 30000),
limit: params.limit.clamp(1, 100000),
limit: params.limit.clamp(1, crate::query::HARD_LIMIT_MAX),
};

// Route to appropriate engine
Expand Down Expand Up @@ -397,7 +397,7 @@ async fn handle_query_live(
let signature = params.signature;
let options = QueryOptions {
timeout_ms: params.timeout_ms.clamp(100, 30000),
limit: params.limit.clamp(1, 100000),
limit: params.limit.clamp(1, crate::query::HARD_LIMIT_MAX),
};

// Detect if this is an OLAP query (aggregations, etc.)
Expand Down Expand Up @@ -477,7 +477,16 @@ async fn handle_query_live(
} else {
let catch_up_start = last_block_num + 1;
for block_num in catch_up_start..=end {
let filtered_sql = inject_block_filter(&sql, block_num);
let filtered_sql = match inject_block_filter(&sql, block_num) {
Ok(s) => s,
Err(e) => {
yield Ok(SseEvent::default()
.event("error")
.json_data(serde_json::json!({ "ok": false, "error": e.to_string() }))
.unwrap());
return;
}
};
match crate::service::execute_query_postgres(&pool, &filtered_sql, signature.as_deref(), &options).await {
Ok(result) => {
yield Ok(SseEvent::default()
Expand Down Expand Up @@ -516,50 +525,85 @@ async fn handle_query_live(
/// Inject a block number filter into SQL query for live streaming.
/// Transforms queries to only return data for the specific block.
/// Uses 'num' for blocks table, 'block_num' for txs/logs tables.
///
/// Uses sqlparser AST manipulation to safely add the filter condition,
/// avoiding SQL injection risks from string-based splicing.
#[doc(hidden)]
pub fn inject_block_filter(sql: &str, block_num: u64) -> String {
let sql_upper = sql.to_uppercase();

// Determine column name based on table being queried
let col = if sql_upper.contains("FROM BLOCKS") || sql_upper.contains("FROM \"BLOCKS\"") {
"num"
} else {
"block_num"
pub fn inject_block_filter(sql: &str, block_num: u64) -> Result<String, ApiError> {
use sqlparser::ast::{
BinaryOperator, Expr, Ident, SetExpr, Statement, Value,
};

// Find WHERE clause position
if let Some(where_pos) = sql_upper.find("WHERE") {
// Insert after WHERE
let insert_pos = where_pos + 5;
format!(
"{} {} = {} AND {}",
&sql[..insert_pos],
col,
block_num,
&sql[insert_pos..]
)
} else if let Some(order_pos) = sql_upper.find("ORDER BY") {
// Insert WHERE before ORDER BY
format!(
"{} WHERE {} = {} {}",
&sql[..order_pos],
col,
block_num,
&sql[order_pos..]
)
} else if let Some(limit_pos) = sql_upper.find("LIMIT") {
// Insert WHERE before LIMIT
format!(
"{} WHERE {} = {} {}",
&sql[..limit_pos],
col,
block_num,
&sql[limit_pos..]
)
} else {
// Append WHERE at end
format!("{sql} WHERE {col} = {block_num}")
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

let dialect = GenericDialect {};
let mut statements = Parser::parse_sql(&dialect, sql)
.map_err(|e| ApiError::BadRequest(format!("SQL parse error: {e}")))?;

if statements.len() != 1 {
return Err(ApiError::BadRequest(
"Live mode requires exactly one SQL statement".to_string(),
));
}

let stmt = &mut statements[0];
let query = match stmt {
Statement::Query(q) => q,
_ => {
return Err(ApiError::BadRequest(
"Live mode requires a SELECT query".to_string(),
))
}
};

let select = match query.body.as_mut() {
SetExpr::Select(s) => s,
_ => {
return Err(ApiError::BadRequest(
"Live mode requires a simple SELECT query (UNION/INTERSECT not supported)"
.to_string(),
))
}
};

let table_name: String = select
.from
.first()
.and_then(|twj| match &twj.relation {
sqlparser::ast::TableFactor::Table { name, .. } => {
name.0.last().and_then(|part| part.as_ident()).map(|ident| ident.value.to_lowercase())
}
_ => None,
})
.ok_or_else(|| {
ApiError::BadRequest(
"Live mode requires a query with a FROM table clause".to_string(),
)
})?;

let col_name = if table_name == "blocks" { "num" } else { "block_num" };

let col_expr = Expr::CompoundIdentifier(vec![
Ident::new(&table_name),
Ident::new(col_name),
]);

let block_filter = Expr::BinaryOp {
left: Box::new(col_expr),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(Value::Number(block_num.to_string(), false).into())),
};

select.selection = Some(match select.selection.take() {
Some(existing) => Expr::BinaryOp {
left: Box::new(Expr::Nested(Box::new(existing))),
op: BinaryOperator::And,
right: Box::new(block_filter),
},
None => block_filter,
});

Ok(stmt.to_string())
}

/// Rewrite analytics table references to include chain-specific database prefix.
Expand Down Expand Up @@ -599,6 +643,19 @@ pub enum ApiError {
NotFound(String),
}

/// Human-readable rendering of an [`ApiError`]: message-carrying variants
/// display their message verbatim; `Timeout` uses a fixed string.
impl std::fmt::Display for ApiError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ApiError::Timeout => f.write_str("Query timeout"),
            ApiError::BadRequest(msg)
            | ApiError::QueryError(msg)
            | ApiError::Internal(msg)
            | ApiError::Forbidden(msg)
            | ApiError::NotFound(msg) => f.write_str(msg),
        }
    }
}

impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match self {
Expand Down
3 changes: 3 additions & 0 deletions src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub async fn run_migrations(pool: &Pool) -> Result<()> {
// Load any optional extensions
conn.batch_execute(include_str!("../../db/extensions.sql")).await?;

// Create read-only API role with SELECT-only access to indexed tables
conn.batch_execute(include_str!("../../db/api_role.sql")).await?;

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use parser::{
extract_order_by_columns, AbiParam, AbiType, EventSignature,
};
pub use router::{route_query, QueryEngine};
pub use validator::validate_query;
pub use validator::{validate_query, HARD_LIMIT_MAX};

use regex_lite::Regex;
use std::sync::LazyLock;
Expand Down
Loading