feat: update caching logic and improve provider usage fetching intervals

This commit is contained in:
null 2026-05-24 19:05:30 -05:00
parent 7d297df9aa
commit d406beec56
3 changed files with 89 additions and 23 deletions

View File

@ -32,13 +32,16 @@ ollama → GET {base_url}/api/tags (health-check only; no rate limits)
Caching Caching
------- -------
Results are cached per credential_id for CACHE_TTL_SECONDS (default 60s) to Results are cached by the effective provider credential source for
avoid hammering provider APIs on every page load. CACHE_TTL_SECONDS (default 60s) to avoid hammering provider APIs on every page
load. Multiple Pipeline credential rows that point at the same local Claude or
Codex login share one provider fetch.
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import hashlib
import json as _json_module import json as _json_module
import os import os
import re import re
@ -926,27 +929,76 @@ _cache: dict[str, tuple[datetime, ProviderUsageLive, int]] = {}
_inflight: dict[str, asyncio.Future[ProviderUsageLive]] = {} _inflight: dict[str, asyncio.Future[ProviderUsageLive]] = {}
def _get_cached(credential_id: str) -> ProviderUsageLive | None: def _get_cached(cache_key: str) -> ProviderUsageLive | None:
entry = _cache.get(credential_id) entry = _cache.get(cache_key)
if entry is None: if entry is None:
return None return None
cached_at, result, ttl = entry cached_at, result, ttl = entry
if (utcnow() - cached_at).total_seconds() > ttl: if (utcnow() - cached_at).total_seconds() > ttl:
del _cache[credential_id] del _cache[cache_key]
return None return None
return result return result
def _set_cached(credential_id: str, result: ProviderUsageLive, ttl: int = CACHE_TTL_SECONDS) -> None: def _set_cached(cache_key: str, result: ProviderUsageLive, ttl: int = CACHE_TTL_SECONDS) -> None:
_cache[credential_id] = (utcnow(), result, ttl) _cache[cache_key] = (utcnow(), result, ttl)
def _secret_fingerprint(value: str | None) -> str:
if not value:
return "none"
return hashlib.sha256(value.encode("utf-8")).hexdigest()[:16]
def _usage_cache_key(
credential_id: str,
provider: str,
api_key: str | None,
base_url: str | None,
session_key: str | None,
) -> str:
"""Return a cache key for the real upstream credential being used.
Local OAuth tokens are intentionally shared across credential rows, because
Pipeline's primary source of truth is the local machine's Claude/Codex login.
"""
normalized_provider = provider.lower()
normalized_base_url = (base_url or "").rstrip("/")
if normalized_provider == "anthropic":
local_oauth = _read_claude_local_oauth_token()
if local_oauth:
return f"anthropic:local-oauth:{_secret_fingerprint(local_oauth)}"
if session_key:
return f"anthropic:session:{_secret_fingerprint(session_key)}"
if api_key:
return f"anthropic:api:{normalized_base_url}:{_secret_fingerprint(api_key)}"
if normalized_provider in ("openai", "codex"):
if api_key:
return (
f"{normalized_provider}:api:"
f"{normalized_base_url}:{_secret_fingerprint(api_key)}"
)
if session_key:
return f"{normalized_provider}:session:{_secret_fingerprint(session_key)}"
local_codex = _read_codex_local_token()
if local_codex:
return f"{normalized_provider}:local-oauth:{_secret_fingerprint(local_codex)}"
if normalized_provider == "ollama":
return f"ollama:{normalized_base_url}:{_secret_fingerprint(api_key)}"
return f"{normalized_provider}:credential:{credential_id}"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Public entry point # Public entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _do_fetch_provider_usage( async def _do_fetch_provider_usage(
credential_id: str, cache_key: str,
provider: str, provider: str,
account_key: str, account_key: str,
api_key: str | None, api_key: str | None,
@ -1030,11 +1082,19 @@ async def _do_fetch_provider_usage(
# Use a short TTL when subscription windows were expected but came back empty # Use a short TTL when subscription windows were expected but came back empty
# (e.g. a transient 429 at startup). This avoids persisting a 60s stale result # (e.g. a transient 429 at startup). This avoids persisting a 60s stale result
# while still preventing the thundering-herd that occurs with no caching at all. # while still preventing the thundering-herd that occurs with no caching at all.
ttl = CACHE_TTL_FAILURE_SECONDS if (subscription_attempted and not result.subscription_windows) else CACHE_TTL_SECONDS ttl = (
_set_cached(credential_id, result, ttl=ttl) CACHE_TTL_FAILURE_SECONDS
if (subscription_attempted and not result.subscription_windows)
else CACHE_TTL_SECONDS
)
_set_cached(cache_key, result, ttl=ttl)
logger.info( logger.info(
"provider_usage.checked provider=%s account=%s reachable=%s windows=%d error=%s", "provider_usage.checked provider=%s account=%s reachable=%s windows=%d error=%s",
provider, account_key, result.reachable, len(result.subscription_windows), result.error, provider,
account_key,
result.reachable,
len(result.subscription_windows),
result.error,
) )
return result return result
@ -1057,25 +1117,32 @@ async def fetch_provider_usage(
Results are cached for CACHE_TTL_SECONDS (short CACHE_TTL_FAILURE_SECONDS Results are cached for CACHE_TTL_SECONDS (short CACHE_TTL_FAILURE_SECONDS
when subscription windows are unavailable). Concurrent requests for the same when subscription windows are unavailable). Concurrent requests for the same
credential share one in-flight fetch to avoid rate-limit cascades. effective provider credential share one in-flight fetch to avoid rate-limit cascades.
Pass force_refresh=True to bypass the cache. Pass force_refresh=True to bypass the cache.
""" """
cache_key = _usage_cache_key(credential_id, provider, api_key, base_url, session_key)
if not force_refresh: if not force_refresh:
cached = _get_cached(credential_id) cached = _get_cached(cache_key)
if cached is not None: if cached is not None:
return cached return cached
# In-flight deduplication: if another coroutine is already fetching this # In-flight deduplication: if another coroutine is already fetching this
# credential, await its result rather than racing to hit the provider API. # upstream credential, await its result rather than racing to hit the provider API.
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if credential_id in _inflight: if cache_key in _inflight:
return await asyncio.shield(_inflight[credential_id]) return await asyncio.shield(_inflight[cache_key])
fut: asyncio.Future[ProviderUsageLive] = loop.create_future() fut: asyncio.Future[ProviderUsageLive] = loop.create_future()
_inflight[credential_id] = fut _inflight[cache_key] = fut
try: try:
result = await _do_fetch_provider_usage( result = await _do_fetch_provider_usage(
credential_id, provider, account_key, api_key, base_url, session_key, cache_key,
provider,
account_key,
api_key,
base_url,
session_key,
) )
fut.set_result(result) fut.set_result(result)
return result return result
@ -1083,4 +1150,4 @@ async def fetch_provider_usage(
fut.set_exception(exc) fut.set_exception(exc)
raise raise
finally: finally:
_inflight.pop(credential_id, None) _inflight.pop(cache_key, None)

View File

@ -860,7 +860,7 @@ export default function DashboardPage() {
const credentialUsageQuery = useQuery<ProviderNativeUsageWindow[], ApiError>({ const credentialUsageQuery = useQuery<ProviderNativeUsageWindow[], ApiError>({
queryKey: ["dashboard", "provider-credential-usage"], queryKey: ["dashboard", "provider-credential-usage"],
enabled: Boolean(isSignedIn), enabled: Boolean(isSignedIn),
refetchInterval: 30_000, refetchInterval: 60_000,
refetchOnMount: "always", refetchOnMount: "always",
queryFn: async () => { queryFn: async () => {
const credentialsRes = await listProviderCredentialsApiV1ProviderCredentialsGet(); const credentialsRes = await listProviderCredentialsApiV1ProviderCredentialsGet();

View File

@ -302,14 +302,13 @@ export function ProviderNavbarStatus() {
} }
let cancelled = false; let cancelled = false;
const fetchUsage = async (refresh = false) => { const fetchUsage = async () => {
setIsUsageLoading(true); setIsUsageLoading(true);
const pairs = await Promise.all( const pairs = await Promise.all(
usageCredentials.map(async (cred) => { usageCredentials.map(async (cred) => {
try { try {
const res = await getProviderUsageLiveApiV1ProviderCredentialsCredentialIdUsageGet( const res = await getProviderUsageLiveApiV1ProviderCredentialsCredentialIdUsageGet(
cred.id, cred.id,
refresh ? { refresh: true } : undefined,
); );
return [cred.id, res.status === 200 ? res.data : null] as const; return [cred.id, res.status === 200 ? res.data : null] as const;
} catch { } catch {
@ -328,7 +327,7 @@ export function ProviderNavbarStatus() {
void fetchUsage(); void fetchUsage();
}, 0); }, 0);
const interval = window.setInterval(() => { const interval = window.setInterval(() => {
void fetchUsage(true); void fetchUsage();
}, 60_000); }, 60_000);
return () => { return () => {