# Source code for core_genai.interfaces.scheduler
# -*- coding: utf-8 -*-
"""
Interface and normalized response contract for GenAI providers
that support batch (scheduled) job execution.
"""
import sys
from abc import ABC
from abc import abstractmethod
from datetime import datetime
from typing import Any
from typing import List
from typing import Optional
from typing import TypedDict
from core_genai.interfaces.agent import UsageMetadata
if sys.version_info >= (3, 11):
from typing import NotRequired
else:
from typing_extensions import NotRequired
[docs]
class ScheduledJobResponse(TypedDict):
    """Normalized response contract for batch job results across all providers.

    ``job_id``, ``results``, ``error`` and ``state`` are required keys;
    ``start_time``, ``end_time`` and ``usage`` are declared ``NotRequired``
    and may be absent from the mapping altogether.
    """

    job_id: str
    """Provider-assigned batch job identifier."""

    results: Optional[List[Any]]
    """List of individual request results; None if the job has not completed successfully."""

    error: Optional[Any]
    """Provider error details; None if the job succeeded or has not yet finished."""

    state: Any
    """Current job state as reported by the provider."""

    start_time: NotRequired[Optional[datetime]]
    """Job start time. Gemini only."""

    end_time: NotRequired[Optional[datetime]]
    """Job end time. Gemini only."""

    usage: NotRequired[Optional[UsageMetadata]]
    """
    Aggregated token usage and cost across all succeeded requests. None
    if the job has not reached a terminal state or all requests failed.
    """
[docs]
class IScheduler(ABC):
    """Interface for providers that support batch (scheduled) job execution.

    Both methods are abstract coroutines: a concrete subclass must
    override each of them before it can be instantiated.
    """

    @abstractmethod
    async def schedule_job(self, requests: List[Any], model: str, **kwargs) -> Any:
        """Submits a batch of requests for deferred execution.

        :param requests: The requests that make up the batch.
        :param model: Name of the model the batch is submitted for
            (presumably a provider model identifier -- confirm against
            the concrete implementations).
        :param kwargs: Extra keyword arguments; their meaning is left
            to the implementation.
        :return: Implementation-defined; this interface does not
            constrain the type.
        """

    @abstractmethod
    async def check_job_status(self, job_name: str) -> Any:
        """Returns the current status of a scheduled job.

        :param job_name: Name of the job to look up (presumably the
            identifier obtained when the job was scheduled -- confirm
            against the concrete implementations).
        :return: Implementation-defined; this interface does not
            constrain the type.
        """