DevMuzee/GAQIP-Air_Monitoring

GAQIP — Global Air Quality Intelligence Pipeline

ETL → PostgreSQL → dbt → Airflow orchestration. Built on real OpenWeather + GeoNames data across 209 countries, 583 cities, 88 features.


Stack

| Layer | Tool |
| --- | --- |
| Extract & Transform | Python 3.11, Pandas |
| Load | SQLAlchemy + PostgreSQL (raw schema) |
| Transformation | dbt (staging → marts) |
| Orchestration | Apache Airflow 2.x |
| Visualisation | Power BI Desktop (via ODBC) |
| Environment | Linux (Ubuntu) backend, Windows analytics layer |
| Next | Docker, GitHub Actions |

Project Structure

gaqip/
├── main.py                  # Pipeline entry point
├── extract/                 # API + CSV ingestion
├── transform/               # Pandas transformations
├── load/                    # SQLAlchemy DB loader
├── air_quality_project/     # dbt project root
│   ├── models/
│   │   ├── staging/         # stg_weather_air_quality
│   │   └── marts/           # mart_city_aqi, mart_country_summary
│   ├── dbt_project.yml
│   └── profiles.yml
├── dags/
│   └── gaqip_dag.py         # Airflow DAG
├── logs/
│   └── pipeline.log
└── requirements.txt

Execution Modes

1. Local (manual run)

python main.py

Runs full Extract → Transform → Load into raw.weather_air_quality.
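
As a rough illustration of the flow above, the three stages can be chained like this. The function names (extract, transform, load, run_pipeline), the in-memory row format, the sample row, and the 0.0 fill value are all assumptions made for this sketch; the real stages live in the extract/, transform/ and load/ packages and work on Pandas DataFrames.

```python
Row = dict  # one record per city; stand-in for a DataFrame row


def extract() -> list[Row]:
    # Hypothetical stand-in for the API + CSV ingestion in extract/
    return [{"city_name": "Sample City", "temp": 21.5, "wind_gust": None}]


def transform(rows: list[Row]) -> list[Row]:
    # Hypothetical stand-in for transform/: fill a missing wind_gust with 0.0
    return [
        {**r, "wind_gust": 0.0 if r["wind_gust"] is None else r["wind_gust"]}
        for r in rows
    ]


def load(rows: list[Row], sink: list[Row]) -> int:
    # Hypothetical stand-in for load/: replace the sink's contents (snapshot semantics)
    sink.clear()
    sink.extend(rows)
    return len(sink)


def run_pipeline(sink: list[Row]) -> int:
    # Extract -> Transform -> Load, returning the number of rows written
    return load(transform(extract()), sink)


table: list[Row] = []
loaded = run_pipeline(table)
```

Keeping each stage a plain function with explicit inputs and outputs makes it possible to test the stages separately from the database.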


2. dbt transformation layer

cd air_quality_project
dbt run
dbt test

Builds staging and mart models. Tests enforce schema and data quality constraints.


3. Airflow orchestration (production mode)

DAG: gaqip_dag.py

gaqip_pipeline
  ├── run_etl         (BashOperator — sources venv, runs main.py)
  ├── dbt_run         (BashOperator — dbt run)
  ├── dbt_test        (BashOperator — dbt test)
  └── notify          (success/failure alert)

Task-level retry defaults:

from datetime import timedelta

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
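
The task list above could be wired up roughly as follows. This is a sketch under several assumptions, not the contents of gaqip_dag.py: the install path /opt/gaqip, the strictly sequential ordering, the echo-based notify task, the placeholder start_date, and the manual schedule are all illustrative. Only the task names, the venv activation, and the retry settings come from this README.

```python
from datetime import datetime, timedelta

# Assumed location of the repo and its venv; adjust to the real install path.
PROJECT_DIR = "/opt/gaqip"
ACTIVATE = f"source {PROJECT_DIR}/gp_venv/bin/activate"

# One shell command per task; each activates the project venv first.
COMMANDS = {
    "run_etl": f"{ACTIVATE} && cd {PROJECT_DIR} && python main.py",
    "dbt_run": f"{ACTIVATE} && cd {PROJECT_DIR}/air_quality_project && dbt run",
    "dbt_test": f"{ACTIVATE} && cd {PROJECT_DIR}/air_quality_project && dbt test",
}

default_args = {"retries": 2, "retry_delay": timedelta(minutes=5)}


def build_dag():
    # Imported lazily so the command strings can be inspected without Airflow installed.
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="gaqip_pipeline",
        default_args=default_args,
        start_date=datetime(2024, 1, 1),  # placeholder date
        schedule=None,  # manual trigger; Airflow 2.4+ (older 2.x uses schedule_interval)
        catchup=False,
    ) as dag:
        run_etl, dbt_run, dbt_test = [
            BashOperator(task_id=name, bash_command=cmd)
            for name, cmd in COMMANDS.items()
        ]
        notify = BashOperator(
            task_id="notify",
            bash_command="echo 'gaqip_pipeline finished'",  # placeholder alert
            trigger_rule="all_done",  # runs whether upstream succeeded or failed
        )
        # Assumed ordering: each step waits for the previous one.
        run_etl >> dbt_run >> dbt_test >> notify
    return dag
```

In a real DAG file the module would end with `dag = build_dag()` so the scheduler can discover the DAG at import time.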

Load Strategy

Dataset is a non-incremental snapshot — no append accumulation.

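from sqlalchemy import text

# Pattern: TRUNCATE → LOAD, both inside one transaction.
# `engine` is the SQLAlchemy engine; `df` is the transformed DataFrame.
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE raw.weather_air_quality"))

    # to_sql must run inside the block: `conn` is closed once the block exits.
    df.to_sql(
        name="weather_air_quality",
        con=conn,
        schema="raw",
        if_exists="append",
        index=False,
    )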

Guarantees deterministic state on every run. Safe for dbt model rebuilds.
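
To see why a single transaction matters for the snapshot pattern, here is a small self-contained sketch. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL and DELETE as a stand-in for TRUNCATE (SQLite has no TRUNCATE); the function name replace_snapshot, the two-column table, and the sample rows are made up for illustration.

```python
import sqlite3


def replace_snapshot(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Replace the whole table with `rows` in one transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("DELETE FROM weather_air_quality")
        conn.executemany(
            "INSERT INTO weather_air_quality (city_name, aqi) VALUES (?, ?)",
            rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE weather_air_quality (city_name TEXT NOT NULL, aqi INTEGER NOT NULL)"
)
conn.commit()

# First run: a clean snapshot.
replace_snapshot(conn, [("Lagos", 3), ("Oslo", 1)])

# Second run: a bad batch (NULL aqi violates NOT NULL) fails part-way through.
try:
    replace_snapshot(conn, [("Lima", 2), ("Cairo", None)])
except sqlite3.IntegrityError:
    pass  # the DELETE and the partial INSERT are rolled back together

cities = [
    row[0]
    for row in conn.execute(
        "SELECT city_name FROM weather_air_quality ORDER BY city_name"
    )
]
```

After the failed second run the table still holds the first snapshot, so downstream models never see an empty or half-loaded table.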


Setup (from scratch)

# 1. Clone and create venv
git clone https://github.com/<you>/gaqip.git && cd gaqip
python3.11 -m venv gp_venv && source gp_venv/bin/activate
pip install -r requirements.txt

# 2. Configure DB
export DB_URL="postgresql://user:pass@localhost:5432/gaqip"

# 3. Run pipeline
python main.py

# 4. Run dbt
cd air_quality_project && dbt run && dbt test

# 5. (Optional) Start Airflow
airflow db init        # on Airflow 2.7+, use `airflow db migrate` (db init is deprecated)
airflow scheduler & airflow webserver

Every BashOperator command runs source gp_venv/bin/activate first, so Airflow tasks use the project venv rather than the system Python.


Failure Handling

| Layer | Strategy |
| --- | --- |
| ETL | try/except with retry decorator, logged to logs/pipeline.log |
| DB load | engine.begin(): full transactional commit, auto-rollback on error |
| Airflow | Task-level retry (retries: 2, retry_delay: 5m) |
| dbt | Model-level dbt test; fails the DAG on assertion breach |
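
The ETL retry decorator is not shown in this README. A minimal sketch of one possible shape follows; the names retry and fetch_city, the parameters, and the simulated failure are assumptions for illustration only.

```python
import functools
import logging
import time

logger = logging.getLogger("gaqip")


def retry(attempts=3, delay=0.0, exceptions=(Exception,)):
    """Retry the wrapped function up to `attempts` times, logging each failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    logger.warning(
                        "%s failed (attempt %d/%d): %s",
                        func.__name__, attempt, attempts, exc,
                    )
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay)

        return wrapper

    return decorator


calls = {"n": 0}


@retry(attempts=3, delay=0.0, exceptions=(ConnectionError,))
def fetch_city(name):
    # Hypothetical flaky call: fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network error")
    return {"city_name": name}


result = fetch_city("Sample City")
```

Restricting `exceptions` to transient errors keeps programming mistakes (for example a KeyError) from being retried and masked.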

Production Fixes Applied

1. ResourceClosedError on DB load

  • Root cause: connection used outside transaction scope
  • Fix: enforce engine.begin() context manager throughout loader

2. Airflow task using wrong Python environment

  • Root cause: BashOperator defaulting to system Python
  • Fix: explicit source gp_venv/bin/activate in every BashOperator command

3. dbt schema drift (staging_mart vs staging_marts)

  • Root cause: conflicting schema names between dbt_project.yml and profile
  • Fix: centralised schema config in dbt_project.yml, removed profile-level overrides

4. Power BI native connector SSL failure

  • Root cause: Power BI's .NET-based PostgreSQL driver enforces strict TLS certificate validation, and the PostgreSQL server used a self-signed certificate. The connection was rejected at the TLS handshake; this was not a credentials or network issue.
  • Evidence: pgAdmin and psql both connected successfully (both accept self-signed certs by default)
  • Fix: bypassed the native connector entirely via the PostgreSQL ODBC driver (psqlodbc_x64.msi)

# ODBC DSN configuration (Windows ODBC Data Source Administrator, 64-bit)
Driver:   PostgreSQL Unicode(x64)
Host:     <linux-server-ip>
Port:     5432
Database: gaqip
SSL Mode: disable  ← turns TLS off for this connection, so no certificate check occurs (traffic is unencrypted)

Power BI connection path:

Get Data → ODBC → Select DSN → Load tables

| Tool | SSL behaviour |
| --- | --- |
| pgAdmin | Accepts self-signed certificates |
| psql CLI | Accepts self-signed certificates |
| Power BI native connector | Strict TLS validation; rejects self-signed ❌ |
| Power BI via ODBC | Configurable SSL mode; connection stable ✅ |

Observability

Logs written per stage to logs/pipeline.log:

[EXTRACT] 583 rows fetched — 2026-04-24T07:06:06Z
[TRANSFORM] Nulls filled: wind_gust=47, grnd_level=12
[LOAD] TRUNCATE → INSERT 583 rows — OK
[DBT] dbt run completed: 4 models, 0 errors
[DBT] dbt test completed: 11 tests passed
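
One way to produce stage-prefixed lines like these with the standard logging module is sketched below. The helper names make_pipeline_logger and log_stage are hypothetical, timestamps are omitted for brevity, and an in-memory stream replaces logs/pipeline.log so the sketch runs anywhere.

```python
import io
import logging


def make_pipeline_logger(handler: logging.Handler) -> logging.Logger:
    # Prefix every record with its pipeline stage, e.g. "[EXTRACT] ..."
    handler.setFormatter(logging.Formatter("[%(stage)s] %(message)s"))
    logger = logging.getLogger("gaqip.pipeline")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate lines if called twice
    logger.addHandler(handler)
    logger.propagate = False
    return logger


def log_stage(logger: logging.Logger, stage: str, message: str) -> None:
    # `extra` supplies the custom %(stage)s field used by the formatter.
    logger.info(message, extra={"stage": stage.upper()})


# In the pipeline the handler would be logging.FileHandler("logs/pipeline.log").
buffer = io.StringIO()
log = make_pipeline_logger(logging.StreamHandler(buffer))
log_stage(log, "extract", "583 rows fetched")
log_stage(log, "dbt", "dbt run completed: 4 models, 0 errors")
output = buffer.getvalue()
```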

dbt Models

staging/
  └── stg_weather_air_quality     — typed, null-handled, renamed cols

marts/
  ├── mart_city_aqi               — city-level AQI ranking + pm2_5 category
  └── mart_country_summary        — country-level aggregations (avg temp, AQI dist)

Dataset Schema (key columns)

| Column | Type | Description |
| --- | --- | --- |
| country_code | str | ISO 2-letter code |
| city_name | str | GeoNames city |
| temp | float | °C (OpenWeather metric) |
| aqi | int | 1–5 (OpenWeather AQI scale) |
| pm2_5, pm10 | float | µg/m³ |
| target_aqi_class | int | Encoded AQI label (ML target) |
| comfort_proxy | float | Derived: temp × (1 - humidity/100) |
| wind_pm25_interaction | float | Derived: wind_speed × pm2_5 |

Full schema: 88 columns, 583 rows, 209 countries, 5 continents.
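
The two derived columns follow directly from the formulas in the table. A plain-Python sketch is shown here; the function names and the sample values are made up, and in the pipeline itself the same arithmetic would presumably be applied column-wise in Pandas.

```python
def comfort_proxy(temp_c: float, humidity_pct: float) -> float:
    # temp × (1 - humidity/100): the same temperature scores lower in humid air
    return temp_c * (1 - humidity_pct / 100)


def wind_pm25_interaction(wind_speed: float, pm2_5: float) -> float:
    # wind_speed × pm2_5: simple interaction feature
    return wind_speed * pm2_5


# Illustrative values only, not taken from the dataset.
row = {"temp": 30.0, "humidity": 50.0, "wind_speed": 2.0, "pm2_5": 10.0}
row["comfort_proxy"] = comfort_proxy(row["temp"], row["humidity"])
row["wind_pm25_interaction"] = wind_pm25_interaction(row["wind_speed"], row["pm2_5"])
```

With these inputs, comfort_proxy is 30.0 × 0.5 = 15.0 and wind_pm25_interaction is 2.0 × 10.0 = 20.0.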


Production Readiness

| Item | Status |
| --- | --- |
| Modular ETL | ✅ |
| Transactional DB load | ✅ |
| Retry logic (ETL + Airflow) | ✅ |
| Structured logging | ✅ |
| dbt staging + marts | ✅ |
| dbt tests | ✅ |
| Linux execution stability | ✅ |
| Airflow DAG (BashOperator) | ✅ |
| Power BI integration (ODBC) | ✅ |
| Dockerization | ⏳ next |
| CI/CD (GitHub Actions) | ⏳ next |
| PythonOperator DAG refactor | ⏳ next |
| FastAPI data layer | ⏳ planned |

Engineering Notes

This project was built through real production failures — not a tutorial:

  • Migrated from Windows → Linux for Airflow stability
  • Resolved Python 3.12 incompatibility with Airflow (pinned to 3.11)
  • Debugged ResourceClosedError from connection lifecycle mismanagement
  • Fixed dbt schema drift from profile/project config conflict
  • Stabilised Airflow venv isolation in BashOperator tasks
  • Resolved Power BI SSL rejection via ODBC driver (psqlodbc_x64) with sslmode=disable

Next Steps

  1. Dockerize: Postgres + Airflow + dbt in docker-compose.yml
  2. CI/CD: GitHub Actions running lint → test → dbt run on push
  3. PythonOperator refactor: replace BashOperators with native Python callables
  4. Great Expectations: row-level data quality assertions pre-load
  5. FastAPI layer: /cities, /aqi, /countries endpoints over the mart layer
