Claude
Agent
Examples
Inference
import asyncio
from core_genai.agents.claude.agent import ClaudeAgent
from core_genai.interfaces import IAgent
from core_mixins.decorators.async_ import SyncWrapper
from environs import Env
# Model and prompt shared by the async and sync demos below.
MODEL = "claude-haiku-4-5"
PROMPT = [
    {
        "role": "user",
        "content": "Explain how AI works in a few words",
    },
]

# Read settings from the local .env file; the API key is looked up
# under the name CLAUDE_API_KEY.
env = Env()
env.read_env(".env")

agent: ClaudeAgent = IAgent.create_agent(
    "ClaudeAgent",
    api_key=env("CLAUDE_API_KEY"),
)
async def run_async() -> None:
    """Await a single ``analyze`` call and print the reply text and its cost."""
    reply = await agent.analyze(model=MODEL, prompt=PROMPT)

    # Only the first content block of the reply is shown.
    print("[async]", reply.content[0].text)  # type: ignore[union-attr]
    print(f"cost: {agent.get_cost(reply)}")
def run_sync() -> None:
    """Make the same ``analyze`` call without ``await`` via ``SyncWrapper``."""
    with SyncWrapper(agent) as blocking_agent:
        # Inside the wrapper the call is made directly, with no ``await``.
        reply = blocking_agent.analyze(model=MODEL, prompt=PROMPT)
        print("[sync]", reply.content[0].text)  # type: ignore[union-attr]
if __name__ == "__main__":
    # Run the awaitable variant first, then the wrapped synchronous one.
    asyncio.run(run_async())
    run_sync()
Batch inference
Each request uses Anthropic’s native `Request` type with a `custom_id` and
`params` (`model`, `max_tokens`, `messages`). Poll `check_job_status` until the
state is `"ended"` (the example below tests membership in
`ClaudeAgent.TERMINAL_STATES`), then stream results via `extract_job_results`.
import asyncio
from anthropic.types.beta.messages.batch_create_params import Request
from core_genai.agents.claude.agent import ClaudeAgent
from core_genai.interfaces import IAgent
from environs import Env
# Batch settings: the model to use, seconds to sleep between status checks,
# and the prompts that become one batch request each.
MODEL = "claude-haiku-4-5"
POLL_INTERVAL = 30
PROMPTS = [
    "Explain what machine learning is in one sentence.",
    "Explain what a neural network is in one sentence.",
    "Explain what reinforcement learning is in one sentence.",
]

# Read settings from the local .env file; the API key is looked up
# under the name CLAUDE_API_KEY.
env = Env()
env.read_env(".env")

agent: ClaudeAgent = IAgent.create_agent(
    "ClaudeAgent",
    api_key=env("CLAUDE_API_KEY"),
)
def build_requests() -> list[Request]:
    """Build one batch ``Request`` per entry in ``PROMPTS``.

    Each request is tagged ``"request-<index>"`` so that a result can be
    matched back to its prompt by position.
    """
    batch: list[Request] = []
    for i, prompt in enumerate(PROMPTS):
        params = {
            "model": MODEL,
            "max_tokens": 256,
            "messages": [{"role": "user", "content": prompt}],
        }
        batch.append(Request(custom_id=f"request-{i}", params=params))
    return batch
async def poll_until_done(batch_id: str) -> str:
    """Check the batch status repeatedly until it is a terminal state.

    Every observed status is printed. Between checks the coroutine sleeps
    for ``POLL_INTERVAL`` seconds.

    Returns:
        The first status found in ``ClaudeAgent.TERMINAL_STATES``.
    """
    state = await agent.check_job_status(batch_id)
    print(f" status: {state}")

    while state not in ClaudeAgent.TERMINAL_STATES:
        await asyncio.sleep(POLL_INTERVAL)
        state = await agent.check_job_status(batch_id)
        print(f" status: {state}")

    return state
async def main() -> None:
    """Schedule a batch for ``PROMPTS``, wait for it, and print each answer."""
    requests = build_requests()
    print(f"Scheduling batch job with {len(requests)} request(s)...")

    job = await agent.schedule_job(requests=requests, model=MODEL)
    batch_id = job["job_id"]
    print(f"Job scheduled: {batch_id} (created: {job['created_at']})")

    print("Polling for completion...")
    final_state = await poll_until_done(batch_id)
    print(f"Job finished with state: {final_state}")

    payload = await agent.extract_job_results(batch_id)
    # "results" may be falsy; treat that as "nothing to print".
    responses = payload["results"] or []
    print(f"\n{len(responses)} response(s):\n")

    for entry in responses:
        # The numeric suffix of the custom_id indexes back into PROMPTS.
        i = int(entry.custom_id.rsplit("-", 1)[-1])
        outcome = entry.result

        if outcome.type != "succeeded":
            # Anything other than success is shown as its bracketed type.
            text = f"[{outcome.type}]"
        else:
            # Use the first text block of the message, or "" if none exists.
            text = ""
            for block in outcome.message.content:
                if block.type == "text":
                    text = block.text
                    break

        print(f"[{i}] {PROMPTS[i]!r}")
        print(f" {text}\n")
async def extract_results(batch_id: str) -> None:
    """Fetch the results of an existing batch and print one answer per request."""
    payload = await agent.extract_job_results(batch_id)
    # "results" may be falsy; treat that as "nothing to print".
    responses = payload["results"] or []
    print(f"{batch_id}: {len(responses)} response(s)\n")

    for item in responses:
        # The numeric suffix of the custom_id indexes back into PROMPTS.
        i = int(item.custom_id.rsplit("-", 1)[-1])
        outcome = item.result

        # On success take the first text block (or ""); otherwise show the
        # result type in brackets.
        text = (
            next(
                (part.text for part in outcome.message.content if part.type == "text"),
                "",
            )
            if outcome.type == "succeeded"
            else f"[{outcome.type}]"
        )

        print(f"[{i}] {PROMPTS[i]!r}")
        print(f" {text}\n")
async def check_batches(*batch_ids: str) -> None:
    """Print ``<batch_id>: <status>`` for every batch id given, in order."""
    # Statuses are fetched one at a time, in argument order.
    for batch_id in batch_ids:
        state = await agent.check_job_status(batch_id)
        print(f"{batch_id}: {state}")
if __name__ == "__main__":
    # Full flow: schedule a batch, poll it, print the results.
    asyncio.run(main())

    # Stand-alone helpers for batches that already exist; "msgbatch_..." is a
    # placeholder to replace with a real batch id.
    asyncio.run(check_batches("msgbatch_..."))
    asyncio.run(extract_results("msgbatch_..."))