Claude#

Agent#

Examples#

Inference#

import asyncio

from core_genai.agents.claude.agent import ClaudeAgent
from core_genai.interfaces import IAgent
from core_mixins.decorators.async_ import SyncWrapper
from environs import Env

env = Env()
env.read_env(".env")

agent: ClaudeAgent = IAgent.create_agent("ClaudeAgent", api_key=env("CLAUDE_API_KEY"))

MODEL = "claude-haiku-4-5"
PROMPT = [{"role": "user", "content": "Explain how AI works in a few words"}]

async def run_async() -> None:
    response = await agent.analyze(model=MODEL, prompt=PROMPT)
    print("[async]", response.content[0].text)  # type: ignore[union-attr]
    print(f"cost: {agent.get_cost(response)}")


def run_sync() -> None:
    with SyncWrapper(agent) as sync_agent:
        response = sync_agent.analyze(model=MODEL, prompt=PROMPT)
        print("[sync]", response.content[0].text)  # type: ignore[union-attr]


if __name__ == "__main__":
    asyncio.run(run_async())
    run_sync()

Batch inference#

Each request uses Anthropic’s native Request type with a custom_id and params (model, max_tokens, messages). Poll check_job_status until the state is "ended", then stream results via extract_job_results.

import asyncio

from anthropic.types.beta.messages.batch_create_params import Request
from core_genai.agents.claude.agent import ClaudeAgent
from core_genai.interfaces import IAgent
from environs import Env

env = Env()
env.read_env(".env")

MODEL = "claude-haiku-4-5"
POLL_INTERVAL = 30

PROMPTS = [
    "Explain what machine learning is in one sentence.",
    "Explain what a neural network is in one sentence.",
    "Explain what reinforcement learning is in one sentence.",
]

agent: ClaudeAgent = IAgent.create_agent("ClaudeAgent", api_key=env("CLAUDE_API_KEY"))


def build_requests() -> list[Request]:
    return [
        Request(
            custom_id=f"request-{i}",
            params={
                "model": MODEL,
                "max_tokens": 256,
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        for i, prompt in enumerate(PROMPTS)
    ]


async def poll_until_done(batch_id: str) -> str:
    while True:
        state = await agent.check_job_status(batch_id)
        print(f"  status: {state}")

        if state in ClaudeAgent.TERMINAL_STATES:
            return state

        await asyncio.sleep(POLL_INTERVAL)


async def main() -> None:
    requests = build_requests()
    print(f"Scheduling batch job with {len(requests)} request(s)...")

    job = await agent.schedule_job(requests=requests, model=MODEL)
    batch_id = job["job_id"]
    print(f"Job scheduled: {batch_id}  (created: {job['created_at']})")

    print("Polling for completion...")
    final_state = await poll_until_done(batch_id)
    print(f"Job finished with state: {final_state}")

    result = await agent.extract_job_results(batch_id)

    responses = result["results"] or []
    print(f"\n{len(responses)} response(s):\n")
    for resp in responses:
        i = int(resp.custom_id.split("-")[-1])
        if resp.result.type == "succeeded":
            msg = resp.result.message
            text = next((b.text for b in msg.content if b.type == "text"), "")
        else:
            text = f"[{resp.result.type}]"
        print(f"[{i}] {PROMPTS[i]!r}")
        print(f"    {text}\n")


async def extract_results(batch_id: str) -> None:
    result = await agent.extract_job_results(batch_id)

    responses = result["results"] or []
    print(f"{batch_id}: {len(responses)} response(s)\n")
    for resp in responses:
        i = int(resp.custom_id.split("-")[-1])
        if resp.result.type == "succeeded":
            msg = resp.result.message
            text = next((b.text for b in msg.content if b.type == "text"), "")
        else:
            text = f"[{resp.result.type}]"
        print(f"[{i}] {PROMPTS[i]!r}")
        print(f"    {text}\n")


async def check_batches(*batch_ids: str) -> None:
    for batch_id in batch_ids:
        state = await agent.check_job_status(batch_id)
        print(f"{batch_id}: {state}")


if __name__ == "__main__":
    asyncio.run(main())
    asyncio.run(check_batches("msgbatch_..."))
    asyncio.run(extract_results("msgbatch_..."))