-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/resource management #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7dcd0fd
33ef631
c65d8fb
5c8f1ca
a16dd41
27d9ec6
25fc5a3
d6a65a6
2f425e9
effc1a4
5e532ad
8693850
f198b7c
76410ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
| from sqlalchemy import select | ||
| from typing import List | ||
| import uuid | ||
| from ..database import get_db | ||
| from ..models.user import User | ||
| from ..models.resource import Resource | ||
| from ..schemas.resource import ResourceOut | ||
| from ..routers.auth import get_current_user | ||
| from ..services.storage import storage_service | ||
| from arq import create_pool | ||
| from ..config import settings | ||
| from arq.connections import RedisSettings | ||
|
|
||
| router = APIRouter(prefix="/resources", tags=["resources"]) | ||
|
|
||
| @router.post("/", response_model=ResourceOut) | ||
| async def upload_resource( | ||
| type: str = Form(...), | ||
| file: UploadFile = File(...), | ||
| db: AsyncSession = Depends(get_db), | ||
| current_user: User = Depends(get_current_user) | ||
| ): | ||
| if file.content_type not in ["application/pdf", "text/plain"]: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail="Only PDF and Text files are supported" | ||
| ) | ||
|
|
||
| # Chunked read with size enforcement (CodeRabbit fix) | ||
| MAX_BYTES = settings.MAX_FILE_SIZE_MB * 1024 * 1024 | ||
| CHUNK_SIZE = 1024 * 1024 # 1MB chunks | ||
| content = bytearray() | ||
| total_size = 0 | ||
|
|
||
| while True: | ||
| chunk = await file.read(CHUNK_SIZE) | ||
| if not chunk: | ||
| break | ||
| total_size += len(chunk) | ||
| if total_size > MAX_BYTES: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, | ||
| detail=f"File too large. Maximum size allowed is {settings.MAX_FILE_SIZE_MB}MB" | ||
| ) | ||
| content.extend(chunk) | ||
|
|
||
| # Generate unique filename for storage | ||
| ext = file.filename.split('.')[-1] | ||
| object_name = f"user_{current_user.id}/{uuid.uuid4()}.{ext}" | ||
|
|
||
| # Upload to DigitalOcean Spaces | ||
| file_url = storage_service.upload_file(content, object_name, file.content_type) | ||
| if not file_url: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | ||
| detail="Failed to upload file to storage" | ||
| ) | ||
|
|
||
| # Create DB record (not committed yet) | ||
| new_resource = Resource( | ||
| user_id=current_user.id, | ||
| filename=file.filename, | ||
| file_url=file_url, | ||
| type=type, | ||
| status="processing" | ||
| ) | ||
| db.add(new_resource) | ||
| await db.flush() # Flush to get the ID but don't commit | ||
|
|
||
| # Enqueue background extraction task before committing DB | ||
| try: | ||
| redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) | ||
| await redis.enqueue_job('extraction_task', str(new_resource.id)) | ||
|
|
||
| # Only commit if enqueue was successful | ||
| await db.commit() | ||
| await db.refresh(new_resource) | ||
| except Exception as e: | ||
| await db.rollback() | ||
| # Should also ideally delete the file from Spaces here if we were strict | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | ||
| detail=f"Failed to queue background task: {str(e)}" | ||
| ) | ||
|
|
||
| return new_resource | ||
|
|
||
| @router.get("/", response_model=List[ResourceOut]) | ||
| async def list_resources( | ||
| db: AsyncSession = Depends(get_db), | ||
| current_user: User = Depends(get_current_user) | ||
| ): | ||
| result = await db.execute( | ||
| select(Resource).where(Resource.user_id == current_user.id).order_by(Resource.created_at.desc()) | ||
| ) | ||
| return result.scalars().all() | ||
|
|
||
| @router.delete("/{resource_id}") | ||
| async def delete_resource( | ||
| resource_id: uuid.UUID, | ||
| db: AsyncSession = Depends(get_db), | ||
| current_user: User = Depends(get_current_user) | ||
| ): | ||
| result = await db.execute( | ||
| select(Resource).where(Resource.id == resource_id, Resource.user_id == current_user.id) | ||
| ) | ||
| resource = result.scalar_one_or_none() | ||
|
|
||
| if not resource: | ||
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found") | ||
|
|
||
| # Delete from Spaces | ||
| object_name = resource.file_url.replace(f"{settings.SPACES_PUBLIC_URL}/", "") | ||
| success = storage_service.delete_file(object_name) | ||
|
|
||
| if not success: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | ||
| detail="Failed to delete file from cloud storage. Database record preserved." | ||
| ) | ||
|
|
||
| # Delete from DB only after storage is confirmed deleted | ||
| await db.delete(resource) | ||
| await db.commit() | ||
|
shubhamxdd marked this conversation as resolved.
|
||
|
|
||
| return {"message": "Resource deleted successfully"} | ||
|
|
||
| @router.post("/{resource_id}/retry", response_model=ResourceOut) | ||
| async def retry_extraction( | ||
| resource_id: uuid.UUID, | ||
| db: AsyncSession = Depends(get_db), | ||
| current_user: User = Depends(get_current_user) | ||
| ): | ||
| result = await db.execute( | ||
| select(Resource).where(Resource.id == resource_id, Resource.user_id == current_user.id) | ||
| ) | ||
| resource = result.scalar_one_or_none() | ||
|
|
||
| if not resource: | ||
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found") | ||
|
|
||
| # Update status to processing (don't commit yet) | ||
| resource.status = "processing" | ||
|
|
||
| # Re-enqueue background extraction task before committing DB | ||
| try: | ||
| redis = await create_pool(RedisSettings.from_dsn(settings.REDIS_URL)) | ||
| await redis.enqueue_job('extraction_task', str(resource.id)) | ||
|
|
||
| # Only commit if enqueue was successful | ||
| await db.commit() | ||
| await db.refresh(resource) | ||
| except Exception as e: | ||
| await db.rollback() | ||
| raise HTTPException( | ||
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | ||
| detail=f"Failed to queue background task: {str(e)}" | ||
| ) | ||
|
|
||
| return resource | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from pydantic import BaseModel, ConfigDict | ||
| from uuid import UUID | ||
| from datetime import datetime | ||
| from typing import Optional, List | ||
|
|
||
| class ResourceBase(BaseModel): | ||
| filename: str | ||
| type: str # notes | syllabus | past_paper | other | ||
|
|
||
| class ResourceOut(ResourceBase): | ||
| id: UUID | ||
| user_id: UUID | ||
| file_url: str | ||
| status: str # pending | processing | ready | failed | ||
| created_at: datetime | ||
|
|
||
| model_config = ConfigDict(from_attributes=True) | ||
|
|
||
| class ResourceList(BaseModel): | ||
| resources: List[ResourceOut] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| import boto3 | ||
| from botocore.exceptions import ClientError | ||
| from ..config import settings | ||
| import logging | ||
|
|
||
| class StorageService: | ||
| def __init__(self): | ||
| self.session = boto3.session.Session() | ||
| self.client = self.session.client( | ||
| 's3', | ||
| region_name=settings.SPACES_REGION, | ||
| endpoint_url=settings.SPACES_ENDPOINT, | ||
| aws_access_key_id=settings.SPACES_KEY, | ||
| aws_secret_access_key=settings.SPACES_SECRET | ||
| ) | ||
|
|
||
| def upload_file(self, file_content: bytes, object_name: str, content_type: str = 'application/pdf'): | ||
| try: | ||
| self.client.put_object( | ||
| Bucket=settings.SPACES_BUCKET, | ||
| Key=object_name, | ||
| Body=file_content, | ||
| ACL='public-read', | ||
| ContentType=content_type | ||
| ) | ||
| return f"{settings.SPACES_PUBLIC_URL}/{object_name}" | ||
|
Comment on lines
+19
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cat backend/app/services/storage.pyRepository: shubhamxdd/pyq-appl Length of output: 1487 🏁 Script executed: rg "put_object|upload|storage" backend/app --type py -l | head -20Repository: shubhamxdd/pyq-appl Length of output: 157 🏁 Script executed: rg "public-read\|ACL" backend --type py -A 3 -B 3Repository: shubhamxdd/pyq-appl Length of output: 45 🏁 Script executed: cat backend/app/routers/resources.pyRepository: shubhamxdd/pyq-appl Length of output: 4101 🏁 Script executed: cat backend/app/workers/tasks.pyRepository: shubhamxdd/pyq-appl Length of output: 6629 Use private ACL with pre-signed URLs instead of public-read for uploaded resources. Files are stored with Refactor to use private ACL and generate pre-signed URLs for temporary, authenticated access. Update the extraction task to use the boto3 client directly for internal file access rather than fetching via HTTP. 🤖 Prompt for AI Agents |
||
| except ClientError as e: | ||
| logging.error(f"Error uploading file to DigitalOcean Spaces: {e}") | ||
| return None | ||
|
|
||
| def delete_file(self, object_name: str): | ||
| try: | ||
| self.client.delete_object( | ||
| Bucket=settings.SPACES_BUCKET, | ||
| Key=object_name | ||
| ) | ||
| return True | ||
| except ClientError as e: | ||
| logging.error(f"Error deleting file from DigitalOcean Spaces: {e}") | ||
| return False | ||
|
|
||
| storage_service = StorageService() | ||
Uh oh!
There was an error while loading. Please reload this page.