Source code for core_genai.interfaces.scheduler

# -*- coding: utf-8 -*-

"""
Base async interface and normalized response contracts for
GenAI batch (scheduled) job implementations.
"""

import sys
from abc import ABC
from abc import abstractmethod
from datetime import datetime
from typing import Any
from typing import List
from typing import Optional
from typing import TypedDict

from core_genai.interfaces.agent import UsageMetadata

if sys.version_info >= (3, 11):
    from typing import NotRequired

else:
    from typing_extensions import NotRequired


[docs] class ScheduledJobMetadata(TypedDict): """ Normalized response contract for scheduling a batch job across all providers. """ job_id: str """Provider-assigned batch job identifier.""" model: str """Model used for the batch requests.""" batch_name: NotRequired[str] """Human-readable batch name. Grok only.""" created_at: NotRequired[datetime] """Timestamp when the batch job was created."""
class ScheduledJobResponse(TypedDict):
    """
    Provider-independent shape of the results of a batch job.

    ``job_id``, ``results``, ``error`` and ``state`` are always
    present; the timing and usage keys are optional (``NotRequired``).
    """

    job_id: str
    """Identifier the provider assigned to the batch job."""

    results: Optional[List[Any]]
    """
    Results of the individual requests; None when the job has
    not completed successfully.
    """

    error: Optional[Any]
    """
    Error details from the provider; None when the job succeeded
    or has not finished yet.
    """

    state: Any
    """Job state as currently reported by the provider."""

    start_time: NotRequired[Optional[datetime]]
    """Time the job started. Gemini only."""

    end_time: NotRequired[Optional[datetime]]
    """Time the job ended. Gemini only."""

    usage: NotRequired[Optional[UsageMetadata]]
    """
    Token usage and cost aggregated over every succeeded request.
    None when the job has not reached a terminal state or when
    all requests failed.
    """

class IScheduler(ABC):
    """
    Contract for providers able to run batch (scheduled) jobs.

    Every method is an abstract coroutine; concrete providers
    must implement all three.
    """

    @abstractmethod
    async def schedule_job(
        self,
        requests: List[Any],
        model: str,
        **kwargs,
    ) -> Any:
        """
        Submit a batch of requests so they are executed later.

        :param requests: The requests that make up the batch.
        :param model: Model to use for the batch requests.
        :param kwargs: Extra keyword arguments; presumably
            provider-specific options -- confirm per implementation.
        :return: Not constrained by this interface (``Any``).
        """
        # NOTE(review): ``ScheduledJobMetadata`` looks like the intended
        # return contract here -- confirm before tightening the annotation.

    @abstractmethod
    async def check_job_status(self, job_name: str) -> Any:
        """
        Report the current status of a scheduled job.

        :param job_name: Name identifying the scheduled job.
        :return: Not constrained by this interface (``Any``).
        """

    @abstractmethod
    async def extract_job_results(
        self,
        job_name: str,
    ) -> ScheduledJobResponse:
        """
        Fetch the results of a completed scheduled job.

        :param job_name: Name identifying the scheduled job.
        :return: The job results as a ``ScheduledJobResponse``.
        """