-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/resource management #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
7dcd0fd
33ef631
c65d8fb
5c8f1ca
a16dd41
27d9ec6
25fc5a3
d6a65a6
2f425e9
effc1a4
5e532ad
8693850
f198b7c
76410ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,121 @@ | ||
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
| from sqlalchemy import select | ||
| from typing import List | ||
| import uuid | ||
| from ..database import get_db | ||
| from ..models.user import User | ||
| from ..models.resource import Resource | ||
| from ..schemas.resource import ResourceOut | ||
| from ..routers.auth import get_current_user | ||
| from ..services.storage import storage_service | ||
| from arq import create_pool | ||
| from ..config import settings | ||
| from arq.connections import RedisSettings | ||
|
|
||
| router = APIRouter(prefix="/resources", tags=["resources"]) | ||
|
|
||
| @router.post("/", response_model=ResourceOut) | ||
| async def upload_resource( | ||
| type: str = Form(...), | ||
| file: UploadFile = File(...), | ||
| db: AsyncSession = Depends(get_db), | ||
| current_user: User = Depends(get_current_user) | ||
| ): | ||
| if file.content_type not in ["application/pdf", "text/plain"]: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="Only PDF and Text files are supported" | ||
| ) | ||
|
|
||
| # Read file content | ||
| content = await file.read() | ||
|
|
||
| # Generate unique filename for storage | ||
| ext = file.filename.split('.')[-1] | ||
| object_name = f"user_{current_user.id}/{uuid.uuid4()}.{ext}" | ||
|
|
||
| # Upload to DigitalOcean Spaces | ||
| file_url = storage_service.upload_file(content, object_name, file.content_type) | ||
| if not file_url: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | ||
| detail="Failed to upload file to storage" | ||
| ) | ||
|
|
||
| # Create DB record | ||
| new_resource = Resource( | ||
| user_id=current_user.id, | ||
| filename=file.filename, | ||
| file_url=file_url, | ||
| type=type, | ||
| status="processing" | ||
| ) | ||
| db.add(new_resource) | ||
| await db.commit() | ||
| await db.refresh(new_resource) | ||
|
|
||
| # Enqueue background extraction task | ||
| redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) | ||
| await redis.enqueue_job('extraction_task', str(new_resource.id)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: find backend -type f -name "*.py" | head -20Repository: shubhamxdd/pyq-appl Length of output: 662 🏁 Script executed: cd backend && find . -type f -name "resources.py" | head -5Repository: shubhamxdd/pyq-appl Length of output: 90 🏁 Script executed: wc -l backend/app/routers/resources.pyRepository: shubhamxdd/pyq-appl Length of output: 100 🏁 Script executed: sed -n '50,70p' backend/app/routers/resources.pyRepository: shubhamxdd/pyq-appl Length of output: 751 🏁 Script executed: sed -n '110,130p' backend/app/routers/resources.pyRepository: shubhamxdd/pyq-appl Length of output: 499 🏁 Script executed: cat -n backend/app/routers/resources.py | head -150Repository: shubhamxdd/pyq-appl Length of output: 4948 🏁 Script executed: rg -n "create_pool|redis" backend/app --type py -A 2 -B 2Repository: shubhamxdd/pyq-appl Length of output: 1539 🏁 Script executed: find backend -type f -name "*.py" -exec rg -l "lifespan|startup|app.state" {} \;Repository: shubhamxdd/pyq-appl Length of output: 97 🏁 Script executed: cat backend/app/main.pyRepository: shubhamxdd/pyq-appl Length of output: 701 🏁 Script executed: rg -n "close|cleanup|app.state" backend/app --type pyRepository: shubhamxdd/pyq-appl Length of output: 45 🏁 Script executed: rg -n "arq|redis" backend/app/config.pyRepository: shubhamxdd/pyq-appl Length of output: 45 🏁 Script executed: cat backend/app/workers/arq_worker.pyRepository: shubhamxdd/pyq-appl Length of output: 475 🌐 Web query:
💡 Result: When integrating the ARQ library with FastAPI, managing Redis connection pools effectively is critical for performance and stability. Below are the best practices for pool creation and management. Centralized Pool Management Avoid creating a new Redis pool for every request or task. Instead, initialize a single, long-lived connection pool that is shared across your application [1]. In FastAPI, the recommended approach is to use the lifespan event handler to create the pool on startup and close it on shutdown [2]. Example of Lifespan Management: from contextlib import asynccontextmanager from fastapi import FastAPI from arq import create_pool from arq.connections import RedisSettings redis_pool = None Citations:
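Initialize the Redis pool once at app startup, not per request. Currently, each POST request to `/resources/` and `/resources/{resource_id}/retry` calls `create_pool(...)` and never closes the pool it creates. Create the pool once in the FastAPI lifespan handler and store it on `app.state`. Then, in routes, retrieve it from `request.app.state`. 🤖 Prompt for AI Agents |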
||
|
|
||
|
shubhamxdd marked this conversation as resolved.
|
||
| return new_resource | ||
|
|
||
@router.get("/", response_model=List[ResourceOut])
async def list_resources(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return every resource owned by the authenticated user, newest first."""
    owned_by_caller = Resource.user_id == current_user.id
    newest_first = Resource.created_at.desc()
    query = select(Resource).where(owned_by_caller).order_by(newest_first)

    rows = await db.execute(query)
    return rows.scalars().all()
|
|
||
@router.delete("/{resource_id}")
async def delete_resource(
    resource_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete one of the caller's resources from object storage and the database.

    Raises:
        HTTPException: 404 when no resource with this id belongs to the caller.
    """
    lookup = select(Resource).where(
        Resource.id == resource_id,
        Resource.user_id == current_user.id,
    )
    resource = (await db.execute(lookup)).scalar_one_or_none()
    if not resource:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found")

    # The stored URL is the public base URL followed by the object key;
    # stripping the base leaves the key that the storage service expects.
    public_prefix = f"{settings.SPACES_PUBLIC_URL}/"
    storage_service.delete_file(resource.file_url.replace(public_prefix, ""))

    # The row is removed regardless of the storage result, which is not checked.
    await db.delete(resource)
    await db.commit()

    return {"message": "Resource deleted successfully"}
|
|
||
@router.post("/{resource_id}/retry", response_model=ResourceOut)
async def retry_extraction(
    resource_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Mark one of the caller's resources as processing and re-enqueue its extraction job.

    Returns:
        The refreshed resource, with status "processing".

    Raises:
        HTTPException: 404 when no resource with this id belongs to the caller.
    """
    result = await db.execute(
        select(Resource).where(Resource.id == resource_id, Resource.user_id == current_user.id)
    )
    resource = result.scalar_one_or_none()

    if not resource:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found")

    # Update status to processing
    resource.status = "processing"
    await db.commit()
    await db.refresh(resource)

    # Re-enqueue background extraction task.
    # A pool is created for this request only, so it must be closed again;
    # otherwise every retry leaves its Redis connections open.
    redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL))
    try:
        await redis.enqueue_job('extraction_task', str(resource.id))
    finally:
        await redis.close(close_connection_pool=True)

    return resource
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from pydantic import BaseModel, ConfigDict | ||
| from uuid import UUID | ||
| from datetime import datetime | ||
| from typing import Optional, List | ||
|
|
||
class ResourceBase(BaseModel):
    # Fields common to the resource schemas.
    filename: str
    # Free-form string; values in use: notes | syllabus | past_paper | other
    type: str
|
|
||
class ResourceOut(ResourceBase):
    # Response schema for a stored resource; from_attributes lets it be
    # populated from an object's attributes rather than only from a dict.
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    user_id: UUID
    file_url: str
    # Values in use: pending | processing | ready | failed
    status: str
    created_at: datetime
|
|
||
class ResourceList(BaseModel):
    # Wrapper schema holding a list of resource responses.
    resources: List[ResourceOut]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| import boto3 | ||
| from botocore.exceptions import ClientError | ||
| from ..config import settings | ||
| import logging | ||
|
|
||
| class StorageService: | ||
def __init__(self):
    """Create a boto3 session and an S3 client configured from the Spaces settings."""
    self.session = boto3.session.Session()
    client_options = {
        'region_name': settings.SPACES_REGION,
        'endpoint_url': settings.SPACES_ENDPOINT,
        'aws_access_key_id': settings.SPACES_KEY,
        'aws_secret_access_key': settings.SPACES_SECRET,
    }
    self.client = self.session.client('s3', **client_options)
|
|
||
| def upload_file(self, file_content: bytes, object_name: str, content_type: str = 'application/pdf'): | ||
| try: | ||
| self.client.put_object( | ||
| Bucket=settings.SPACES_BUCKET, | ||
| Key=object_name, | ||
| Body=file_content, | ||
| ACL='public-read', | ||
| ContentType=content_type | ||
| ) | ||
| return f"{settings.SPACES_PUBLIC_URL}/{object_name}" | ||
|
Comment on lines
+19
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain 🏁 Script executed: `cat backend/app/services/storage.py` — Repository: shubhamxdd/pyq-appl, Length of output: 1487 🏁 Script executed: `rg "put_object|upload|storage" backend/app --type py -l | head -20` — Repository: shubhamxdd/pyq-appl, Length of output: 157 🏁 Script executed: `rg "public-read\|ACL" backend --type py -A 3 -B 3` — Repository: shubhamxdd/pyq-appl, Length of output: 45 🏁 Script executed: `cat backend/app/routers/resources.py` — Repository: shubhamxdd/pyq-appl, Length of output: 4101 🏁 Script executed: `cat backend/app/workers/tasks.py` — Repository: shubhamxdd/pyq-appl, Length of output: 6629. Use private ACL with pre-signed URLs instead of public-read for uploaded resources. Files are stored with `ACL='public-read'`, so anyone who obtains a file's URL can read it without authenticating. Refactor to use private ACL and generate pre-signed URLs for temporary, authenticated access. Update the extraction task to use the boto3 client directly for internal file access rather than fetching via HTTP. 🤖 Prompt for AI Agents |
||
| except ClientError as e: | ||
| logging.error(f"Error uploading file to DigitalOcean Spaces: {e}") | ||
| return None | ||
|
|
||
def delete_file(self, object_name: str):
    """Delete ``object_name`` from the configured bucket.

    Returns True when the delete call succeeds; logs the error and returns
    False when the client raises ClientError.
    """
    try:
        self.client.delete_object(Bucket=settings.SPACES_BUCKET, Key=object_name)
    except ClientError as exc:
        logging.error(f"Error deleting file from DigitalOcean Spaces: {exc}")
        return False
    else:
        return True
|
|
||
| storage_service = StorageService() | ||
Uh oh!
There was an error while loading. Please reload this page.