Event-call-agent

Implementation-accurate documentation of an AI-powered outbound voice system: Sara (bilingual English/Farsi event-invitation assistant), LiveKit SIP telephony, S3 call recording, and Twilio-style recording notification; CSV lead upload and batch/single call dispatch via LiveKit CLI.

Role: Software Engineer
Year:
Python; FastAPI; uvicorn; LiveKit; LiveKit API; LiveKit Agents; LiveKit plugins: OpenAI, Silero, noise-cancellation; OpenAI (LLM, TTS, STT, Realtime); Telnyx (SIP trunk); AWS S3; boto3; python-dotenv; requests; pydantic

Problem

The Challenge

Context

The system automates outbound event-invitation calls using an AI voice agent (Sara), records each call to S3, and notifies an external backend (a Twilio-style endpoint) with the recording URL and metadata. A separate flow supports CSV lead upload and batch or single call dispatch via the LiveKit CLI.

User Pain Points

1

Manual outbound event invitations

2

Need for call recording and delivery to external system

Why Existing Solutions Failed

Manual outbound invitations do not scale; call recording and delivery to an external system are required for audit, compliance, and integration with existing backends that expect a webhook or HTTP callback with recording details.

Goals & Metrics

What We Set Out to Achieve

Objectives

  • 01 Place AI-powered outbound calls (Sara) via LiveKit SIP to E.164 numbers
  • 02 Record calls as audio-only OGG to AWS S3 with a deterministic key
  • 03 Set S3 object ACL to public-read and POST Twilio-style JSON to RECORDING_STATUS_ENDPOINT when egress completes
  • 04 Accept CSV leads and dispatch single or batch calls via LiveKit CLI (lk dispatch create)
  • 05 Run the Sara voice agent in a room with OpenAI Realtime and bilingual event-invitation prompts

Success Metrics

  • 01 Outbound SIP calls placed via LiveKit to E.164 numbers
  • 02 Calls recorded as OGG to S3 under recordings/<user_id>/<call_id>.ogg
  • 03 Recording URL and metadata POSTed to external endpoint on egress COMPLETE
  • 04 CSV upload and batch/single dispatch via lk dispatch create
  • 05 Sara agent joins room and conducts conversation with Realtime voice

User Flow

User Journey

Two main flows: (1) GET /call or /call/personalized → trunk ensure → SIP + egress → watcher → ACL + POST; (2) POST /upload-csv → leads in memory → POST /call-single or /call-all → lk dispatch create → worker joins → Sara session.

Start
1. GET /call or /call/personalized (to_number, user_id)
2. Ensure trunk; CreateSIPParticipant + RoomCompositeEgress to S3
3. Poll egress every 2s; on COMPLETE set ACL and POST to endpoint
4. POST /upload-csv; POST /call-single or /call-all → lk dispatch create
5. Worker joins room; Sara Realtime session, greeting, conversation
End

Architecture

System Design

Two FastAPI apps (app.py: SIP, egress, POST; main.py: CSV, lk dispatch) and LiveKit agent worker (worker.py). Services: LiveKit (SIP, egress, rooms), Telnyx SIP trunk, OpenAI, AWS S3, RECORDING_STATUS_ENDPOINT. No database; in-memory csv_data_store and EGRESS_CONTEXT; persistent trunk_id.txt and S3.

Frontend

Referenced in main.py: static/index.html and /static (not present in repo)

Backend

app.py (FastAPI): SIP calls, egress to S3, egress polling, POST to external endpoint
main.py (FastAPI): CSV upload, in-memory leads, call dispatch via lk dispatch create
worker.py (LiveKit agent entrypoint): SaraEventAgent, OpenAI Realtime, BVCTelephony

Services

LiveKit (SIP, egress, rooms)
LiveKit Agents (worker runtime)
Telnyx SIP trunk (sip.telnyx.com)
OpenAI (LLM, TTS, STT, Realtime)
AWS S3 (egress storage, boto3 ACL)
External HTTP endpoint (RECORDING_STATUS_ENDPOINT) for Twilio-style JSON POST

Databases / state

No database.
In-memory: csv_data_store (main.py), EGRESS_CONTEXT (app.py)
Persistent: trunk_id.txt, S3 objects

Data Flow

How Data Moves

Client → app.py (to_number, user_id, contact_name, company_name); app.py → LiveKit (SIP, egress); LiveKit → S3 (OGG); app.py polls egress, then S3 ACL and POST to RECORDING_STATUS_ENDPOINT. Client → main.py (CSV); main.py → lk dispatch create; LiveKit → worker (room metadata).

1
Client → app.py
GET /call or /call/personalized: to_number, user_id, optional contact_name, company_name
2
app.py → LiveKit
CreateSIPParticipantRequest, RoomCompositeEgressRequest (make_sara_call)
3
LiveKit → S3
Audio OGG egress to recordings/<user_id>/<call_id>.ogg
4
app.py → LiveKit
Egress status poll (get_egress or list_egress) every 2s (watch_egress_and_post)
5
app.py → S3
put_object_acl public-read on egress COMPLETE
6
app.py → RECORDING_STATUS_ENDPOINT
POST JSON: RecordingUrl, CallSid, RecordingDuration, RecordingStatus; query userId, to
7
Client → main.py
POST /upload-csv: CSV file, optional auto_call, delay_seconds
8
main.py → LiveKit CLI
lk dispatch create --metadata (phone_number, company_data, user_id)
9
LiveKit → worker.py
Room metadata (contact_name, company_name) on agent job dispatch

Core Features

Key Functionality

01

SIP trunk create and check

What it does

Creates LiveKit SIP outbound trunk (Telnyx) and writes trunk_id to trunk_id.txt; validates trunk address and DID

Why it matters

app.py: create_trunk(), check_trunk_config(), GET /trunk/create

Implementation

LiveKit API CreateSIPOutboundTrunkRequest; list_sip_outbound_trunk and filter by trunk_id; validate address sip.telnyx.com and numbers
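
The validation step can be illustrated without calling LiveKit at all. The sketch below is a simplified model, not the code in app.py: TrunkInfo is a hypothetical stand-in for the records returned by the trunk listing call, and check_trunk is an illustrative name. It shows only the filter-by-id, check-address, check-DID logic described above.

```python
from dataclasses import dataclass, field

EXPECTED_ADDRESS = "sip.telnyx.com"  # trunk address named in this write-up


@dataclass
class TrunkInfo:
    """Hypothetical, simplified stand-in for a LiveKit outbound trunk record."""
    trunk_id: str
    address: str
    numbers: list = field(default_factory=list)


def check_trunk(trunks, trunk_id, expected_did):
    """Return a list of problems; an empty list means the trunk looks usable."""
    # Filter the listing down to the trunk whose id was saved in trunk_id.txt.
    match = next((t for t in trunks if t.trunk_id == trunk_id), None)
    if match is None:
        return [f"trunk {trunk_id!r} not found"]

    problems = []
    if match.address != EXPECTED_ADDRESS:
        problems.append(f"unexpected address {match.address!r}")
    if expected_did not in match.numbers:
        problems.append(f"DID {expected_did!r} not attached to trunk")
    return problems
```

Returning a list of problems rather than a boolean lets a caller decide whether to recreate the trunk or answer with a 400/500 and a readable message.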

02

Generic and personalized outbound call with egress

What it does

Places SIP call via LiveKit, starts RoomComposite Egress (audio OGG) to S3 with deterministic key; returns call and egress info

Why it matters

app.py: GET /call, GET /call/personalized, make_sara_call()

Implementation

CreateSIPParticipantRequest with participant_metadata (initial_message, contact_name, company_name, user_id, call_id); RoomCompositeEgressRequest with EncodedFileOutput OGG to S3; EGRESS_CONTEXT and asyncio.create_task(watch_egress_and_post)

03

Egress watcher and recording notification

What it does

Polls LiveKit egress status until terminal; on COMPLETE sets S3 object ACL to public-read and POSTs Twilio-style JSON to RECORDING_STATUS_ENDPOINT

Why it matters

app.py: watch_egress_and_post(), make_s3_object_public(), send_recording_status()

Implementation

Poll every 2s with 30-min deadline; get_egress or list_egress; on COMPLETE pop context, boto3 put_object_acl, build RecordingUrl, requests.post with params userId, to and JSON body
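
A minimal sketch of how the notification could be assembled once the object key is known. The field names (RecordingUrl, CallSid, RecordingDuration, RecordingStatus) and query parameters (userId, to) come from this write-up. The virtual-hosted S3 URL layout, the use of call_id as CallSid, the string-typed duration, and the "completed" status value are assumptions made for illustration.

```python
from urllib.parse import quote


def build_recording_notification(bucket, region, s3_key, call_id,
                                 user_id, to_number, duration_seconds):
    """Return (query_params, json_body) for the recording-status POST."""
    # Assumption: virtual-hosted-style S3 URL; slashes in the key are kept.
    recording_url = f"https://{bucket}.s3.{region}.amazonaws.com/{quote(s3_key)}"

    params = {"userId": user_id, "to": to_number}
    body = {
        "RecordingUrl": recording_url,
        "CallSid": call_id,                          # assumption: call_id doubles as CallSid
        "RecordingDuration": str(duration_seconds),  # assumption: sent as a string
        "RecordingStatus": "completed",              # assumption: Twilio-like value
    }
    return params, body
```

The returned pair maps directly onto `requests.post(endpoint, params=params, json=body)`.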

04

CSV upload and lead storage

What it does

Parses CSV (Phone Number, Name/Contact Name, Company Name), validates phone length and required fields, stores leads in memory; optional auto_call with delay

Why it matters

main.py: POST /upload-csv, parse_csv_content()

Implementation

csv.DictReader; normalize phone (+ prefix, digits); column name variants; append to csv_data_store; optional thread for dispatch with time.sleep(delay_seconds)
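
The parsing rules above can be sketched as follows. This is an illustrative reconstruction, not the code in main.py: the 10-digit minimum, the choice of which fields are required (phone and name), and the output key names are assumptions.

```python
import csv
import io

NAME_COLUMNS = ("Name", "Contact Name")  # accepted header variants


def normalize_phone(raw):
    """Keep digits only and add a leading '+'."""
    digits = "".join(ch for ch in raw if ch.isdigit())
    return f"+{digits}" if digits else ""


def parse_csv_content(text, min_digits=10):
    """Parse CSV text into lead dicts, skipping rows that fail validation."""
    leads = []
    for row in csv.DictReader(io.StringIO(text)):
        phone = normalize_phone(row.get("Phone Number") or "")
        name = next((row[c].strip() for c in NAME_COLUMNS if row.get(c)), "")
        company = (row.get("Company Name") or "").strip()
        # Assumption: phone length and a non-empty name are the required checks.
        if len(phone) - 1 < min_digits or not name:
            continue  # skip the invalid row and keep going
        leads.append({"phone_number": phone,
                      "contact_name": name,
                      "company_name": company})
    return leads
```

Skipping bad rows instead of rejecting the whole file matches the error-handling behaviour listed later in this document.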

05

Lead list and get by index

What it does

Returns all leads or single lead by index

Why it matters

main.py: GET /leads, GET /leads/{index}

Implementation

Return csv_data_store or csv_data_store[index]; 404 if index out of range

06

Single and batch call dispatch (LiveKit CLI)

What it does

Queues one or many calls via subprocess lk dispatch create with metadata (phone_number, company_data, user_id)

Why it matters

main.py: dispatch_personalized_call(), POST /call-single, /call-lead/{index}, /call-all, /call

Implementation

BackgroundTasks or threading; subprocess.run(['lk','dispatch','create','--new-room','--agent-name','my-telephony-agent-win','--metadata', json]); HARDCODED_USER_ID in metadata
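
A hedged sketch of the CLI dispatch step. The argument list mirrors the command quoted above; the shape of company_data (a dict) and the helper names are assumptions. Building the argument list separately from running it keeps the command easy to inspect and test.

```python
import json
import subprocess

AGENT_NAME = "my-telephony-agent-win"  # agent name quoted in this write-up


def build_dispatch_command(phone_number, company_data, user_id):
    """Return the argv list for `lk dispatch create` with JSON metadata."""
    metadata = json.dumps({
        "phone_number": phone_number,
        "company_data": company_data,  # assumption: a plain dict
        "user_id": user_id,
    })
    return ["lk", "dispatch", "create", "--new-room",
            "--agent-name", AGENT_NAME, "--metadata", metadata]


def dispatch_call(phone_number, company_data, user_id):
    """Run the CLI and raise RuntimeError on a non-zero exit code."""
    cmd = build_dispatch_command(phone_number, company_data, user_id)
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"lk dispatch failed: {result.stderr.strip()}")
    return result.stdout
```

Passing a list to subprocess.run (rather than a shell string) avoids quoting problems with the JSON metadata.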

07

Sara voice agent (worker)

What it does

LiveKit agent joins room, runs OpenAI Realtime voice session with event-invitation instructions, speaks SARA_WELCOME_MESSAGE and handles conversation; bilingual English/Farsi prompts

Why it matters

worker.py: entrypoint(), SaraEventAgent, prompts.py

Implementation

AgentSession with openai.realtime.RealtimeModel (voice alloy), RoomInputOptions with BVCTelephony(); session.generate_reply for initial greeting; metadata contact_name, company_name appended to SARA_INSTRUCTIONS
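
How the metadata reaches the prompt can be shown in isolation. The sketch below assumes the metadata arrives as a JSON string and that the two fields are appended as plain lines; the wording of the appended lines and the function name are illustrative, and the base prompt is a placeholder for the real SARA_INSTRUCTIONS in prompts.py.

```python
import json

SARA_INSTRUCTIONS = "You are Sara, an event-invitation assistant."  # placeholder text


def build_instructions(base, raw_metadata):
    """Append contact and company details from JSON metadata to the base prompt."""
    try:
        meta = json.loads(raw_metadata) if raw_metadata else {}
    except json.JSONDecodeError:
        meta = {}  # malformed metadata: fall back to the generic prompt
    if not isinstance(meta, dict):
        meta = {}

    lines = []
    if meta.get("contact_name"):
        lines.append(f"Contact name: {meta['contact_name']}")
    if meta.get("company_name"):
        lines.append(f"Company name: {meta['company_name']}")
    return base if not lines else base + "\n\n" + "\n".join(lines)
```

Falling back to the unmodified prompt keeps a call with missing or malformed metadata usable as a generic invitation.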

08

Sara greeting for app.py

What it does

Builds initial_message for room_metadata from SaraAgent (if importable) or prompts.SARA_WELCOME_MESSAGE

Why it matters

app.py: make_sara_call(); sara_agent.SaraAgent

Implementation

Optional import sara_agent; SaraAgent(contact_name, company_name).initial_message or SARA_WELCOME_MESSAGE
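
The optional-import fallback might look like the sketch below. It assumes initial_message is an attribute of SaraAgent and uses a placeholder string where the real SARA_WELCOME_MESSAGE from prompts.py would go; the module_name parameter is added here only to make the fallback easy to exercise.

```python
import importlib

FALLBACK_WELCOME = "Hello, this is Sara."  # placeholder for prompts.SARA_WELCOME_MESSAGE


def build_initial_message(contact_name, company_name, module_name="sara_agent"):
    """Use SaraAgent's greeting when the module imports, else the static welcome."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return FALLBACK_WELCOME
    # Assumption: SaraAgent exposes the greeting as an attribute.
    agent = module.SaraAgent(contact_name, company_name)
    return agent.initial_message
```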

Technical Challenges

Problems We Solved

Why This Was Hard

app.py polls instead of using LiveKit egress webhooks, so it must map each egress_id to its context (user_id, to_number, s3_key, call_id) and run a long-lived background task per call

Our Solution

In-memory EGRESS_CONTEXT map; watch_egress_and_post polls get_egress/list_egress every 2s until terminal; on COMPLETE set ACL and POST to external endpoint
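
The polling approach can be sketched independently of the LiveKit SDK by injecting the status lookup. In this simplified model, fetch_status and on_complete are hypothetical coroutines supplied by the caller, and statuses are plain strings named after LiveKit's egress status enum; the real watcher works with SDK objects.

```python
import asyncio
import time

TERMINAL = {"EGRESS_COMPLETE", "EGRESS_FAILED",
            "EGRESS_ABORTED", "EGRESS_LIMIT_REACHED"}


async def watch_egress(egress_id, fetch_status, on_complete,
                       interval=2.0, deadline=30 * 60):
    """Poll until the egress reaches a terminal status or the deadline passes."""
    stop_at = time.monotonic() + deadline
    while time.monotonic() < stop_at:
        try:
            status = await fetch_status(egress_id)
        except Exception as exc:  # log and keep polling, as the write-up describes
            print(f"egress poll failed for {egress_id}: {exc}")
            status = None
        if status in TERMINAL:
            if status == "EGRESS_COMPLETE":
                await on_complete(egress_id)  # e.g. set ACL, then POST
            return status
        await asyncio.sleep(interval)
    return None  # deadline reached without a terminal status
```

In a FastAPI handler this would be started with `asyncio.create_task(watch_egress(...))` so the HTTP response is not blocked by the recording.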

Why This Was Hard

app.py uses LiveKit SIP API directly; main.py uses lk dispatch create subprocess with different agent name; no single entrypoint

Our Solution

Two FastAPI apps (app.py and main.py); app.py for SIP+egress+POST flow; main.py for CSV and CLI dispatch

Why This Was Hard

External endpoint needs recording URL after egress; egress file path must be known before upload completes

Our Solution

s3_key_for(call_id, user_id) = recordings/<user_id>/<call_id>.ogg with uuid-based call_id; same key in EGRESS_CONTEXT and RoomCompositeEgressRequest filepath
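
The key scheme is small enough to show directly. s3_key_for follows the signature and path layout given above; new_call_id is a hypothetical helper, and the exact uuid variant and formatting used for call_id are assumptions.

```python
import uuid


def new_call_id():
    """Assumption: a random UUID4 in hex form serves as the call id."""
    return uuid.uuid4().hex


def s3_key_for(call_id, user_id):
    """Deterministic object key, known before the egress starts uploading."""
    return f"recordings/{user_id}/{call_id}.ogg"
```

Because the key depends only on values chosen before the call starts, the same string can be stored in EGRESS_CONTEXT and passed as the egress filepath.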

Engineering Excellence

Performance, Security & Resilience

Performance

  • Egress poll interval 2s with 30-minute deadline
  • worker.py: asyncio.wait_for timeouts (15s connect, 30s session start, 15s greeting)
  • OpenAI Realtime and BVCTelephony for low-latency voice in worker
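
The worker timeouts listed above follow a common asyncio pattern, sketched here under assumptions: run_step and the constant names are illustrative, and the awaitables stand in for the real connect, session-start, and greeting calls.

```python
import asyncio

CONNECT_TIMEOUT = 15        # seconds, per the list above
SESSION_START_TIMEOUT = 30
GREETING_TIMEOUT = 15


async def run_step(name, awaitable, timeout):
    """Await one startup step, turning a timeout into a descriptive error."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        raise RuntimeError(f"{name} timed out after {timeout}s") from None
```

A worker entrypoint could then chain `await run_step("connect", ..., CONNECT_TIMEOUT)` and the later steps, so a stalled step fails fast instead of holding the job open.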

Error Handling

  • Missing/invalid trunk: create or 400/500
  • Sara/prompts load failure: 500
  • CSV parse: skip invalid rows, continue
  • Lead index out of range: 404
  • Dispatch subprocess non-zero: RuntimeError, 500
  • Egress polling exception: log and continue
  • ACL or POST failure: log a warning; the POST is still attempted if setting the ACL failed

Security

  • No authentication on FastAPI routes
  • Secrets from environment (LIVEKIT_*, TELNYX_*, OPENAI_*, AWS_*)
  • S3 objects set to public-read after egress
  • main.py CORS allow_origins=['*']
  • RECORDING_STATUS_ENDPOINT POST has no auth; ngrok-skip-browser-warning header only

Design Decisions

Visual & UX Choices

Frontend

Rationale

Frontend not present in repository; main.py references static/index.html and /static

Details

API-only interaction for app.py (GET /call, /call/personalized); main.py: upload CSV, then call endpoints to dispatch; optional auto_call on upload

Impact

The Result

What We Achieved

Outbound SIP calls placed via LiveKit to E.164 numbers. Calls recorded as OGG to S3 with deterministic path (recordings/<user_id>/<call_id>.ogg). When egress completes, app sets object ACL to public-read and POSTs Twilio-style JSON (RecordingUrl, CallSid, RecordingDuration, RecordingStatus) to a configurable HTTP endpoint. CSV lead upload and batch/single call dispatch via LiveKit CLI. Sara voice agent joins rooms, runs OpenAI Realtime session with bilingual event-invitation prompts, and conducts conversation with BVCTelephony for low-latency voice.


Who It Helped

Operators or systems that need automated outbound event-invitation or lead calls, S3-backed recording, and a single HTTP callback with recording details compatible with Twilio-style consumers.

Why It Matters

Demonstrates working integration of LiveKit SIP, egress, and agents with OpenAI Realtime, S3, and an external notification endpoint; documents trade-offs (polling vs webhooks, two apps, in-memory state, no auth) as in the implementation.

Verification

Measurable Outcomes

Each outcome verified against reference implementations or test suites.

01

Outbound SIP calls to E.164 numbers via LiveKit

02

OGG recordings to S3 with deterministic path

03

Recording URL and metadata POSTed to external endpoint on egress COMPLETE

04

CSV lead upload and batch/single dispatch via lk dispatch create

05

Sara agent runs in room with bilingual event-invitation flow and Realtime voice

Reflections

Key Learnings

Technical Learnings

  • Egress completion without webhooks: in-memory EGRESS_CONTEXT and polling (2s, 30-min deadline) tie egress_id to context for ACL and POST.
  • Deterministic S3 key (recordings/<user_id>/<call_id>.ogg) allows building RecordingUrl before egress finishes.
  • Two call paths (app.py SIP+egress vs main.py lk dispatch) reflect distinct use cases; worker timeouts (15s, 30s, 15s) avoid hangs.

Architectural Insights

  • Dual FastAPI apps (app.py for SIP+egress+POST; main.py for CSV+dispatch) with no shared router.
  • No database: csv_data_store and EGRESS_CONTEXT are process-local; restart loses in-flight egress and leads.
  • Library-first agent (worker.py, prompts.py) with optional SaraAgent import in app.py for greeting.

What I'd Improve

  • Unify or document call paths; persist egress context and leads (e.g. Redis/DB); optional egress webhooks; expose /egress/stop and /trunk/status; user_id from request in main.py; API auth and rate limiting; presigned URLs for recordings; POST retries and idempotency.

Roadmap

Future Enhancements

01

Unify or document call paths: single entrypoint or clear documentation of which worker serves which app.

02

Persist egress context and leads (e.g. Redis, DB) so restart does not lose in-flight egress or uploaded leads.

03

Optional egress webhooks when available; keep polling as fallback.

04

Expose /egress/stop and /trunk/status for operational visibility.

05

Accept user_id from request (or auth) in main.py instead of HARDCODED_USER_ID.

06

Add API-level authentication and rate limiting for /call, /trunk/create, /upload-csv, /call-all if exposed to untrusted clients.

07

Consider presigned URLs or private bucket with signed URLs instead of public-read ACL for recordings.

08

Retry POST to RECORDING_STATUS_ENDPOINT on transient failures; consider idempotency key.

09

Add minimal static UI under static/ or remove static/index.html and /static mount from main.py.

10

Add boto3 to requirements.txt for reproducible installs.