Gemini#

Agent#

Examples#

Inference#

import asyncio

from core_genai.agents.gemini.agent import GeminiAgent
from core_genai.interfaces import IAgent
from core_mixins.decorators.async_ import SyncWrapper
from environs import Env

# Read the local ".env" file so GEMINI_API_KEY can be looked up below.
env = Env()
env.read_env(".env")

# Create the agent via IAgent.create_agent, passing the API key taken from the environment.
agent: GeminiAgent = IAgent.create_agent("GeminiAgent", api_key=env("GEMINI_API_KEY"))

# Model name and prompt shared by run_async() and run_sync().
MODEL = "gemini-2.5-flash"
PROMPT = "Explain how AI works in a few words"


async def run_async() -> None:
    """Await a single ``agent.analyze`` call and print the text of its output."""
    response = await agent.analyze(model=MODEL, prompt=PROMPT)
    text = agent.get_text(response)
    print("[async]", text)


def run_sync() -> None:
    """Run the same analysis without ``await`` by going through ``SyncWrapper``."""
    with SyncWrapper(agent) as blocking_agent:
        response = blocking_agent.analyze(model=MODEL, prompt=PROMPT)
        # Text extraction is done on the original agent, still inside the wrapper's context.
        text = agent.get_text(response)
        print("[sync]", text)


if __name__ == "__main__":
    # Run the coroutine example first (asyncio.run drives it to completion),
    # then the blocking SyncWrapper example.
    asyncio.run(run_async())
    run_sync()

Batch inference#

Each request is an InlinedRequestDict that carries the model and the prompt. Call schedule_job to submit the batch, poll check_job_status until the job reaches a terminal state, and then call extract_job_results to retrieve the responses.

import asyncio

from core_genai.agents.gemini.agent import GeminiAgent
from core_genai.interfaces import IAgent
from environs import Env
from google.genai.types import InlinedRequestDict
from google.genai.types import JobState

# Read the local ".env" file so GEMINI_API_KEY can be looked up below.
env = Env()
env.read_env(".env")

# Model used for every batch request, and the delay passed to asyncio.sleep
# between status checks in poll_until_done().
MODEL = "gemini-2.5-flash-lite"
POLL_INTERVAL = 30

# One batch request is built per prompt; print_results() also indexes this
# list to label each response.
PROMPTS = [
    "Explain what machine learning is in one sentence.",
    "Explain what a neural network is in one sentence.",
    "Explain what reinforcement learning is in one sentence.",
]

# Create the agent via IAgent.create_agent, passing the API key taken from the environment.
agent: GeminiAgent = IAgent.create_agent("GeminiAgent", api_key=env("GEMINI_API_KEY"))


def build_requests() -> list[InlinedRequestDict]:
    """Return one ``InlinedRequestDict`` per entry in ``PROMPTS``, each targeting ``MODEL``."""
    batch: list[InlinedRequestDict] = []
    for text in PROMPTS:
        batch.append(InlinedRequestDict(model=MODEL, contents=text))
    return batch


def print_results(result: dict) -> None:
    """Print the outcome of a batch job, one entry per response.

    Args:
        result: Mapping as returned by ``agent.extract_job_results``. Only its
            ``"error"`` and ``"results"`` keys are read here.

    If ``result["error"]`` is truthy, only the error is printed. Otherwise each
    response is printed under the prompt at the same position in ``PROMPTS``
    (this assumes responses come back in request order; confirm against the
    batch API). A response with no prompt at its position is labelled
    ``<unknown prompt>`` rather than raising ``IndexError``.
    """
    if result["error"]:
        print(f"Error: {result['error']}")
        return

    # A missing/None "results" value is treated as an empty batch.
    responses = result["results"] or []
    print(f"{len(responses)} response(s):\n")
    for i, resp in enumerate(responses, 1):
        if resp.error:
            text = f"[error {resp.error.code}: {resp.error.message}]"
        elif resp.response:
            text = agent.get_text(resp.response)
        else:
            text = "[no response]"

        # extract_results() accepts any job id, so the job may hold more
        # responses than PROMPTS has entries; never index past the end.
        label = repr(PROMPTS[i - 1]) if i <= len(PROMPTS) else "<unknown prompt>"
        print(f"[{i}] {label}")
        print(f"    {text}\n")


async def poll_until_done(job_id: str) -> JobState:
    """Poll the status of ``job_id`` until it is one of ``GeminiAgent.TERMINAL_STATES``.

    Each observed state is printed. Between checks the coroutine sleeps for
    ``POLL_INTERVAL``. There is no upper bound on the number of checks.

    Returns:
        The first state found in ``GeminiAgent.TERMINAL_STATES``.
    """
    while True:
        state = await agent.check_job_status(job_id)
        print(f"  status: {state.name}")

        if state not in GeminiAgent.TERMINAL_STATES:
            # Still in progress: wait before asking again.
            await asyncio.sleep(POLL_INTERVAL)
            continue

        return state


async def main() -> None:
    """Schedule a batch job, wait for a terminal state, then print its results."""
    # 1. Build and submit the batch.
    requests = build_requests()
    print(f"Scheduling batch job with {len(requests)} request(s)...")
    job = await agent.schedule_job(requests=requests, model=MODEL)

    job_id = job["job_id"]
    print(f"Job scheduled: {job_id}  (created: {job.get('created_at')})")

    # 2. Block (asynchronously) until the job stops changing state.
    print("Polling for completion...")
    final_state = await poll_until_done(job_id)
    print(f"Job finished with state: {final_state.name}")

    # 3. Fetch and display whatever the job produced.
    outcome = await agent.extract_job_results(job_id)
    print_results(outcome)


async def extract_results(job_id: str) -> None:
    """Fetch the results of the batch job ``job_id`` and print them."""
    outcome = await agent.extract_job_results(job_id)
    print_results(outcome)


async def check_batches(*job_ids: str) -> None:
    """Print the current state of each given job id, one at a time, in order."""

    async def report(job_id: str) -> None:
        # Look up one job and print "<job id>: <state name>".
        state = await agent.check_job_status(job_id)
        print(f"{job_id}: {state.name}")

    for current in job_ids:
        await report(current)


if __name__ == "__main__":
    # Full flow: schedule a batch, poll until it finishes, print the results.
    asyncio.run(main())
    # NOTE(review): "batches/..." looks like a placeholder job id; presumably it
    # should be replaced with a real id (e.g. the one printed by main()) before
    # running these two calls. Each asyncio.run call uses its own event loop.
    asyncio.run(check_batches("batches/..."))
    asyncio.run(extract_results("batches/..."))