Building a deep agentic document-understanding system on Sciforium
This notebook is a hands-on tour. By the end you will have built, stage by stage, an investor-grade due-diligence pipeline that takes a PDF and returns a cited, fact-checked memo (plus a podcast and a cover image).
Everything is defined inline — no project imports. Each stage uses one primitive of the Sciforium API. The same pattern underlies most useful production agent pipelines: cheap model ×N in parallel → mid-tier model ×M in parallel → best model ×1 for synthesis.
PDF --> parse --> extract (×5 parallel) --> ground (Exa) --> verify (batched) --> analyze (×6 parallel) --> synthesize --> memo + audio + image
What you’ll learn
| Primitive | Endpoint | Where it shows up |
|---|---|---|
| chat() | POST /v1/chat/completions | Every LLM call |
| chat_with_attachment() | same, with a file content part | Metrics extraction, verification |
| parse_file() | POST /api/attachments/parse | Turning a PDF into text |
| synthesize_speech() | POST /v1/audio/speech | Podcast TTS |
| generate_image() | POST /v1/images/generations | Cover image |
| Agentic pattern | Stage |
|---|---|
| Many small LLM calls in parallel | Extraction, analysis |
| Bounded concurrency (semaphore) around a third-party API | Exa grounding |
| Batching work so a big task streams progress | Verification |
| Evidence fusion — feeding the output of cheap calls into a best-in-class synthesis | Memo |
| Schema-constrained JSON output with robust parsing | Verification, podcast scripting |
| Plan → fan-out generate → stitch | Podcast |
Prerequisites
- A Sciforium API key in .env as SCIFORIUM_API_KEY.
- Optional: EXA_API_KEY for web grounding.
- pip install openai httpx (already in this project’s venv).
0 · Setup
Load environment variables, set base URLs, and sanity-check the key. No pipeline imports — everything from here down is written in the notebook.
```python
import asyncio, base64, contextvars, json, os, wave
from pathlib import Path

import httpx
from openai import AsyncOpenAI
from IPython.display import Markdown, display


def load_dotenv(path='.env'):
    # Minimal .env reader: KEY=VALUE lines, '#' comments; existing env vars win.
    if not Path(path).exists():
        return
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        k, _, v = line.partition('=')
        os.environ.setdefault(k.strip(), v.strip().strip('"').strip("'"))


load_dotenv()

API_KEY = os.environ.get('SCIFORIUM_API_KEY', '')
BASE_URL = os.environ.get('SCIFORIUM_BASE_URL', 'https://api.sciforium.com').rstrip('/')
EXA_KEY = os.environ.get('EXA_API_KEY', '')

assert API_KEY, 'Set SCIFORIUM_API_KEY in .env before continuing.'
print(f'API base : {BASE_URL}')
print(f'API key : ****{API_KEY[-4:]}')
print(f'Exa key : {"set" if EXA_KEY else "not set (web grounding will be skipped)"}')
```
1 · Configuration — models and tasks
The whole pipeline is parameterised by a MODELS dict and two task lists. This is the only place you choose capability vs. cost per stage.
Rule of thumb:
- Extractor — cheap and fast. You’ll call it many times in parallel.
- Verifier — mid-tier. We batch the work so throughput > single-call capability.
- Analyst — mid/strong. Fewer calls, each needs to reason across evidence.
- Synthesizer — best model you have. One call, highest-stakes output.
```python
MODELS = {
    'extractor': 'openai/gpt-oss-120b',
    'verifier': 'openai/gpt-oss-120b',
    'analyst': 'openai/gpt-oss-120b',
    'synthesizer': 'openai/gpt-oss-120b',
    'tts': 'Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice',
    'image': 'tencent/HunyuanImage-3.0-Instruct',
}

EXTRACTION_TASKS = [
    # (name, prompt, attach_original_file)
    ('summary', 'Summarise this document in exactly 3 sentences.', False),
    ('key_points', 'List the 5 most important claims or findings. One per line, no bullets.', False),
    ('metrics', "Extract every number, percentage, monetary amount, date, or financial data point. "
                "Format strictly as one per line: 'METRIC: <value> — CONTEXT: <where in the doc>'. "
                "Be exhaustive. Preserve units exactly.", True),
    ('entities', "List every person, company, product, and date mentioned. For people tag the role "
                 "(founder/investor/customer/advisor/etc). One per line. Format: 'NAME — ROLE'.", False),
    ('risks', 'List any risks, caveats, or open questions the document raises. One per line.', False),
]

ANALYSIS_TASKS = [
    ('market', 'Assess the market opportunity. Are TAM/SAM/SOM claims credible against the web evidence?'),
    ('team', 'Assess the founding team. Use web evidence to check backgrounds and red flags.'),
    ('moat', 'Assess competitive moat. Who are the real competitors based on web evidence?'),
    ('economics', 'Assess unit economics. Flag any metric the verification layer marked UNVERIFIED or CONTRADICTED.'),
    ('risks', 'Synthesise the most material risks. Rank by severity.'),
    ('assessment', 'One paragraph — what would need to be true for this to be a strong investment?'),
]

for tier, model in MODELS.items():
    print(f' {tier:<14} {model}')
```
2 · Primitive — chat() one-shot completion
Sciforium speaks the OpenAI Chat Completions protocol. Anything that works with openai-python against OpenAI works here — just point base_url at https://api.sciforium.com/v1.
We use AsyncOpenAI so downstream stages can fan out with asyncio.gather.
```python
async def chat(model: str, system: str, user: str) -> str:
    client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL + '/v1')
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': user},
        ],
    )
    # message.content can be None (for example on a refusal), so guard before strip().
    return (response.choices[0].message.content or '').strip()
```

```python
# Smoke test — make sure our key works before we build anything on top.
reply = await chat(
    MODELS['extractor'],
    'You are terse.',
    'In five words: what is due diligence?',
)
print(reply)
```
When numeric fidelity matters — tables, figures, dense financials — a lossy text parse is a bad input. Sciforium’s chat endpoint accepts a file content part with a base64 data URL. The gateway extracts native bytes before the model sees the message.
We’ll use this in two places: the metrics extraction task, and the whole verification stage.
```python
MIME_MAP = {
    'pdf': 'application/pdf',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'doc': 'application/msword',
    'txt': 'text/plain',
    'md': 'text/markdown',
    'csv': 'text/csv',
    'png': 'image/png',
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
}


async def chat_with_attachment(model: str, system: str, user: str, file_path: str) -> str:
    path = Path(file_path)
    mime = MIME_MAP.get(path.suffix.lstrip('.').lower(), 'application/octet-stream')
    # The file travels inline as a base64 data URL inside a `file` content part.
    data_url = f'data:{mime};base64,{base64.b64encode(path.read_bytes()).decode()}'
    client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL + '/v1')
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {'role': 'system', 'content': system},
            {'role': 'user', 'content': [
                {'type': 'text', 'text': user},
                {'type': 'file', 'file': {'filename': path.name, 'file_data': data_url}},
            ]},
        ],
    )
    return response.choices[0].message.content.strip()
```
4 · Primitive — parse_file() layout-aware parser
chat_with_attachment is great for a single question about one doc, but for multi-stage pipelines you usually want plain text once — otherwise every extraction step re-parses the same file on the server.
Sciforium’s /api/attachments/parse endpoint returns per-page structured text with layout preserved. Call it once, cache the output, reuse for every text-only stage.
```python
async def parse_file(file_path: str) -> str:
    path = Path(file_path)
    mime = MIME_MAP.get(path.suffix.lstrip('.').lower(), 'application/octet-stream')
    encoded = base64.b64encode(path.read_bytes()).decode()
    async with httpx.AsyncClient(timeout=300) as client:
        response = await client.post(
            BASE_URL + '/api/attachments/parse',
            headers={
                'Content-Type': 'application/json',
                'Authorization': f'Bearer {API_KEY}',
                'x-api-key': API_KEY,
            },
            json={'files': [{
                'url': f'data:{mime};base64,{encoded}',
                'filename': path.name,
                'media_type': mime,
            }]},
        )
        response.raise_for_status()
    content = response.json()['results'][0].get('content') or {}
    pages = content.get('pages') or []
    if pages:
        # Prefer per-page text so downstream prompts can refer to page numbers.
        return '\n\n'.join(f"[Page {p['page']}]\n{p['text']}" for p in pages if p.get('text', '').strip())
    return content.get('text', '')
```
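The "call it once, cache the output" advice only survives a kernel restart if the parse result is stored somewhere. The following is a minimal sketch of a disk cache, not part of the Sciforium API: `cached_parse` and the `.parse_cache` directory are hypothetical names, and the parser is passed in as an argument so the snippet stays self-contained.

```python
import asyncio
import hashlib
from pathlib import Path
from typing import Awaitable, Callable


# Hypothetical helper: caches parser output on disk, keyed by the SHA-256 of
# the file bytes, so an edited file is parsed again while an unchanged one is not.
async def cached_parse(
    file_path: str,
    parser: Callable[[str], Awaitable[str]],
    cache_dir: str = '.parse_cache',  # assumed location; any writable directory works
) -> str:
    digest = hashlib.sha256(Path(file_path).read_bytes()).hexdigest()
    cache_file = Path(cache_dir) / f'{digest}.txt'
    if cache_file.exists():
        return cache_file.read_text(encoding='utf-8')
    text = await parser(file_path)
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(text, encoding='utf-8')
    return text
```

In the notebook this would be called as `document_text = await cached_parse(DOC_PATH, parse_file)`. Keying on content rather than filename is a design choice: renaming a file keeps the cache hit, while editing it forces a fresh parse.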
5 · Pick a document
Set DOC_PATH to your PDF. The fallback below grabs the newest PDF in jobs/ if you’ve already run something through the web UI.
```python
DOC_PATH = None  # e.g. 'docs/my_deck.pdf'
FOCUS = 'the most important findings and risks for an investment decision'

if DOC_PATH is None:
    candidates = sorted(Path('jobs').glob('*/*.pdf'), key=lambda p: p.stat().st_mtime, reverse=True)
    if not candidates:
        raise RuntimeError('No PDF found under jobs/. Set DOC_PATH to a document path.')
    DOC_PATH = str(candidates[0])

print(f'Document : {DOC_PATH}')
print(f'Size : {Path(DOC_PATH).stat().st_size // 1024} KB')
```

```python
document_text = await parse_file(DOC_PATH)
print(f'\nExtracted {len(document_text):,} characters.\n')
print(document_text[:1200])
```
Five tasks, fired simultaneously with asyncio.gather. Because each task is a separate HTTP request, the wall-clock cost is dominated by the slowest one — not the sum.
Notice that the metrics task sets the third field of its tuple (attach_original_file) to True, which reaches the inner helper as use_attachment. For that one call we skip our own parse output and send the original PDF, because the gateway’s native extractor preserves table numerics better than a general-purpose text parse does.
```python
async def run_extractions(document_text: str, file_path: str) -> dict:
    model = MODELS['extractor']
    system = 'You are a precise document analyst. Extract only what is asked. Be concise and exhaustive.'

    async def one(name: str, prompt: str, use_attachment: bool):
        if use_attachment:
            # Send the original file so the gateway reads its native bytes.
            result = await chat_with_attachment(model, system, prompt, file_path)
            tag = ' [attached]'
        else:
            user = f'<document>\n{document_text}\n</document>\n\n{prompt}'
            result = await chat(model, system, user)
            tag = ''
        print(f' ✓ {name}{tag}')
        return name, result

    print(f'[Extract] {len(EXTRACTION_TASKS)} tasks in parallel (model: {model})')
    pairs = await asyncio.gather(*[one(n, p, a) for n, p, a in EXTRACTION_TASKS])
    return dict(pairs)
```

```python
extractions = await run_extractions(document_text, DOC_PATH)
for name, text in extractions.items():
    display(Markdown(f'### {name}\n\n{text}'))
```
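The wall-clock claim for this stage (total time tracks the slowest task, not the sum) can be checked without spending credits. This is an illustrative sketch only: `fake_task` and `timed_gather` are hypothetical names, and `asyncio.sleep` stands in for the network latency of a real extraction call.

```python
import asyncio
import time


async def fake_task(name: str, seconds: float) -> str:
    # Stand-in for one extraction call; the sleep simulates request latency.
    await asyncio.sleep(seconds)
    return name


async def timed_gather(delays: dict) -> tuple:
    # Run every fake task concurrently and report (results, elapsed seconds).
    start = time.perf_counter()
    results = await asyncio.gather(*[fake_task(n, s) for n, s in delays.items()])
    return list(results), time.perf_counter() - start


# In a notebook: results, elapsed = await timed_gather({'a': 0.1, 'b': 0.2, 'c': 0.3})
# In a script:   results, elapsed = asyncio.run(timed_gather({'a': 0.1, 'b': 0.2, 'c': 0.3}))
```

With delays of 0.1, 0.2 and 0.3 seconds, the elapsed time should land near 0.3 seconds rather than 0.6. `asyncio.gather` also returns results in the order the awaitables were passed, which is why `dict(pairs)` in `run_extractions` keeps the task order.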
7 · Stage 2 — Grounding with Exa (bounded concurrency)
For every top claim and every named person we want external corroboration. Two rules:
- Fan out: queries are independent, so run them with asyncio.gather.
- Bound the fan: Exa (like any third-party API) rate-limits aggressive callers. A shared asyncio.Semaphore caps how many requests are in flight. 4 is a safe default for free/basic tiers.
This is a pattern you’ll use any time you chain an LLM stage into an external API.
```python
_EXA_SEMAPHORE = asyncio.Semaphore(4)


async def exa_search(query: str, num_results: int = 3) -> list:
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            'https://api.exa.ai/search',
            headers={'x-api-key': EXA_KEY, 'Content-Type': 'application/json'},
            json={
                'query': query,
                'numResults': num_results,
                'useAutoprompt': True,
                'contents': {'text': {'maxCharacters': 800}},
            },
        )
        response.raise_for_status()
    return [
        {'title': r.get('title', ''), 'url': r.get('url', ''), 'snippet': (r.get('text') or '').strip()}
        for r in response.json().get('results', [])
    ]


async def run_grounding(extractions: dict, num_results: int = 3, max_queries: int = 8) -> dict:
    if not EXA_KEY:
        print('[Grounding] Skipped — EXA_API_KEY not set.')
        return {}

    # Build one query per key claim, plus background queries per named entity.
    queries = []
    for line in extractions.get('key_points', '').splitlines():
        if line.strip():
            queries.append(('claim', line.strip()))
    for line in extractions.get('entities', '').splitlines():
        line = line.strip()
        if not line:
            continue
        name = line.split('—', 1)[0].strip() if '—' in line else line
        role = (line.split('—', 1)[1].lower() if '—' in line else '')
        if any(r in role for r in ('founder', 'ceo', 'cto', 'exec', 'chief')):
            queries.append(('person', f'{name} background prior company'))
            queries.append(('person', f'{name} litigation controversy'))
        else:
            queries.append(('entity', name))
    queries = queries[:max_queries]
    print(f'[Grounding] {len(queries)} queries (max 4 in flight, {num_results} results each)')

    async def one(kind: str, q: str):
        # The shared semaphore caps concurrent Exa requests.
        async with _EXA_SEMAPHORE:
            try:
                results = await exa_search(q, num_results=num_results)
            except Exception as e:
                print(f' ! [{kind}] {q[:50]} ({type(e).__name__})')
                return q, []
            print(f' ✓ [{kind}] {q[:56]}')
            return q, results

    return dict(await asyncio.gather(*[one(k, q) for k, q in queries]))
```

```python
grounding = await run_grounding(extractions, num_results=3, max_queries=8)
for q, hits in list(grounding.items())[:3]:
    print(f'\nQUERY: {q}')
    for r in hits:
        print(f' - {r["title"][:80]}')
        print(f' {r["url"]}')
```
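To see that the semaphore really does bound the fan-out, the pattern can be exercised in isolation. The sketch below is illustrative: `measure_peak` is a hypothetical helper, and a short `asyncio.sleep` stands in for the HTTP request to the external API.

```python
import asyncio


async def measure_peak(n_tasks: int, limit: int) -> int:
    # Launch n_tasks coroutines at once and record the highest number that
    # were inside the semaphore-guarded section at the same time.
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def one() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the external request
            in_flight -= 1

    await asyncio.gather(*[one() for _ in range(n_tasks)])
    return peak


# In a notebook: await measure_peak(20, 4)
# In a script:   asyncio.run(measure_peak(20, 4))
```

All 20 coroutines are created immediately, but the recorded peak never exceeds the limit of 4. The same shape applies to any rate-limited dependency: `gather` supplies the concurrency and the semaphore supplies the ceiling.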
8 · Stage 3 — Batched verification
We now ask the verifier model to re-check every extracted claim and metric against the original document. The naive implementation is one giant call with all 40 items — which turns into a multi-minute silent request that users abandon.
The fix is a pattern worth remembering: split the work into batches, fire the batches in parallel, log progress per batch. The user sees steady motion and the provider can serve the batches independently. Same total work, much better UX.
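We also need to coax the model into returning JSON reliably. parse_json_response is a tiny utility that handles three common failure modes: markdown fences around the JSON, a prose prelude before it, and trailing text after it. It does not repair truncated JSON; a batch whose output cannot be parsed is logged and skipped.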
````python
def parse_json_response(text: str):
    text = (text or '').strip()
    # 1) JSON wrapped in markdown fences.
    if '```' in text:
        for part in text.split('```'):
            part = part.strip()
            if part.startswith('json'):
                part = part[4:].lstrip()
            if part.startswith(('{', '[')):
                try:
                    return json.loads(part)
                except json.JSONDecodeError:
                    continue
    # 2) Bare JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # 3) JSON surrounded by prose. Try the bracket type that opens first, so a
    #    one-element array is not mistaken for the object inside it.
    pairs = sorted(
        (p for p in (('{', '}'), ('[', ']')) if p[0] in text),
        key=lambda p: text.find(p[0]),
    )
    for opener, closer in pairs:
        start, end = text.find(opener), text.rfind(closer)
        if start != -1 and end > start:
            try:
                return json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                continue
    raise ValueError(f'Could not parse JSON: {text[:200]}')
````
```python
async def run_verification(extractions: dict, file_path: str, batch_size: int = 8) -> dict:
    model = MODELS['verifier']

    # Collect every key point and metric line as an item to check.
    claims = []
    for line in extractions.get('key_points', '').splitlines():
        if line.strip():
            claims.append({'kind': 'claim', 'text': line.strip()})
    for line in extractions.get('metrics', '').splitlines():
        if line.strip():
            claims.append({'kind': 'metric', 'text': line.strip()})
    if not claims:
        print('[Verify] Nothing to check.')
        return {'verdicts': [], 'counts': {}}

    batches = [claims[i:i + batch_size] for i in range(0, len(claims), batch_size)]
    print(f'[Verify] {len(claims)} items in {len(batches)} parallel batches (model: {model})')

    system = ('You are a meticulous fact-checker. For each claim or metric, verify it against the '
              'attached document. Respond ONLY with valid JSON — no prose, no fences.')

    async def one(idx: int, batch: list):
        user = (
            'For each item below, mark status as one of: verified, partial, unverified, contradicted. '
            'If present, quote the supporting passage in `evidence` (≤200 chars). '
            'For metrics, require exact numeric match — approximations are "partial".\n\n'
            'Return a JSON array:\n'
            '[{"text": "<original>", "kind": "claim|metric", "status": "...", "evidence": "..."}]\n\n'
            f'Items:\n{json.dumps(batch, indent=2)}'
        )
        try:
            raw = await chat_with_attachment(model, system, user, file_path)
            parsed = parse_json_response(raw)
            if not isinstance(parsed, list):
                parsed = []
        except Exception as e:
            # A failed batch is logged and dropped; the other batches still count.
            print(f' ! batch {idx+1}/{len(batches)} failed: {e}')
            return []
        print(f' ✓ batch {idx+1}/{len(batches)} ({len(parsed)} verdicts)')
        return parsed

    results = await asyncio.gather(*(one(i, b) for i, b in enumerate(batches)))
    verdicts = [v for batch in results for v in batch]
    counts = {}
    for v in verdicts:
        s = v.get('status', 'unknown')
        counts[s] = counts.get(s, 0) + 1
    return {'verdicts': verdicts, 'counts': counts}
```

```python
verifications = await run_verification(extractions, DOC_PATH)
print('\nCounts:', verifications['counts'])
for v in verifications['verdicts'][:8]:
    print(f" [{v.get('status','?'):<12}] {v.get('text','')[:90]}")
```
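Parsing the JSON is only half of "schema-constrained" output: the parsed array can still contain non-dict items or a status outside the four allowed values. The following is a minimal sketch of a post-parse validator. `normalize_verdicts` and `ALLOWED_STATUSES` are hypothetical names, and mapping an unexpected status to `unverified` is an assumption chosen to be conservative, not something the pipeline above does.

```python
ALLOWED_STATUSES = {'verified', 'partial', 'unverified', 'contradicted'}


def normalize_verdicts(parsed) -> list:
    """Keep only dict items and coerce any unexpected status to 'unverified'."""
    if not isinstance(parsed, list):
        return []
    cleaned = []
    for item in parsed:
        if not isinstance(item, dict):
            continue  # drop stray strings, numbers, or nested lists
        status = str(item.get('status', '')).strip().lower()
        cleaned.append({
            'text': str(item.get('text', '')),
            'kind': str(item.get('kind', '')),
            # Assumption: an unknown label is treated as unverified, never as verified.
            'status': status if status in ALLOWED_STATUSES else 'unverified',
            'evidence': str(item.get('evidence') or '')[:200],
        })
    return cleaned
```

One place to apply it would be right after `parse_json_response(raw)` inside each batch, so that the counts and every later stage only ever see the four expected labels.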
9 · Stage 4 — Analysis with evidence fusion
Every analyst task sees the same context packet: extractions + verification verdicts + numbered web evidence + the investor’s focus. The system prompt tells the model to downgrade confidence on anything the verifier marked UNVERIFIED or CONTRADICTED — that’s how fact-checking actually propagates into reasoning.
All six analyses run in parallel.
```python
async def run_analyses(extractions: dict, grounding: dict, verifications: dict, focus: str) -> dict:
    model = MODELS['analyst']
    system = ('You are a senior investment analyst. Reason carefully across ALL provided evidence. '
              'When the verification layer flags a claim UNVERIFIED or CONTRADICTED, say so explicitly. '
              'Never invent numbers.')

    # Build one shared context packet: extractions, verdicts, then web evidence.
    parts = [f'[{k.upper()}]\n{v}' for k, v in extractions.items()]
    if verifications.get('verdicts'):
        lines = [
            f" - [{v.get('status','?').upper()}] {v.get('text','')}"
            + (f' (evidence: “{v.get("evidence","")[:160]}”)' if v.get('evidence') else '')
            for v in verifications['verdicts']
        ]
        parts.append('[VERIFICATION]\n' + '\n'.join(lines))
    if grounding:
        blocks = []
        for query, results in grounding.items():
            rows = '\n'.join(
                f" [{i+1}] {r['title']}\n {r['url']}\n {r['snippet'][:300]}"
                for i, r in enumerate(results)
            )
            blocks.append(f'QUERY: {query}\n{rows}')
        parts.append('[WEB EVIDENCE]\n' + '\n\n'.join(blocks))
    context = '\n\n'.join(parts)

    async def one(name: str, prompt: str):
        user = f'<evidence>\n{context}\n</evidence>\n\nInvestor focus: {focus}\n\n{prompt}'
        result = await chat(model, system, user)
        print(f' ✓ {name}')
        return name, result

    print(f'[Analyze] {len(ANALYSIS_TASKS)} tasks in parallel (model: {model})')
    return dict(await asyncio.gather(*[one(n, p) for n, p in ANALYSIS_TASKS]))
```

```python
analyses = await run_analyses(extractions, grounding, verifications, FOCUS)
for name, text in analyses.items():
    display(Markdown(f'### {name}\n\n{text}'))
```
10 · Stage 5 — Synthesis with citation & confidence invariants
One call to the best model. The prompt enforces three invariants the reader can verify:
- Citation: every external fact gets an inline [n] linked to a numbered Sources list.
- Confidence tagging: every company claim carries [VERIFIED], [PARTIAL], [UNVERIFIED], or [CONTRADICTED], taken from the verifier’s output.
- No invented numbers: every metric must come from the extractions or verification.
Invariants like these are what turn a fluent LLM memo into one a real investor can rely on.
```python
async def run_synthesis(extractions: dict, grounding: dict, verifications: dict,
                        analyses: dict, focus: str) -> dict:
    model = MODELS['synthesizer']
    print(f'[Synthesize] Writing the memo (model: {model})')

    # Number every web result once so the memo can cite it as [n].
    sources, registry = [], []
    for query, results in grounding.items():
        for r in results:
            sid = len(sources) + 1
            sources.append(f"[{sid}] {r.get('title','')} — {r.get('url','')}\n query: {query}")
            registry.append({'id': sid, 'title': r.get('title', ''), 'url': r.get('url', '')})

    sections = ['=== EXTRACTIONS ===']
    sections += [f'[{k.upper()}]\n{v}' for k, v in extractions.items()]
    if verifications.get('verdicts'):
        sections += [
            '=== SOURCE VERIFICATION ===',
            *[f"[{v.get('status','?').upper()}] {v.get('kind','')}: {v.get('text','')}" for v in verifications['verdicts']],
        ]
    if sources:
        sections += ['=== NUMBERED WEB SOURCES — cite as [n] ===', *sources]
    sections += ['=== ANALYSES ===']
    sections += [f'[{k.upper()}]\n{v}' for k, v in analyses.items()]

    system = (
        'You are writing a due-diligence memo for a professional investor. Be direct, specific, '
        'and decision-oriented. Plain prose; avoid bullet spam.\n\n'
        'RULES — all are mandatory:\n'
        ' 1. Cite every external fact with an inline [n] matching the NUMBERED WEB SOURCES section.\n'
        ' 2. Tag every non-trivial quantitative/factual claim with its confidence: [VERIFIED], '
        '[PARTIAL], [UNVERIFIED], or [CONTRADICTED].\n'
        ' 3. Never invent numbers. Every metric must appear in the extractions or verification.\n'
        ' 4. If verification contradicted any claim, flag it prominently in Risks.\n'
        ' 5. End with a `Sources` section listing every [n] you cited.'
    )
    user = (
        f"<context>\n{chr(10).join(sections)}\n</context>\n\n"
        f'Write a due-diligence memo focused on: {focus}\n\n'
        'Structure: Executive Summary · Company & Market · Team · Traction & Financials · Competitive '
        'Landscape & Moat · Risks & Open Questions · Recommendation · Sources.'
    )
    report = await chat(model, system, user)
    return {'report': report, 'sources': registry}
```

```python
synthesis = await run_synthesis(extractions, grounding, verifications, analyses, FOCUS)
display(Markdown(synthesis['report']))
```
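Because the invariants are meant to be verifiable, part of that check can be automated. The sketch below is a hypothetical post-check, not something the pipeline above performs: `check_memo` scans the memo for `[n]` citations and confidence tags and reports citation numbers that fall outside the source registry. It only confirms that the markers are well formed; whether a cited source actually supports the claim still needs a human or another model pass.

```python
import re

CONFIDENCE_TAGS = ('VERIFIED', 'PARTIAL', 'UNVERIFIED', 'CONTRADICTED')


def check_memo(report: str, n_sources: int) -> dict:
    # Numeric citations such as [3]; the confidence tags contain no digits,
    # so they are not picked up by this pattern.
    cited = {int(m) for m in re.findall(r'\[(\d+)\]', report)}
    valid = set(range(1, n_sources + 1))
    # The literal '[' before each tag keeps [VERIFIED] from matching inside [UNVERIFIED].
    tag_counts = {tag: len(re.findall(rf'\[{tag}\]', report)) for tag in CONFIDENCE_TAGS}
    return {
        'dangling_citations': sorted(cited - valid),  # cited but not in the registry
        'uncited_sources': sorted(valid - cited),     # in the registry but never cited
        'tag_counts': tag_counts,
    }
```

In the notebook this could be run as `check_memo(synthesis['report'], len(synthesis['sources']))`. A non-empty `dangling_citations` list is the signal worth acting on, since it means the memo cites a source number that was never provided.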
11 · Stage 6 — Multimodal output (TTS + image)
Two more primitives and a nice orchestration pattern.
synthesize_speech — POST /v1/audio/speech returns WAV bytes.
generate_image — POST /v1/images/generations returns a base64 PNG.
synthesize_podcast — a mini-pipeline: ask the synthesizer to split the memo into short spoken chunks, TTS each chunk in parallel, stitch them into one WAV with silence gaps. This is the “plan → fan-out → stitch” pattern you can use for any long-form generation where latency matters.
The last cell runs it — skip if you don’t want to burn credits on audio.
```python
async def synthesize_speech(text: str, output_path: str) -> None:
    client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL + '/v1')
    response = await client.audio.speech.create(
        model=MODELS['tts'],
        voice='Vivian',
        input=text,
        response_format='wav',
    )
    Path(output_path).write_bytes(response.content)


async def generate_image(prompt: str, output_path: str) -> None:
    client = AsyncOpenAI(api_key=API_KEY, base_url=BASE_URL + '/v1')
    response = await client.images.generate(
        model=MODELS['image'],
        prompt=prompt,
        size='1024x1024',
        n=1,
    )
    Path(output_path).write_bytes(base64.b64decode(response.data[0].b64_json))


def stitch_wav(paths: list, output_path: str, silence_ms: int = 350) -> None:
    # Reuse the first chunk's format for the output and for the silence gap.
    with wave.open(paths[0], 'rb') as probe:
        params, framerate, sampwidth, nchannels = (
            probe.getparams(), probe.getframerate(), probe.getsampwidth(), probe.getnchannels()
        )
    silence = b'\x00' * (int(framerate * silence_ms / 1000) * sampwidth * nchannels)
    with wave.open(output_path, 'wb') as out:
        out.setparams(params)
        for i, p in enumerate(paths):
            with wave.open(p, 'rb') as src:
                out.writeframes(src.readframes(src.getnframes()))
            if i < len(paths) - 1:
                out.writeframes(silence)


async def synthesize_podcast(report: str, output_path: str) -> None:
    # Plan: ask the model for a list of short spoken chunks.
    print('[Podcast] Writing script...')
    raw = await chat(
        MODELS['synthesizer'],
        'You are a podcast scriptwriter. Write punchy, natural spoken-word content.',
        (
            'Turn this report into 7–10 spoken chunks (~25 words each). '
            'Return ONLY a JSON array: [{"text": "..."}]. No fences.\n\n'
            f'Report:\n{report}'
        ),
    )
    try:
        chunks = parse_json_response(raw)
    except Exception:
        chunks = [{'text': raw}]
    print(f' ✓ {len(chunks)} chunks planned')

    # Fan out: one TTS request per chunk, all in parallel.
    out_dir = Path(output_path).parent

    async def tts_chunk(i, chunk):
        tmp = str(out_dir / f'_chunk_{i:03d}.wav')
        await synthesize_speech(chunk['text'], tmp)
        return i, tmp

    pairs = await asyncio.gather(*[tts_chunk(i, c) for i, c in enumerate(chunks)])
    tmp_paths = [p for _, p in sorted(pairs)]

    # Stitch: concatenate in order, then remove the temporary files.
    print('[Podcast] Stitching...')
    stitch_wav(tmp_paths, output_path)
    for p in tmp_paths:
        Path(p).unlink(missing_ok=True)
    print(f' ✓ {output_path}')
```
```python
# Uncomment to actually run the output layer — generates demo_podcast.wav and demo_cover.png.
# from IPython.display import Audio, Image
#
# await synthesize_podcast(synthesis['report'], 'demo_podcast.wav')
# display(Audio('demo_podcast.wav'))
#
# img_prompt = await chat(
#     MODELS['synthesizer'],
#     'You write one-sentence prompts for AI image generation.',
#     f"One sentence describing a visual metaphor for this memo:\n\n{synthesis['report'][:1500]}",
# )
# await generate_image(img_prompt, 'demo_cover.png')
# display(Image('demo_cover.png'))
```
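The stitch step copies the first chunk's WAV parameters to the output and assumes every other chunk shares them. If the TTS endpoint ever returned chunks at different sample rates, the stitched audio would play at the wrong speed for some segments. The sketch below is a hypothetical pre-flight check: `wav_formats_match` and `silence_bytes` are illustrative names, and the 24000 Hz mono 16-bit figures in the comment are assumed example values, not a documented property of the TTS model.

```python
import wave


def wav_formats_match(paths: list) -> bool:
    # True when every file shares channel count, sample width, and frame rate.
    formats = set()
    for p in paths:
        with wave.open(p, 'rb') as src:
            formats.add((src.getnchannels(), src.getsampwidth(), src.getframerate()))
    return len(formats) <= 1


def silence_bytes(framerate: int, sampwidth: int, nchannels: int, silence_ms: int = 350) -> bytes:
    # Same arithmetic as the silence gap in stitch_wav:
    # frames of silence x bytes per sample x channels.
    return b'\x00' * (int(framerate * silence_ms / 1000) * sampwidth * nchannels)


# Example with assumed values: 24000 Hz, 16-bit (2 bytes), mono, 350 ms
# gives 8400 frames, that is 16800 bytes of silence.
```

Calling `wav_formats_match(tmp_paths)` before `stitch_wav` turns a subtle audio glitch into an explicit failure. One caveat on the silence itself: zero bytes are silent for 16-bit signed PCM, whereas 8-bit WAV data is unsigned and would need 0x80 instead.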
What you’ve built
A full document → cited memo pipeline, in roughly 250 lines of Python, sitting entirely on top of Sciforium primitives:
- One endpoint (/v1/chat/completions) handled text extraction, verification, analysis, and synthesis.
- The same endpoint with a file content part did multimodal grounding against the original PDF.
- A separate attachments endpoint gave you high-fidelity text once, cached for every downstream text-only call.
- TTS and image endpoints rounded it out into a multimodal deliverable.
Patterns worth keeping
- Stack models by cost. Cheap → mid → best, matched to call volume. Don’t use your biggest model for 40 parallel extractions.
- Fan out, then bound. asyncio.gather for independent work; a Semaphore in front of any external API that can rate-limit you.
- Batch big prompts. Prefer five parallel 8-item prompts over one 40-item prompt — better throughput, better UX, better error isolation.
- Cache the parse. Run the attachments API once, reuse the text. Attach the file only for the passes that truly need native bytes.
- Enforce invariants in the synthesis prompt. Citations, confidence tags, no invented numbers — these are what separate a demo from a trustworthy artefact.
Where to go next
- Swap MODELS values to try different verifiers, analysts, or synthesizers.
- Extend EXTRACTION_TASKS / ANALYSIS_TASKS. New lenses add little wall-clock time since they fan out in parallel, though each one still spends tokens.
- Add retries with backoff around chat() for production robustness. The Exa helper only catches and logs failures; it does not retry.
- Batch mode: process many PDFs and bucket/rank them. Same building blocks, wrapped in an outer asyncio.Semaphore.
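The retry suggestion in the list above could look like the following. This is a minimal sketch under stated assumptions: `with_retries` is a hypothetical helper, the attempt count and delays are arbitrary defaults, and it retries on any exception for brevity. A production version would narrow the `except` clause to the rate-limit and timeout errors that are actually transient.

```python
import asyncio
import random


async def with_retries(make_call, attempts: int = 4, base_delay: float = 0.5, max_delay: float = 8.0):
    # make_call is a zero-argument function returning a fresh awaitable,
    # because a coroutine object cannot be awaited a second time.
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff capped at max_delay, with jitter so that
            # parallel callers do not all retry at the same instant.
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

In the notebook a call would be wrapped as `reply = await with_retries(lambda: chat(model, system, user))`. Passing a lambda rather than the coroutine itself is what allows each attempt to issue a new request.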