Event-call-agent
Implementation-accurate documentation of an AI-powered outbound voice system: Sara (bilingual English/Farsi event-invitation assistant), LiveKit SIP telephony, S3 call recording, and Twilio-style recording notification; CSV lead upload and batch/single call dispatch via LiveKit CLI.
Problem
The Challenge
Context
The system automates outbound event-invitation calls using an AI voice agent (Sara), records each call to S3, and notifies an external backend (a Twilio-style endpoint) with the recording URL and metadata. A separate flow supports CSV lead upload and batch or single call dispatch via the LiveKit CLI.
User Pain Points
Manual outbound event invitations
Need for call recording and delivery to external system
Why Existing Solutions Failed
Manual outbound invitations do not scale; call recording and delivery to an external system are required for audit, compliance, and integration with existing backends that expect a webhook or HTTP callback with recording details.
Goals & Metrics
What We Set Out to Achieve
Objectives
- 01 Place AI-powered outbound calls (Sara) via LiveKit SIP to E.164 numbers
- 02 Record calls as audio-only OGG to AWS S3 with deterministic key
- 03 Set S3 object ACL to public-read and POST Twilio-style JSON to RECORDING_STATUS_ENDPOINT when egress completes
- 04 Accept CSV leads and dispatch single or batch calls via LiveKit CLI (lk dispatch create)
- 05 Run Sara voice agent in room with OpenAI Realtime and bilingual event-invitation prompts
Success Metrics
- 01 Outbound SIP calls placed via LiveKit to E.164 numbers
- 02 Calls recorded as OGG to S3 under recordings/<user_id>/<call_id>.ogg
- 03 Recording URL and metadata POSTed to external endpoint on egress COMPLETE
- 04 CSV upload and batch/single dispatch via lk dispatch create
- 05 Sara agent joins room and conducts conversation with Realtime voice
User Flow
User Journey
Two main flows:
(1) GET /call or /call/personalized → trunk ensure → SIP + egress → watcher → ACL + POST.
(2) POST /upload-csv → leads in memory → POST /call-single or /call-all → lk dispatch create → worker joins → Sara session.
Architecture
System Design
Two FastAPI apps and a LiveKit agent worker, with no database.
Frontend
Not present in the repository; main.py references static/index.html and /static.
Backend
app.py (SIP, egress, POST), main.py (CSV, lk dispatch), and the LiveKit agent worker (worker.py).
Services
LiveKit (SIP, egress, rooms), Telnyx SIP trunk, OpenAI, AWS S3, RECORDING_STATUS_ENDPOINT.
Databases / state
No database. In-memory: csv_data_store and EGRESS_CONTEXT. Persistent: trunk_id.txt and S3.
Data Flow
How Data Moves
Client → app.py (to_number, user_id, contact_name, company_name); app.py → LiveKit (SIP, egress); LiveKit → S3 (OGG); app.py polls egress, then S3 ACL and POST to RECORDING_STATUS_ENDPOINT. Client → main.py (CSV); main.py → lk dispatch create; LiveKit → worker (room metadata).
Core Features
Key Functionality
SIP trunk create and check
What it does
Creates LiveKit SIP outbound trunk (Telnyx) and writes trunk_id to trunk_id.txt; validates trunk address and DID
Why it matters
app.py: create_trunk(), check_trunk_config(), GET /trunk/create
Implementation
LiveKit API CreateSIPOutboundTrunkRequest; list_sip_outbound_trunk and filter by trunk_id; validate address sip.telnyx.com and numbers
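The validation step can be pictured as a plain check over the trunk's address and number list. The following is a minimal sketch, not the project's code: it assumes the trunk has already been reduced to a dict with `address` and `numbers` keys (a hypothetical shape, not the LiveKit response type), and `find_trunk_problems` is an illustrative name.

```python
import re

# Common E.164 shape check: "+", a non-zero digit, then up to 14 more digits.
E164_RE = re.compile(r"^\+[1-9]\d{1,14}$")


def find_trunk_problems(trunk: dict, expected_address: str, expected_did: str) -> list:
    """Return human-readable problems; an empty list means the trunk looks usable.

    `trunk` is a hypothetical simplified dict, e.g.
    {"address": "sip.telnyx.com", "numbers": ["+15551234567"]}.
    """
    problems = []
    if trunk.get("address") != expected_address:
        problems.append(
            f"address is {trunk.get('address')!r}, expected {expected_address!r}"
        )
    numbers = trunk.get("numbers") or []
    if expected_did not in numbers:
        problems.append(f"DID {expected_did} is not attached to the trunk")
    for number in numbers:
        if not E164_RE.match(number):
            problems.append(f"number {number!r} is not in E.164 form")
    return problems
```

Returning a list of problems rather than a boolean makes it easy to surface all issues in a single 400/500 response body.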
Generic and personalized outbound call with egress
What it does
Places SIP call via LiveKit, starts RoomComposite Egress (audio OGG) to S3 with deterministic key; returns call and egress info
Why it matters
app.py: GET /call, GET /call/personalized, make_sara_call()
Implementation
CreateSIPParticipantRequest with participant_metadata (initial_message, contact_name, company_name, user_id, call_id); RoomCompositeEgressRequest with EncodedFileOutput OGG to S3; EGRESS_CONTEXT and asyncio.create_task(watch_egress_and_post)
Egress watcher and recording notification
What it does
Polls LiveKit egress status until terminal; on COMPLETE sets S3 object ACL to public-read and POSTs Twilio-style JSON to RECORDING_STATUS_ENDPOINT
Why it matters
app.py: watch_egress_and_post(), make_s3_object_public(), send_recording_status()
Implementation
Poll every 2s with a 30-min deadline; get_egress or list_egress; on COMPLETE pop the context, call boto3 put_object_acl, build RecordingUrl, then requests.post with query params userId and to plus a JSON body
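The notification step amounts to building a public object URL and a small Twilio-style body. A minimal sketch under stated assumptions: the bucket and region are passed in explicitly, the call_id is reused as `CallSid`, the duration is sent as a whole-second string, and `"completed"` is used as the status value. None of these details are confirmed by the description above, and both function names are illustrative.

```python
from urllib.parse import quote


def build_recording_url(bucket: str, region: str, key: str) -> str:
    """Virtual-hosted-style S3 URL; the key is percent-encoded, slashes are kept."""
    return f"https://{bucket}.s3.{region}.amazonaws.com/{quote(key)}"


def build_recording_status(call_id, recording_url, duration_seconds, user_id, to_number):
    """Return (query_params, json_body) for the POST to RECORDING_STATUS_ENDPOINT.

    Field names follow the Twilio-style keys named in this document; the
    value formats are assumptions made for this sketch.
    """
    params = {"userId": user_id, "to": to_number}
    body = {
        "CallSid": call_id,                                # assumption: call_id doubles as CallSid
        "RecordingUrl": recording_url,
        "RecordingDuration": str(int(duration_seconds)),   # assumption: whole seconds, as a string
        "RecordingStatus": "completed",                    # assumption: Twilio-like status value
    }
    return params, body
```

The resulting pair maps directly onto `requests.post(endpoint, params=params, json=body)`.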
CSV upload and lead storage
What it does
Parses CSV (Phone Number, Name/Contact Name, Company Name), validates phone length and required fields, stores leads in memory; optional auto_call with delay
Why it matters
main.py: POST /upload-csv, parse_csv_content()
Implementation
csv.DictReader; normalize phone (+ prefix, digits); column name variants; append to csv_data_store; optional thread for dispatch with time.sleep(delay_seconds)
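The parsing rules above can be sketched with the standard library alone. This is an illustrative version, not the project's parse_csv_content(): the digit-count bounds, the choice of phone and name as the required fields, and the output key names are assumptions.

```python
import csv
import io
import re

# Column-name variants listed in this document.
PHONE_COLUMNS = ("Phone Number",)
NAME_COLUMNS = ("Name", "Contact Name")
COMPANY_COLUMNS = ("Company Name",)


def first_present(row, columns):
    """Return the first non-empty value among the candidate columns."""
    for column in columns:
        value = (row.get(column) or "").strip()
        if value:
            return value
    return ""


def normalize_phone(raw):
    """Keep digits only and add a leading '+'; returns '' if no digits remain."""
    digits = re.sub(r"\D", "", raw)
    return "+" + digits if digits else ""


def parse_leads(text, min_digits=10, max_digits=15):
    """Parse CSV text into lead dicts, silently skipping invalid rows."""
    leads = []
    for row in csv.DictReader(io.StringIO(text)):
        phone = normalize_phone(first_present(row, PHONE_COLUMNS))
        name = first_present(row, NAME_COLUMNS)
        company = first_present(row, COMPANY_COLUMNS)
        digit_count = len(phone) - 1  # exclude the '+'; -1 when phone is empty
        if not (min_digits <= digit_count <= max_digits) or not name:
            continue  # invalid row: skip and keep going
        leads.append(
            {"phone_number": phone, "contact_name": name, "company_name": company}
        )
    return leads
```

Note that prefixing "+" to a number without a country code does not make it valid E.164; a stricter version would reject or rewrite such rows.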
Lead list and get by index
What it does
Returns all leads or single lead by index
Why it matters
main.py: GET /leads, GET /leads/{index}
Implementation
Return csv_data_store or csv_data_store[index]; 404 if index out of range
Single and batch call dispatch (LiveKit CLI)
What it does
Queues one or many calls via subprocess lk dispatch create with metadata (phone_number, company_data, user_id)
Why it matters
main.py: dispatch_personalized_call(), POST /call-single, /call-lead/{index}, /call-all, /call
Implementation
BackgroundTasks or threading; subprocess.run(['lk','dispatch','create','--new-room','--agent-name','my-telephony-agent-win','--metadata', json]); HARDCODED_USER_ID in metadata
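The CLI dispatch can be split into building the argument list and running it, which keeps the first half testable without the lk binary. A minimal sketch: the flags are the ones quoted above, while the function names and the injectable `runner` parameter are illustrative additions.

```python
import json
import subprocess


def build_dispatch_command(agent_name, phone_number, company_data, user_id):
    """Build the argv for `lk dispatch create` with JSON metadata."""
    metadata = json.dumps(
        {
            "phone_number": phone_number,
            "company_data": company_data,
            "user_id": user_id,
        }
    )
    return [
        "lk", "dispatch", "create",
        "--new-room",
        "--agent-name", agent_name,
        "--metadata", metadata,
    ]


def run_dispatch(cmd, runner=subprocess.run):
    """Run the command; raise RuntimeError on a non-zero exit code.

    `runner` defaults to subprocess.run and is injectable so the failure
    path can be exercised without the lk CLI installed.
    """
    result = runner(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"lk dispatch failed: {result.stderr.strip()}")
    return result.stdout
```

Passing the command as a list (not a shell string) avoids quoting problems with the JSON metadata argument.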
Sara voice agent (worker)
What it does
LiveKit agent joins room, runs OpenAI Realtime voice session with event-invitation instructions, speaks SARA_WELCOME_MESSAGE and handles conversation; bilingual English/Farsi prompts
Why it matters
worker.py: entrypoint(), SaraEventAgent, prompts.py
Implementation
AgentSession with openai.realtime.RealtimeModel (voice alloy), RoomInputOptions with BVCTelephony(); session.generate_reply for initial greeting; metadata contact_name, company_name appended to SARA_INSTRUCTIONS
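Appending call metadata to the base instructions is a small string-building step that is worth making tolerant of missing or malformed metadata. A minimal sketch, assuming the metadata arrives as a JSON string; the "Call context" wording and the function name are hypothetical and do not reflect the actual text appended to SARA_INSTRUCTIONS.

```python
import json


def build_instructions(base_instructions: str, raw_metadata: str) -> str:
    """Append contact/company context to the base prompt when available."""
    try:
        meta = json.loads(raw_metadata) if raw_metadata else {}
    except json.JSONDecodeError:
        meta = {}  # malformed metadata: fall back to the base prompt
    if not isinstance(meta, dict):
        meta = {}

    lines = []
    if meta.get("contact_name"):
        lines.append(f"Contact name: {meta['contact_name']}")
    if meta.get("company_name"):
        lines.append(f"Company name: {meta['company_name']}")
    if not lines:
        return base_instructions
    return base_instructions + "\n\nCall context:\n" + "\n".join(lines)
```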
Sara greeting for app.py
What it does
Builds initial_message for room_metadata from SaraAgent (if importable) or prompts.SARA_WELCOME_MESSAGE
Why it matters
app.py: make_sara_call(); sara_agent.SaraAgent
Implementation
Optional import sara_agent; SaraAgent(contact_name, company_name).initial_message or SARA_WELCOME_MESSAGE
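The optional-import fallback can be sketched as follows. The `SaraAgent(contact_name, company_name).initial_message` shape is taken from the description above; `DEFAULT_WELCOME` is a placeholder string standing in for SARA_WELCOME_MESSAGE, and the `module_name` parameter exists only to make the sketch easy to exercise.

```python
import importlib

# Placeholder for illustration; the real text lives in prompts.SARA_WELCOME_MESSAGE.
DEFAULT_WELCOME = "Hello, this is Sara."


def build_initial_message(contact_name, company_name, module_name="sara_agent"):
    """Use SaraAgent's greeting if the module imports, else the default welcome."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # Module missing or not importable: fall back to the static greeting.
        return DEFAULT_WELCOME
    agent = module.SaraAgent(contact_name, company_name)
    return agent.initial_message
```

Catching only ImportError keeps genuine bugs inside SaraAgent visible instead of silently masking them with the default greeting.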
Technical Challenges
Problems We Solved
Why This Was Hard
The app uses polling instead of LiveKit egress webhooks, so it must map each egress_id to its context (user_id, to_number, s3_key, call_id) and run a long-lived background task per call
Our Solution
In-memory EGRESS_CONTEXT map; watch_egress_and_post polls get_egress/list_egress every 2s until terminal; on COMPLETE set ACL and POST to external endpoint
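The polling approach can be sketched as a coroutine with an injected status fetcher, which keeps the loop independent of the LiveKit client. This is an illustration rather than the project's watch_egress_and_post(): the status strings are simplified stand-ins for LiveKit's egress status values, and dropping the context on timeout is an assumption.

```python
import asyncio
import logging
import time

# Simplified stand-ins for LiveKit's terminal egress statuses.
TERMINAL = {"COMPLETE", "FAILED", "ABORTED"}


async def watch_until_terminal(egress_id, fetch_status, on_complete, context,
                               interval=2.0, deadline=1800.0):
    """Poll `fetch_status(egress_id)` until a terminal status or the deadline.

    `context` is the in-memory map from egress_id to call details. On
    COMPLETE, the entry is popped and passed to `on_complete`.
    """
    stop_at = time.monotonic() + deadline
    while time.monotonic() < stop_at:
        try:
            status = await fetch_status(egress_id)
        except Exception as exc:
            # Polling errors are logged and the loop continues.
            logging.warning("egress poll failed for %s: %s", egress_id, exc)
            status = None
        if status in TERMINAL:
            ctx = context.pop(egress_id, None)
            if status == "COMPLETE" and ctx is not None:
                await on_complete(ctx)
            return status
        await asyncio.sleep(interval)
    context.pop(egress_id, None)  # assumption: drop stale context on timeout
    return "TIMEOUT"
```

In the FastAPI app, a call such as `asyncio.create_task(watch_until_terminal(...))` would start the watcher without blocking the request handler.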
Why This Was Hard
app.py uses LiveKit SIP API directly; main.py uses lk dispatch create subprocess with different agent name; no single entrypoint
Our Solution
Two FastAPI apps (app.py and main.py); app.py for SIP+egress+POST flow; main.py for CSV and CLI dispatch
Why This Was Hard
External endpoint needs recording URL after egress; egress file path must be known before upload completes
Our Solution
s3_key_for(call_id, user_id) = recordings/<user_id>/<call_id>.ogg with uuid-based call_id; same key in EGRESS_CONTEXT and RoomCompositeEgressRequest filepath
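Because the key depends only on identifiers known at call time, it can be computed before the egress starts and reused unchanged afterwards. A minimal sketch mirroring the signature named above; `new_call_id` and the use of the hex form of a UUID4 are assumptions about how the call_id is generated.

```python
import uuid


def new_call_id() -> str:
    """Generate a uuid-based call id (assumption: hex form of a UUID4)."""
    return uuid.uuid4().hex


def s3_key_for(call_id: str, user_id: str) -> str:
    """Deterministic object key: recordings/<user_id>/<call_id>.ogg."""
    return f"recordings/{user_id}/{call_id}.ogg"


# The same key is stored in the egress context and passed as the egress filepath,
# so the watcher can later build the recording URL without querying S3.
call_id = new_call_id()
key = s3_key_for(call_id, "user-123")
context_entry = {"user_id": "user-123", "call_id": call_id, "s3_key": key}
```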
Engineering Excellence
Performance, Security & Resilience
Performance
- Egress poll interval 2s with 30-minute deadline
- worker.py: asyncio.wait_for timeouts (15s connect, 30s session start, 15s greeting)
- OpenAI Realtime and BVCTelephony for low-latency voice in worker
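The worker timeouts listed above follow a common pattern: wrap each awaitable step in asyncio.wait_for and convert a timeout into a named error. A minimal sketch with a hypothetical `run_step` helper; the real worker's step functions are not shown here.

```python
import asyncio


async def run_step(name, awaitable, timeout):
    """Await one startup step, failing with a descriptive error on timeout."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        raise RuntimeError(f"{name} timed out after {timeout}s") from None
```

A worker entrypoint could then chain the steps with the documented limits, for example `await run_step("connect", ctx.connect(), 15)`, followed by the session start (30s) and the greeting (15s), where `ctx.connect()` stands for whatever awaitable the worker actually uses.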
Error Handling
- Missing/invalid trunk: create the trunk, or return 400/500
- Sara/prompts load failure: 500
- CSV parse: skip invalid rows, continue
- Lead index out of range: 404
- Dispatch subprocess non-zero: RuntimeError, 500
- Egress polling exception: log and continue
- ACL or POST failure: log a warning; the POST is still attempted if setting the ACL fails
Security
- No authentication on FastAPI routes
- Secrets from environment (LIVEKIT_*, TELNYX_*, OPENAI_*, AWS_*)
- S3 objects set to public-read after egress
- main.py CORS allow_origins=['*']
- RECORDING_STATUS_ENDPOINT POST has no auth; ngrok-skip-browser-warning header only
Design Decisions
Visual & UX Choices
Frontend
Rationale
Frontend not present in repository; main.py references static/index.html and /static
Details
API-only interaction for app.py (GET /call, /call/personalized); main.py: upload CSV, then call endpoints to dispatch; optional auto_call on upload
Impact
The Result
What We Achieved
Outbound SIP calls placed via LiveKit to E.164 numbers. Calls recorded as OGG to S3 with deterministic path (recordings/<user_id>/<call_id>.ogg). When egress completes, app sets object ACL to public-read and POSTs Twilio-style JSON (RecordingUrl, CallSid, RecordingDuration, RecordingStatus) to a configurable HTTP endpoint. CSV lead upload and batch/single call dispatch via LiveKit CLI. Sara voice agent joins rooms, runs OpenAI Realtime session with bilingual event-invitation prompts, and conducts conversation with BVCTelephony for low-latency voice.
Who It Helped
Operators or systems that need automated outbound event-invitation or lead calls, S3-backed recording, and a single HTTP callback with recording details compatible with Twilio-style consumers.
Why It Matters
Demonstrates working integration of LiveKit SIP, egress, and agents with OpenAI Realtime, S3, and an external notification endpoint; documents trade-offs (polling vs webhooks, two apps, in-memory state, no auth) as in the implementation.
Verification
Measurable Outcomes
Each outcome verified against reference implementations or test suites.
Outbound SIP calls to E.164 numbers via LiveKit
OGG recordings to S3 with deterministic path
Recording URL and metadata POSTed to external endpoint on egress COMPLETE
CSV lead upload and batch/single dispatch via lk dispatch create
Sara agent runs in room with bilingual event-invitation flow and Realtime voice
Reflections
Key Learnings
Technical Learnings
- Egress completion without webhooks: in-memory EGRESS_CONTEXT and polling (2s, 30-min deadline) tie egress_id to context for ACL and POST.
- Deterministic S3 key (recordings/<user_id>/<call_id>.ogg) allows building RecordingUrl before egress finishes.
- Two call paths (app.py SIP+egress vs main.py lk dispatch) reflect distinct use cases; worker timeouts (15s, 30s, 15s) avoid hangs.
Architectural Insights
- Dual FastAPI apps (app.py for SIP+egress+POST; main.py for CSV+dispatch) with no shared router.
- No database: csv_data_store and EGRESS_CONTEXT are process-local; restart loses in-flight egress and leads.
- Library-first agent (worker.py, prompts.py) with optional SaraAgent import in app.py for greeting.
What I'd Improve
- Unify or document call paths; persist egress context and leads (e.g. Redis/DB); optional egress webhooks; expose /egress/stop and /trunk/status; user_id from request in main.py; API auth and rate limiting; presigned URLs for recordings; POST retries and idempotency.
Roadmap
Future Enhancements
Unify or document call paths: single entrypoint or clear documentation of which worker serves which app.
Persist egress context and leads (e.g. Redis, DB) so restart does not lose in-flight egress or uploaded leads.
Optional egress webhooks when available; keep polling as fallback.
Expose /egress/stop and /trunk/status for operational visibility.
Accept user_id from request (or auth) in main.py instead of HARDCODED_USER_ID.
Add API-level authentication and rate limiting for /call, /trunk/create, /upload-csv, /call-all if exposed to untrusted clients.
Consider presigned URLs or private bucket with signed URLs instead of public-read ACL for recordings.
Retry POST to RECORDING_STATUS_ENDPOINT on transient failures; consider idempotency key.
Add minimal static UI under static/ or remove static/index.html and /static mount from main.py.
Add boto3 to requirements.txt for reproducible installs.
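The retry and idempotency item in this roadmap could take roughly the following shape. This is a sketch of one possible design, not existing behavior: `send` is a hypothetical callable that performs the HTTP POST and returns the status code, and how the receiving endpoint would consume the idempotency key is left open.

```python
import time
import uuid


def post_with_retry(send, payload, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call `send(payload, idempotency_key)` with exponential backoff.

    Retries on exceptions and on 5xx status codes. The same idempotency key
    is reused for every attempt so the receiver can deduplicate.
    """
    idempotency_key = uuid.uuid4().hex
    last_error = None
    for attempt in range(attempts):
        try:
            status = send(payload, idempotency_key)
        except Exception as exc:  # e.g. a network error raised by the HTTP client
            last_error = exc
        else:
            if status < 500:
                return status  # success or a non-retryable client error
            last_error = RuntimeError(f"server returned {status}")
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    raise last_error
```

With requests, `send` could wrap `requests.post(...)` and pass the key in a header of the operator's choosing; the header name would need to be agreed with the endpoint owner.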
