diff --git a/backend/app/routers/papers.py b/backend/app/routers/papers.py index 2c88262..629784c 100644 --- a/backend/app/routers/papers.py +++ b/backend/app/routers/papers.py @@ -151,7 +151,12 @@ async def create_paper( # 4. Enqueue background task redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) try: - job = await redis.enqueue_job("generate_paper_task", str(new_paper.id), str(new_job.id)) + job = await redis.enqueue_job( + "generate_paper_task", + str(new_paper.id), + str(new_job.id), + _job_id=str(new_job.id) + ) if job is None: raise RuntimeError("Failed to enqueue generate_paper_task") await db.commit() diff --git a/backend/app/routers/resources.py b/backend/app/routers/resources.py index 9d7a050..11b4505 100644 --- a/backend/app/routers/resources.py +++ b/backend/app/routers/resources.py @@ -13,9 +13,15 @@ from arq import create_pool from ..config import settings from arq.connections import RedisSettings +import logging router = APIRouter(prefix="/resources", tags=["resources"]) +# Configure logging for the task +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + @router.post("/", response_model=ResourceOut) async def upload_resource( type: str = Form(...), @@ -88,46 +94,58 @@ async def upload_resource( detail="Failed to upload file to storage" ) - # Create DB record (not committed yet) - new_resource = Resource( - user_id=current_user.id, - filename=file.filename, - file_url=file_url, - type=type, - status="processing" - ) - db.add(new_resource) - await db.flush() # Flush to get the ID but don't commit - - from ..models.job import Job - new_job = Job( - user_id=current_user.id, - job_type="ingest", - status="queued", - ref_id=new_resource.id - ) - db.add(new_job) - await db.flush() - - # Enqueue background extraction task before committing DB try: - redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) - # Pass job_id as second argument - await redis.enqueue_job('extraction_task', 
str(new_resource.id), str(new_job.id)) + # Create DB record (not committed yet) + new_resource = Resource( + user_id=current_user.id, + filename=file.filename, + file_url=file_url, + type=type, + status="processing" + ) + db.add(new_resource) + await db.flush() # Flush to get the ID - # Only commit if enqueue was successful - await db.commit() - await db.refresh(new_resource) + from ..models.job import Job + new_job = Job( + user_id=current_user.id, + job_type="ingest", + status="queued", + ref_id=new_resource.id + ) + db.add(new_job) + await db.flush() + + # Enqueue background extraction task + redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) + try: + # Use _job_id to make it easy to find in hooks + await redis.enqueue_job( + 'extraction_task', + str(new_resource.id), + str(new_job.id), + _job_id=str(new_job.id) + ) + await db.commit() + await db.refresh(new_resource) + except Exception as e: + await db.rollback() + raise e # Caught by outer try/except + finally: + if 'redis' in locals(): + await redis.close() + except Exception as e: - await db.rollback() - # Should also ideally delete the file from Spaces here if we were strict + # CLEANUP: Delete from storage if DB transaction failed + logger.error(f"Upload transaction failed, cleaning up storage: {e}") + storage_service.delete_file(object_name) + + if isinstance(e, HTTPException): + raise e raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to queue background task: {str(e)}" + detail=f"Failed to complete upload transaction: {str(e)}" ) - finally: - if 'redis' in locals(): - await redis.close() return new_resource diff --git a/backend/app/workers/arq_worker.py b/backend/app/workers/arq_worker.py index 755e9c5..ffa5ea1 100644 --- a/backend/app/workers/arq_worker.py +++ b/backend/app/workers/arq_worker.py @@ -1,17 +1,93 @@ import asyncio +import logging +import sys +from datetime import datetime +from sqlalchemy import select from arq.connections import 
RedisSettings
 from arq.cron import cron
-from app.config import settings
+from ..config import settings
 from .tasks import extraction_task, generate_paper_task, reset_monthly_quotas
+from ..database import SessionLocal
+from ..models.job import Job
+from ..models.resource import Resource
+from ..models.paper import Paper
+
+# Use the arq.worker logger so messages show up in the same stream
+logger = logging.getLogger('arq.worker')
 
 async def ping(ctx):
     return "pong"
 
 async def startup(ctx):
-    pass
+    logger.info("š [WORKER] Starting up and initializing hooks...")
 
 async def shutdown(ctx):
