ChatGPT#
Agent#
Utilities#
Examples#
Inference#
import asyncio
from environs import Env
from core_mixins.decorators.async_ import SyncWrapper
from core_genai.interfaces import IAgent
from core_genai.chatgpt.agent import ChatGPTAgent
env = Env()
env.read_env(".env")
agent: ChatGPTAgent = IAgent.create_agent("ChatGPTAgent", api_key=env("CHATGPT_API_KEY"))
MODEL = "gpt-4.1-nano"
PROMPT = [{"role": "user", "content": "Explain how AI works in a few words"}]
async def run_async() -> None:
output = await agent.analyze(model=MODEL, prompt=PROMPT)
print("[async]", agent.get_text(output))
print("[meta]", agent.get_metadata(output))
def run_sync() -> None:
with SyncWrapper(agent) as sync_agent:
output = sync_agent.analyze(model=MODEL, prompt=PROMPT)
print("[sync]", agent.get_text(output))
if __name__ == "__main__":
asyncio.run(run_async())
run_sync()
Batch inference#
Pass a list of BatchRequest objects with
prompt (required) and custom_id (optional). schedule_job builds
the full JSONL record and uploads the file automatically. Poll until the state
is in TERMINAL_STATES, then parse results from extract_job_results.
import asyncio
from core_genai.agents import ChatGPTAgent
from core_genai.agents.chatgpt import BatchRequest
from core_genai.interfaces import IAgent
from environs import Env
env = Env()
env.read_env(".env")
MODEL = "gpt-4o-mini"
POLL_INTERVAL = 30
PROMPTS = [
"Explain what machine learning is in one sentence.",
"Explain what a neural network is in one sentence.",
"Explain what reinforcement learning is in one sentence.",
]
agent: ChatGPTAgent = IAgent.create_agent("ChatGPTAgent", api_key=env("CHATGPT_API_KEY"))
def build_requests() -> list[BatchRequest]:
return [
BatchRequest(
custom_id=f"request-{i}",
prompt=[{"role": "user", "content": prompt}],
)
for i, prompt in enumerate(PROMPTS)
]
async def poll_until_done(batch_id: str) -> str:
while True:
state = await agent.check_job_status(batch_id)
print(f" status: {state}")
if state in ChatGPTAgent.TERMINAL_STATES:
return state
await asyncio.sleep(POLL_INTERVAL)
async def main() -> None:
requests = build_requests()
print(f"Scheduling batch job with {len(requests)} request(s)...")
job = await agent.schedule_job(requests=requests, model=MODEL)
batch_id = job["job_id"]
print(f"Job scheduled: {batch_id} (created: {job['created_at']})")
print("Polling for completion...")
final_state = await poll_until_done(batch_id)
print(f"Job finished with state: {final_state}")
result = await agent.extract_job_results(batch_id)
if result["error"]:
print(f"Error: {result['error']}")
return
responses = result["results"] or []
print(f"\n{len(responses)} response(s):\n")
for i, resp in enumerate(responses, 1):
body = resp.get("response", {}).get("body", {})
choices = body.get("choices", [])
text = choices[0].get("message", {}).get("content", "") if choices else ""
print(f"[{i}] {PROMPTS[i - 1]!r}")
print(f" {text}\n")
async def extract_results(batch_id: str) -> None:
result = await agent.extract_job_results(batch_id)
if result["error"]:
print(f"Error: {result['error']}")
return
responses = result["results"] or []
print(f"{batch_id}: {len(responses)} response(s)\n")
for i, resp in enumerate(responses, 1):
body = resp.get("response", {}).get("body", {})
choices = body.get("choices", [])
text = choices[0].get("message", {}).get("content", "") if choices else ""
print(f"[{i}] {text}\n")
async def check_batches(*batch_ids: str) -> None:
for batch_id in batch_ids:
state = await agent.check_job_status(batch_id)
print(f"{batch_id}: {state}")
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(check_batches(
"batch_...",
"batch_...",
))
asyncio.run(extract_results("batch_..."))