Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ pub async fn discard(
Ok((updated_experiment, config_version_id))
}

pub async fn get_applicable_variants_helper(
pub fn get_applicable_variants_helper(
db_conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
context: Map<String, Value>,
dimensions_info: &HashMap<String, DimensionInfo>,
Expand Down Expand Up @@ -956,8 +956,7 @@ async fn get_applicable_variants_handler(
&dimensions_info,
identifier,
&workspace_context,
)
.await?;
)?;

let variants = exps
.into_iter()
Expand Down
56 changes: 56 additions & 0 deletions crates/service_utils/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,63 @@ use diesel::{
PgConnection,
r2d2::{ConnectionManager, Pool},
};
use superposition_types::{DBConnection, result};

pub mod utils;

pub type PgSchemaConnectionPool = Pool<ConnectionManager<PgConnection>>;

/// Helper macro to run a database query with connection management and error handling.
/// Example usage:
/// ```rust,ignore
/// run_query!(db_pool, conn, {
/// // Your query logic here, using `conn` as the database connection
/// });
/// ```
Comment thread
ayushjain17 marked this conversation as resolved.
#[macro_export]
macro_rules! run_query {
($db_pool:expr, $conn:ident, $body:expr) => {{
let mut $conn = $db_pool.get().map_err(|e| {
superposition_macros::unexpected_error!(
"Unable to get db connection from pool, error: {}",
e
)
})?;
diesel::Connection::set_prepared_statement_cache_size(
&mut $conn,
diesel::connection::CacheSize::Disabled,
);

$body
}};
}

/// Helper function to run a database transaction with connection management and error handling.
/// Example usage:
/// ```rust,ignore
/// run_transaction(&db_pool, |conn| {
/// // Your transactional query logic here, using `conn` as the database
/// // connection within the transaction
/// Ok(result) // Return a result from the transaction block
/// });
/// ```
pub fn run_transaction<F, T>(
db_pool: &PgSchemaConnectionPool,
query_fn: F,
) -> result::Result<T>
where
F: FnOnce(&mut DBConnection) -> result::Result<T>,
{
let mut conn = db_pool.get().map_err(|e| {
superposition_macros::unexpected_error!(
"Unable to get db connection from pool, error: {}",
e
)
})?;
diesel::Connection::set_prepared_statement_cache_size(
&mut conn,
diesel::connection::CacheSize::Disabled,
);

diesel::Connection::transaction(&mut conn, query_fn)
}
25 changes: 18 additions & 7 deletions crates/service_utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ use superposition_types::{
},
superposition_schema::superposition::workspaces,
},
result::{self},
result,
};

use crate::encryption::{EncryptionError, decrypt_secret, decrypt_workspace_key};
use crate::service::types::{AppState, SchemaName, WorkspaceContext};
use crate::{
db::PgSchemaConnectionPool,
encryption::{EncryptionError, decrypt_secret, decrypt_workspace_key},
run_query,
service::types::{AppState, SchemaName, WorkspaceContext},
};