-    pass
+    logger.info("š [WORKER] Shutting down...")
+
+async def on_job_start(ctx):
+    logger.info(f"ā¶ļø [HOOK] Job Start: {ctx.get('job_id')}")
+
+async def _job_succeeded(ctx, job_id):
+    """
+    Best-effort lookup of whether the arq job identified by job_id succeeded.
+
+    arq's job context is not documented to carry a 'success' key, so the
+    outcome is read back from the result arq stored for the job. Returns
+    False when no outcome can be determined, which keeps the previous
+    "treat as failed" behaviour for that case.
+    """
+    # Honour an explicit flag if a task (or a future arq version) set one.
+    if ctx.get('success') is not None:
+        return bool(ctx['success'])
+
+    redis = ctx.get('redis')
+    if redis is None:
+        return False
+
+    # Local import with an alias: `Job` at module level is the DB model.
+    from arq.jobs import Job as ArqJob
+    try:
+        info = await ArqJob(job_id, redis).result_info()
+    except Exception:
+        # Never let a result lookup problem escape the hook.
+        logger.exception(f"[HOOK] Could not read arq result for job {job_id}")
+        return False
+    return bool(info and info.success)
+
+async def after_job_end(ctx):
+    """
+    Global hook called after every job in arq.
+    Ensures DB status is updated even if the task itself crashed.
+
+    If the DB Job row whose id equals the arq job id is still "running",
+    "queued" or "pending", it is forced to "done" or "failed". On failure
+    the referenced Resource (job_type "ingest") or Paper (job_type
+    "generate_paper") is marked "failed" as well, but only while it is
+    still in a transient state itself. Errors are logged, never raised.
+    """
+    job_id = ctx.get('job_id')
+
+    if not job_id:
+        logger.warning("ā ļø [HOOK] No job_id in context.")
+        return
+
+    success = await _job_succeeded(ctx, job_id)
+
+    # ARQ logs the failure, but we want our own trace
+    logger.info(f"š [HOOK] Job {job_id} ended. Success: {success}")
+    sys.stdout.flush()
+
+    async with SessionLocal() as db:
+        try:
+            # We explicitly set _job_id in routers to match our DB Job ID
+            # So job_id here SHOULD be the UUID of our Job record.
+            result = await db.execute(select(Job).where(Job.id == job_id))
+            job = result.scalar_one_or_none()
+
+            if not job:
+                logger.error(f"ā [HOOK] Job {job_id} not found in database.")
+                return
+
+            logger.info(f"š [HOOK] Current DB Job Status: {job.status}")
+
+            # If the job is still in a transient state, force it to its final state
+            if job.status in ["running", "queued", "pending"]:
+                job.status = "done" if success else "failed"
+                # NOTE(review): naive UTC timestamp; utcnow() is deprecated since
+                # Python 3.12 - confirm the column type before switching to an aware value.
+                job.completed_at = datetime.utcnow()
+
+                # Update the specific entity status (Resource or Paper)
+                if not success:
+                    if job.job_type == "ingest" and job.ref_id:
+                        res_result = await db.execute(select(Resource).where(Resource.id == job.ref_id))
+                        resource = res_result.scalar_one_or_none()
+                        if resource and resource.status == "processing":
+                            resource.status = "failed"
+                            logger.info(f"š [HOOK] Marked Resource {job.ref_id} as FAILED")
+
+                    elif job.job_type == "generate_paper" and job.ref_id:
+                        paper_result = await db.execute(select(Paper).where(Paper.id == job.ref_id))
+                        paper = paper_result.scalar_one_or_none()
+                        if paper and paper.status in ["pending", "generating"]:
+                            paper.status = "failed"
+                            logger.info(f"š [HOOK] Marked Paper {job.ref_id} as FAILED")
+
+                await db.commit()
+                logger.info(f"ā [HOOK] Finalized DB Job {job_id} as {job.status}")
+            else:
+                logger.info(f"ā¹ļø [HOOK] DB Job {job_id} was already finalized as {job.status}")
+
+        except Exception as e:
+            # logger.exception records the message and the traceback together.
+            logger.exception(f"ā [HOOK] Error updating database for job {job_id}: {e}")
+        finally:
+            sys.stdout.flush()
 
 class WorkerSettings:
     functions = [ping, extraction_task, generate_paper_task]
@@ -20,4 +96,6 @@ class WorkerSettings:
     ]
     on_startup = startup
     on_shutdown = shutdown
+    on_job_start = on_job_start
+    after_job_end = after_job_end
     redis_settings = RedisSettings.from_dsn(settings.REDIS_URL)
diff --git a/backend/app/workers/tasks.py b/backend/app/workers/tasks.py
index 4c7cb3c..e51adf0 100644
--- a/backend/app/workers/tasks.py
+++
b/backend/app/workers/tasks.py @@ -18,6 +18,8 @@ async def extraction_task(ctx, resource_id: str, job_id: str = None): print(f"\nš [TASK START] Resource ID: {resource_id}") + + # raise RuntimeError("Simulated Worker Crash") async with SessionLocal() as db: try: diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 0f1e642..d0ed429 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -7,6 +7,7 @@ import Resources from './pages/Resources'; import Solver from './pages/Solver'; import Generator from './pages/Generator'; import ProtectedRoute from './components/ProtectedRoute'; +import PublicRoute from './components/PublicRoute'; import Layout from './components/Layout'; import { useAuthStore } from './store/authStore'; @@ -39,6 +40,7 @@ import { ArrowRight } from "lucide-react" import { cn } from './lib/utils'; +import { WelcomeBanner } from './components/WelcomeBanner'; const queryClient = new QueryClient(); @@ -110,6 +112,7 @@ function Dashboard() { return (
+ Get started with your academic journey in three simple steps: +
+{step.description}
+