-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathhybrid_server.py
More file actions
653 lines (553 loc) · 23 KB
/
hybrid_server.py
File metadata and controls
653 lines (553 loc) · 23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
#!/usr/bin/env python3
"""Fast docling server using DocumentConverter singleton.
A lightweight FastAPI server optimized for hybrid PDF processing:
1. Using a single DocumentConverter instance (no per-request initialization)
2. Returns only JSON (DoclingDocument format) - markdown/HTML generated by Java
Usage:
opendataloader-pdf-hybrid [--port PORT] [--host HOST] [--ocr-lang LANG] [--force-ocr]
[--device DEVICE]
[--enrich-formula] [--enrich-picture-description]
[--max-file-size MB]
# Default: http://localhost:5002
opendataloader-pdf-hybrid
# Custom port
opendataloader-pdf-hybrid --port 5003
# Chinese + English OCR with force full-page OCR
opendataloader-pdf-hybrid --ocr-lang "ch_sim,en" --force-ocr
# Korean OCR
opendataloader-pdf-hybrid --ocr-lang "ko"
# Explicitly use Apple Silicon GPU (MPS)
opendataloader-pdf-hybrid --device mps
# Force CPU-only processing
opendataloader-pdf-hybrid --device cpu
# With formula enrichment (LaTeX extraction)
opendataloader-pdf-hybrid --enrich-formula
# With picture description (alt text generation)
opendataloader-pdf-hybrid --enrich-picture-description
# Combined: OCR + enrichments
opendataloader-pdf-hybrid --ocr-lang "en" --enrich-formula --enrich-picture-description
API Endpoints:
GET /health - Health check
POST /v1/convert/file - Convert PDF to JSON
The /v1/convert/file endpoint parameters:
- files: PDF file (multipart/form-data)
- page_ranges: Page range to process (optional)
Requirements:
Install with hybrid extra: pip install opendataloader-pdf[hybrid]
"""
import argparse
import asyncio
import logging
import os
import re
import sys
import tempfile
import threading
import time
import traceback
from contextlib import asynccontextmanager
from typing import Any, Optional
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Configuration
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 5002
MAX_FILE_SIZE = 0 # No file size limit by default (0 = unlimited)
UPLOAD_CHUNK_SIZE = 1024 * 1024 # 1MB chunks for streaming upload
def _non_negative_int(value: str) -> int:
"""Argparse type validator that rejects negative integers."""
parsed = int(value)
if parsed < 0:
raise argparse.ArgumentTypeError("--max-file-size must be >= 0")
return parsed
# Global converter instance (initialized on startup with CLI options)
converter = None
# Serialize converter.convert() calls. The converter singleton was designed for
# sequential use; this lock keeps that guarantee while allowing the event loop
# to stay responsive via asyncio.to_thread().
_convert_lock = threading.Lock()
# Regex matching lone surrogates (U+D800..U+DFFF) and null characters
_INVALID_UNICODE_RE = re.compile(r"[\ud800-\udfff\x00]")
def _extract_failed_pages_from_errors(errors: list[str]) -> list[int]:
"""Extract failed page numbers from error messages.
Docling error messages follow the pattern "Page N: <error>" (e.g.,
"Page 26: std::bad_alloc"). Even when docling includes failed pages
in the pages dict as empty entries, the error messages reliably
indicate which pages actually failed.
Args:
errors: List of error message strings.
Returns:
Sorted list of 1-indexed page numbers that failed.
"""
failed = set()
page_pattern = re.compile(r"^Page\s+(\d+):")
for msg in errors:
m = page_pattern.match(msg)
if m:
failed.add(int(m.group(1)))
return sorted(failed)
def build_conversion_response(
status_value: str,
json_content: dict,
processing_time: float,
errors: list[str],
requested_pages: tuple[int, int] | None,
total_pages: int | None = None,
) -> dict:
"""Build a structured conversion response with status and failed page info.
When Docling encounters errors (e.g., std::bad_alloc in PDF preprocessing),
it may still include failed pages as empty entries in the pages dict.
This function combines two strategies to detect failed pages:
1. Parse page numbers from error messages ("Page N: <error>")
2. Detect pages missing from the output pages dict (gap detection)
Both results are merged (union) since each catches a different failure mode.
Args:
status_value: Docling ConversionStatus value as string (e.g., "success", "partial_success").
json_content: The exported document dict from Docling.
processing_time: Time taken for conversion in seconds.
errors: List of error message strings from Docling.
requested_pages: Tuple of (start, end) 1-indexed page range, or None for all pages.
total_pages: Total page count of the input document (from Docling InputDocument).
Used to detect boundary page failures when requested_pages is None.
Returns:
Response dict with status, document, errors, failed_pages, and processing_time.
"""
failed_pages: list[int] = []
if status_value == "partial_success":
# Strategy 1: Extract failed pages from error messages (reliable —
# docling may include failed pages as empty entries in the pages dict,
# making gap detection ineffective)
error_failed = set(_extract_failed_pages_from_errors(errors))
# Strategy 2: Detect pages missing from the pages dict (catches
# failures that don't produce "Page N:" error messages)
gap_failed: set[int] = set()
pages_dict = json_content.get("pages", {})
present_pages = set()
for k in pages_dict.keys():
try:
present_pages.add(int(k))
except (ValueError, TypeError):
logger.warning("Unexpected non-integer page key in Docling output: %r", k)
if requested_pages:
expected_pages = set(range(requested_pages[0], requested_pages[1] + 1))
elif total_pages is not None:
expected_pages = set(range(1, total_pages + 1))
elif present_pages:
logger.warning(
"No page range or total_pages available; boundary page failures cannot be detected"
)
expected_pages = set(range(min(present_pages), max(present_pages) + 1))
else:
expected_pages = set()
gap_failed = expected_pages - present_pages
# Union: each strategy catches a different failure mode
failed_pages = sorted(error_failed | gap_failed)
response: dict[str, Any] = {
"status": status_value,
"document": {
"json_content": json_content,
},
"processing_time": processing_time,
"errors": errors,
"failed_pages": failed_pages,
}
return response
def sanitize_unicode(data: Any) -> Any:
"""Recursively replace lone surrogates and null characters with U+FFFD.
Docling OCR can produce lone surrogates (U+D800-U+DFFF) and null characters
from PDFs with malformed font encodings. These pass through json.dumps(ensure_ascii=False)
but fail on .encode('utf-8') in Starlette's JSONResponse.render(), causing
UnicodeEncodeError and a 500 response.
This mirrors the Java-side TextProcessor.replaceUndefinedCharacters().
Args:
data: Arbitrary data structure (dict, list, str, or primitive) from
Docling's export_to_dict() output.
Returns:
The same structure with problematic characters replaced by U+FFFD.
"""
if isinstance(data, str):
return _INVALID_UNICODE_RE.sub("\ufffd", data)
if isinstance(data, dict):
return {k: sanitize_unicode(v) for k, v in data.items()}
if isinstance(data, list):
return [sanitize_unicode(item) for item in data]
return data
def _get_loop_setting() -> str:
"""Return the uvicorn event loop setting appropriate for the current platform.
uvloop is not supported on Windows, so we force 'asyncio' there.
On other platforms, 'auto' lets uvicorn use uvloop if available.
"""
if sys.platform == "win32":
return "asyncio"
return "auto"
def _check_dependencies():
"""Check if hybrid dependencies are installed."""
missing = []
try:
import uvicorn # noqa: F401
except ImportError:
missing.append("uvicorn")
try:
import fastapi # noqa: F401
except ImportError:
missing.append("fastapi")
try:
import docling # noqa: F401
except ImportError:
missing.append("docling")
if missing:
raise ImportError(
f"Missing dependencies: {', '.join(missing)}. "
"Install with: pip install opendataloader-pdf[hybrid]"
)
DEFAULT_PICTURE_DESCRIPTION_PROMPT = "Describe what you see in this image. Include any text, numbers, labels, and data values visible."
def create_converter(
force_full_page_ocr: bool = False,
ocr_lang: list[str] | None = None,
enrich_formula: bool = False,
enrich_picture_description: bool = False,
picture_description_prompt: str | None = None,
device: str = "auto",
):
"""Create a DocumentConverter with the specified options.
Args:
force_full_page_ocr: If True, force OCR on all pages regardless of text content.
If False (default), OCR only where needed.
ocr_lang: List of EasyOCR language codes (e.g., ["ch_sim", "en"]).
If None, uses EasyOCR default languages.
enrich_formula: If True, enable formula enrichment (LaTeX extraction).
enrich_picture_description: If True, enable picture description (alt text generation).
picture_description_prompt: Custom prompt for picture description. If None, uses default.
device: Accelerator device for model inference. Options: "auto", "cpu", "cuda", "mps", "xpu".
"auto" lets Docling select the best available device. Default: "auto".
"""
from docling.datamodel.accelerator_options import AcceleratorOptions
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
EasyOcrOptions,
PdfPipelineOptions,
PictureDescriptionVlmOptions,
TableFormerMode,
TableStructureOptions,
)
from docling.document_converter import DocumentConverter, PdfFormatOption
ocr_options = EasyOcrOptions(force_full_page_ocr=force_full_page_ocr)
if ocr_lang:
ocr_options.lang = ocr_lang
# Configure picture description options with custom prompt
picture_description_options = None
if enrich_picture_description:
prompt = picture_description_prompt or DEFAULT_PICTURE_DESCRIPTION_PROMPT
picture_description_options = PictureDescriptionVlmOptions(
repo_id="HuggingFaceTB/SmolVLM-256M-Instruct",
prompt=prompt,
generation_config={"max_new_tokens": 300, "do_sample": False},
)
pipeline_kwargs = {
"do_ocr": True,
"do_table_structure": True,
"ocr_options": ocr_options,
"table_structure_options": TableStructureOptions(mode=TableFormerMode.ACCURATE),
"do_formula_enrichment": enrich_formula,
"do_picture_description": enrich_picture_description,
"generate_picture_images": enrich_picture_description,
"accelerator_options": AcceleratorOptions(device=device),
}
if picture_description_options is not None:
pipeline_kwargs["picture_description_options"] = picture_description_options
pipeline_options = PdfPipelineOptions(**pipeline_kwargs)
return DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
}
)
def create_app(
force_ocr: bool = False,
ocr_lang: list[str] | None = None,
enrich_formula: bool = False,
enrich_picture_description: bool = False,
picture_description_prompt: str | None = None,
max_file_size: int = MAX_FILE_SIZE,
device: str = "auto",
):
"""Create and configure the FastAPI application.
Args:
force_ocr: If True, force full-page OCR on all pages.
ocr_lang: List of EasyOCR language codes (e.g., ["ch_sim", "en"]).
enrich_formula: If True, enable formula enrichment (LaTeX extraction).
enrich_picture_description: If True, enable picture description (alt text generation).
picture_description_prompt: Custom prompt for picture description.
max_file_size: Maximum file size in bytes. 0 means no limit (default).
device: Accelerator device for model inference ("auto", "cpu", "cuda", "mps", "xpu").
"""
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
@asynccontextmanager
async def lifespan(_app: FastAPI):
"""Lifespan context manager for startup and shutdown events."""
global converter
lang_str = ",".join(ocr_lang) if ocr_lang else "default"
enrichments = []
if enrich_formula:
enrichments.append("formula")
if enrich_picture_description:
enrichments.append("picture-description")
enrichment_str = ",".join(enrichments) if enrichments else "none"
logger.info(
f"Initializing DocumentConverter "
f"(force_ocr={force_ocr}, lang={lang_str}, enrichments={enrichment_str}, device={device})..."
)
start = time.perf_counter()
converter = create_converter(
force_full_page_ocr=force_ocr,
ocr_lang=ocr_lang,
enrich_formula=enrich_formula,
enrich_picture_description=enrich_picture_description,
picture_description_prompt=picture_description_prompt,
device=device,
)
elapsed = time.perf_counter() - start
logger.info(f"DocumentConverter initialized in {elapsed:.2f}s")
yield
# Cleanup on shutdown (if needed)
app = FastAPI(
title="Docling Fast Server",
description="Fast PDF conversion using docling SDK with singleton pattern",
version="1.0.0",
lifespan=lifespan,
)
@app.get("/health")
def health():
"""Health check endpoint."""
return {"status": "ok"}
@app.post("/v1/convert/file")
async def convert_file(
files: UploadFile = File(...),
page_ranges: Optional[str] = Form(default=None),
):
"""Convert PDF file to JSON (DoclingDocument format).
Only JSON output is provided - markdown and HTML are generated by
Java processors for consistent reading order application.
Args:
files: The PDF file to convert
page_ranges: Page range string "start-end" (e.g., "1-5") (optional)
Returns:
JSON response with document content.
"""
global converter
if converter is None:
return JSONResponse(
{"status": "failure", "errors": ["Server not initialized"]},
status_code=503,
)
# Parse page_ranges string to tuple
page_range_tuple = None
if page_ranges:
try:
parts = page_ranges.split("-")
if len(parts) == 2:
page_range_tuple = (int(parts[0]), int(parts[1]))
except ValueError:
pass
# Stream upload to temp file and enforce size incrementally
tmp_path = None
total_size = 0
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
tmp_path = tmp.name
while True:
chunk = await files.read(UPLOAD_CHUNK_SIZE)
if not chunk:
break
total_size += len(chunk)
if max_file_size > 0 and total_size > max_file_size:
tmp.close()
os.unlink(tmp_path)
return JSONResponse(
{
"status": "failure",
"errors": [f"File size exceeds maximum allowed ({max_file_size // (1024*1024)}MB)"],
},
status_code=413,
)
tmp.write(chunk)
try:
def _do_convert():
with _convert_lock:
t0 = time.perf_counter()
if page_range_tuple:
res = converter.convert(tmp_path, page_range=page_range_tuple)
else:
res = converter.convert(tmp_path)
return res, time.perf_counter() - t0
result, processing_time = await asyncio.to_thread(_do_convert)
# Export to JSON (DoclingDocument format)
json_content = result.document.export_to_dict()
# Sanitize lone surrogates and null chars from OCR output to prevent
# UnicodeEncodeError in Starlette's JSONResponse.render()
json_content = sanitize_unicode(json_content)
# Extract status and errors from Docling ConversionResult
from docling.datamodel.base_models import ConversionStatus
status_value = result.status.value if hasattr(result.status, "value") else str(result.status)
errors = [getattr(e, "error_message", str(e)) for e in result.errors] if result.errors else []
# Get total page count for accurate failed-page detection
input_page_count = getattr(result.input, "page_count", None) if result.input else None
if result.status == ConversionStatus.PARTIAL_SUCCESS:
logger.warning(
"Docling returned partial_success: %d error(s), failed_pages will be reported",
len(errors),
)
response = build_conversion_response(
status_value=status_value,
json_content=json_content,
processing_time=processing_time,
errors=errors,
requested_pages=page_range_tuple,
total_pages=input_page_count,
)
return JSONResponse(response)
except Exception as e:
logger.error(f"PDF conversion failed: {e}\n{traceback.format_exc()}")
return JSONResponse(
{
"status": "failure",
"errors": ["PDF conversion failed. Check server logs for details."],
},
status_code=500,
)
finally:
if tmp_path and os.path.exists(tmp_path):
os.unlink(tmp_path)
return app
def main():
"""Run the server."""
_check_dependencies()
import uvicorn
parser = argparse.ArgumentParser(description="Docling Fast Server for opendataloader-pdf")
parser.add_argument(
"--host",
default=DEFAULT_HOST,
help=f"Host to bind to (default: {DEFAULT_HOST})",
)
parser.add_argument(
"--port",
type=int,
default=DEFAULT_PORT,
help=f"Port to bind to (default: {DEFAULT_PORT})",
)
parser.add_argument(
"--log-level",
default="info",
choices=["debug", "info", "warning", "error"],
help="Log level (default: info)",
)
parser.add_argument(
"--force-ocr",
action="store_true",
help="Force full-page OCR on all pages (default: auto-detect)",
)
parser.add_argument(
"--ocr-lang",
type=str,
default=None,
help="OCR languages (comma-separated EasyOCR codes, e.g., 'ch_sim,en'). Default: EasyOCR default",
)
parser.add_argument(
"--enrich-formula",
action="store_true",
default=False,
help="Enable formula enrichment model (LaTeX extraction)",
)
parser.add_argument(
"--no-enrich-formula",
action="store_false",
dest="enrich_formula",
)
parser.add_argument(
"--enrich-picture-description",
action="store_true",
default=False,
help="Enable picture description model (alt text generation using SmolVLM)",
)
parser.add_argument(
"--no-enrich-picture-description",
action="store_false",
dest="enrich_picture_description",
)
parser.add_argument(
"--picture-description-prompt",
type=str,
default=None,
help="Custom prompt for picture description. If not set, uses default prompt optimized for charts and images.",
)
parser.add_argument(
"--max-file-size",
type=_non_negative_int,
default=MAX_FILE_SIZE,
help="Maximum upload file size in MB. 0 means no limit (default: 0).",
)
parser.add_argument(
"--device",
type=str,
default="auto",
choices=["auto", "cpu", "cuda", "mps", "xpu"],
help="Accelerator device for model inference: auto (default), cpu, cuda, mps (Apple Silicon), xpu (Intel GPU).",
)
args = parser.parse_args()
# Parse ocr_lang
ocr_lang = None
if args.ocr_lang:
ocr_lang = [lang.strip() for lang in args.ocr_lang.split(",") if lang.strip()]
# Build enrichment log message
enrichments = []
if args.enrich_formula:
enrichments.append("formula")
if args.enrich_picture_description:
enrichments.append("picture-description")
# Log accelerator detection
try:
import torch
if torch.cuda.is_available():
gpu_name = torch.cuda.get_device_name(0)
cuda_version = torch.version.cuda
logger.info(f"Accelerator: CUDA — {gpu_name} (CUDA {cuda_version})")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
logger.info("Accelerator: MPS (Apple Silicon)")
elif hasattr(torch, "xpu") and torch.xpu.is_available():
logger.info("Accelerator: XPU (Intel GPU)")
else:
logger.info("Accelerator: CPU (no GPU detected)")
except ImportError:
logger.info("Accelerator: CPU (PyTorch not installed)")
if args.device != "auto":
logger.info(f"Device override: --device {args.device}")
# Convert MB to bytes (0 stays 0 = unlimited)
max_file_size_bytes = args.max_file_size * 1024 * 1024 if args.max_file_size > 0 else 0
logger.info(f"Starting Docling Fast Server on http://{args.host}:{args.port}")
logger.info(f"OCR settings: force_ocr={args.force_ocr}, lang={ocr_lang or 'default'}")
if max_file_size_bytes > 0:
logger.info(f"Max file size: {args.max_file_size}MB")
else:
logger.info("Max file size: unlimited")
if enrichments:
logger.info(f"Enrichments enabled: {', '.join(enrichments)}")
app = create_app(
force_ocr=args.force_ocr,
ocr_lang=ocr_lang,
enrich_formula=args.enrich_formula,
enrich_picture_description=args.enrich_picture_description,
picture_description_prompt=args.picture_description_prompt,
max_file_size=max_file_size_bytes,
device=args.device,
)
uvicorn.run(
app,
host=args.host,
port=args.port,
log_level=args.log_level,
loop=_get_loop_setting(),
)
if __name__ == "__main__":
main()