// using named group to capture which type (secrets/variables) the regex was
// because variables and secrets need to be handled differently inside webhook execution
Expand Down Expand Up @@ -206,11 +210,18 @@ pub fn parse_config_tags(

pub fn get_workspace(
workspace_schema_name: &SchemaName,
db_conn: &mut DBConnection,
db_pool: &PgSchemaConnectionPool,
) -> result::Result<Workspace> {
let workspace = workspaces::dsl::workspaces
.filter(workspaces::workspace_schema_name.eq(workspace_schema_name.to_string()))
.get_result::<Workspace>(db_conn)?;
let workspace = run_query!(
db_pool,
conn,
workspaces::dsl::workspaces
.filter(
workspaces::workspace_schema_name.eq(workspace_schema_name.to_string()),
)
.get_result::<Workspace>(&mut conn)
)?;

Ok(workspace)
}

Expand Down
5 changes: 1 addition & 4 deletions crates/service_utils/src/middlewares/auth_n/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use actix_web::{HttpRequest, web::Data};
use diesel::{
Connection, ExpressionMethods, RunQueryDsl,
ExpressionMethods, RunQueryDsl,
query_dsl::methods::{OrderDsl, SelectDsl},
};
use superposition_types::database::superposition_schema::superposition::organisations;
Expand All @@ -20,9 +20,6 @@ pub(super) fn fetch_org_ids_from_db(

match app_state.db_pool.get() {
Ok(mut conn) => {
conn.set_prepared_statement_cache_size(
diesel::connection::CacheSize::Disabled,
);
let orgs = organisations::table
.order(organisations::created_at.desc())
.select(organisations::id)
Expand Down
12 changes: 3 additions & 9 deletions crates/service_utils/src/middlewares/workspace_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix_web::{
};
use futures_util::future::LocalBoxFuture;
use regex::Regex;
use superposition_macros::{bad_argument, unexpected_error};
use superposition_macros::bad_argument;

use crate::helpers::get_workspace;
use crate::{
Expand Down Expand Up @@ -137,14 +137,8 @@ where
(true, Some(workspace_id)) => {
let schema = format!("{}_{}", *organisation, *workspace_id);
let schema_name = SchemaName(schema);
let workspace_settings = {
let mut db_conn = app_state
.db_pool
.get()
.map_err(|err| unexpected_error!("{}", err))?;

get_workspace(&schema_name, &mut db_conn)?
};
let workspace_settings =
get_workspace(&schema_name, &app_state.db_pool)?;

req.extensions_mut().insert(workspace_id.clone());
req.extensions_mut().insert(WorkspaceContext {
Expand Down
65 changes: 37 additions & 28 deletions crates/superposition/src/organisation/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use actix_web::{
Scope, get, post, routes,
web::{Json, Path, Query},
web::{Data, Json, Path, Query},
};
use chrono::Utc;
use diesel::prelude::*;
use idgenerator::IdInstance;
use service_utils::service::types::DbConnection;
use service_utils::{run_query, service::types::AppState};
use superposition_derives::authorized;
use superposition_types::{
PaginatedResponse, User,
Expand All @@ -32,11 +32,9 @@ pub fn endpoints() -> Scope {
#[post("")]
pub async fn create_handler(
request: Json<CreateRequest>,
db_conn: DbConnection,
state: Data<AppState>,
user: User,
) -> superposition::Result<Json<Organisation>> {
let DbConnection(mut conn) = db_conn;

// Generating a numeric ID from IdInstance and prefixing it with `orgid`
let numeric_id = IdInstance::next_id();
let org_id = format!("orgid{}", numeric_id);
Expand All @@ -58,9 +56,11 @@ pub async fn create_handler(
updated_by: user.get_username(),
};

let new_org = diesel::insert_into(organisations::table)
.values(&new_org)
.get_result(&mut conn)?;
let new_org = run_query!(state.db_pool, conn, {
diesel::insert_into(organisations::table)
.values(&new_org)
.get_result(&mut conn)
})?;

Ok(Json(new_org))
}
Expand All @@ -72,18 +72,19 @@ pub async fn create_handler(
pub async fn update_handler(
org_id: Path<String>,
request: Json<UpdateRequest>,
db_conn: DbConnection,
state: Data<AppState>,
user: User,
) -> superposition::Result<Json<Organisation>> {
let DbConnection(mut conn) = db_conn;
let org_id = org_id.into_inner();
let now = Utc::now();
let req = request.into_inner();

let updated_org = diesel::update(organisations::table)
.filter(organisations::id.eq(org_id))
.set((req, updated_at.eq(now), updated_by.eq(user.get_email())))
.get_result(&mut conn)?;
let updated_org = run_query!(state.db_pool, conn, {
diesel::update(organisations::table)
.filter(organisations::id.eq(org_id))
.set((req, updated_at.eq(now), updated_by.eq(user.get_email())))
.get_result(&mut conn)
})?;

Ok(Json(updated_org))
}
Expand All @@ -92,35 +93,43 @@ pub async fn update_handler(
#[get("/{org_id}")]
pub async fn get_handler(
org_id: Path<String>,
db_conn: DbConnection,
state: Data<AppState>,
) -> superposition::Result<Json<Organisation>> {
let DbConnection(mut conn) = db_conn;

let org = organisations::table
.find(org_id.as_str())
.first::<Organisation>(&mut conn)?;
let org = run_query!(
state.db_pool,
conn,
organisations::table
.find(org_id.as_str())
.first::<Organisation>(&mut conn)
)?;

Ok(Json(org))
}

#[authorized]
#[get("")]
pub async fn list_handler(
db_conn: DbConnection,
state: Data<AppState>,
filters: Query<PaginationParams>,
) -> superposition::Result<Json<PaginatedResponse<Organisation>>> {
let DbConnection(mut conn) = db_conn;

if let Some(true) = filters.all {
let result: Vec<Organisation> = organisations::table
.order(organisations::created_at.desc())
.get_results(&mut conn)?;
let result = run_query!(
state.db_pool,
conn,
organisations::table
.order(organisations::created_at.desc())
.get_results(&mut conn)
)?;

return Ok(Json(PaginatedResponse::all(result)));
}

// Get total count of organisations
let total_items: i64 = organisations::table.count().get_result(&mut conn)?;
let total_items = run_query!(
state.db_pool,
conn,
organisations::table.count().get_result(&mut conn)
)?;

// Set up pagination
let limit = filters.count.unwrap_or(10);
Expand All @@ -136,7 +145,7 @@ pub async fn list_handler(
}

// Get paginated results
let data: Vec<Organisation> = builder.load(&mut conn)?;
let data = run_query!(state.db_pool, conn, builder.load(&mut conn))?;

let total_pages = (total_items as f64 / limit as f64).ceil() as i64;

Expand Down
Loading
Loading