BillTracker/services/bankSyncWorker.js

138 lines
3.9 KiB
JavaScript
Raw Normal View History

'use strict';
const { getDb } = require('../db/database');
const { getBankSyncConfig } = require('./bankSyncConfigService');
const { syncDataSource } = require('./bankSyncService');
// Skip a source if it was synced less than this long ago (catches recent manual syncs)
const MIN_SYNC_AGE_MS = 60 * 60 * 1000; // 1 hour
// Pause between each source to avoid hammering SimpleFIN
const STAGGER_DELAY_MS = 3000; // 3 seconds
// Handle of the pending setTimeout created by scheduleNext(); null before start() and after stop()
let timer = null;
// True while runCycle() is working through the source list; guards against overlapping cycles
let running = false;
// ISO-8601 timestamp of the moment the most recent cycle began processing sources.
// Cycles that exit early (disabled, DB error, no sources) do not update it.
let lastRunAt = null;
// ISO-8601 timestamp at which the timer armed by scheduleNext() is due to fire
let nextRunAt = null;
/**
 * Delay between auto-sync cycles, in milliseconds, derived from the
 * `sync_interval_hours` setting returned by getBankSyncConfig().
 *
 * The result is always an integer that setTimeout can honour:
 *  - a missing, non-numeric, zero or negative setting falls back to one hour
 *    (previously this produced NaN/0, which either threw when the caller built
 *    a Date from it or made the timer fire again after ~1 ms);
 *  - values above the 32-bit timer limit are capped, because Node resets an
 *    oversized delay to 1 ms instead of waiting longer.
 *
 * @returns {number} Whole milliseconds in the range [1, 2147483647].
 */
function intervalMs() {
  const MS_PER_HOUR = 3600000;
  // Largest delay setTimeout accepts before it silently falls back to 1 ms.
  const MAX_TIMEOUT_MS = 2147483647;
  const FALLBACK_INTERVAL_MS = MS_PER_HOUR;

  const { sync_interval_hours } = getBankSyncConfig();
  const ms = Math.round(Number(sync_interval_hours) * MS_PER_HOUR);

  if (!Number.isFinite(ms) || ms < 1) {
    console.warn(`[bankSync] Invalid sync_interval_hours (${sync_interval_hours}); falling back to 1h`);
    return FALLBACK_INTERVAL_MS;
  }
  return Math.min(ms, MAX_TIMEOUT_MS);
}
/**
 * Decide whether a data source is due for an automatic sync.
 *
 * A source is due when it has never been synced, when its `last_sync_at`
 * cannot be parsed, or when at least MIN_SYNC_AGE_MS has elapsed since then.
 *
 * NOTE(review): the stored format of `last_sync_at` is not visible from this
 * file. A string without a timezone designator is parsed as local time by
 * `Date` — confirm the column is written as ISO-8601 with an offset/`Z`.
 *
 * @param {{ last_sync_at?: (string|number|null) }} source Row from `data_sources`.
 * @returns {boolean} True if the worker should sync this source now.
 */
function needsSync(source) {
  if (!source.last_sync_at) return true;

  const lastSyncMs = new Date(source.last_sync_at).getTime();
  // An unparseable timestamp yields NaN, and `NaN >= x` is always false, which
  // would exclude the source from every cycle. Treat it as "never synced".
  if (Number.isNaN(lastSyncMs)) return true;

  return Date.now() - lastSyncMs >= MIN_SYNC_AGE_MS;
}
/**
 * Promise-based delay.
 *
 * @param {number} ms How long to wait, in milliseconds.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Run one auto-sync pass over every SimpleFIN data source.
 *
 * The pass is skipped entirely when another pass is still running, when the
 * bank-sync config is disabled, when the sources cannot be loaded, or when
 * there are none. Otherwise sources are processed one at a time, least
 * recently synced first; sources for which needsSync() is false are skipped
 * and a failure of one source never aborts the rest.
 *
 * Side effects: sets the module-level `running` flag for the duration of the
 * pass and stamps `lastRunAt` when processing begins.
 *
 * @returns {Promise<void>} Resolves when the pass is finished or skipped.
 */
async function runCycle() {
  // Re-entrancy guard. Must stay outside the try/finally below so that a
  // rejected second call does not clear the flag owned by the first one.
  if (running) return;

  const config = getBankSyncConfig();
  if (!config.enabled) {
    console.log('[bankSync] Disabled — skipping cycle');
    return;
  }
  const debug = config.debug_logging;

  let db;
  let sources;
  try {
    db = getDb();
    sources = db.prepare(`
SELECT * FROM data_sources
WHERE type = 'provider_sync' AND provider = 'simplefin'
ORDER BY last_sync_at ASC
`).all();
  } catch (err) {
    console.error('[bankSync] Worker failed to load sources:', err.message);
    return;
  }

  if (sources.length === 0) {
    if (debug) console.log('[bankSync] No SimpleFIN sources configured — skipping cycle');
    return;
  }

  console.log(`[bankSync] Cycle starting — ${sources.length} source(s)`);
  running = true;
  lastRunAt = new Date().toISOString();

  let synced = 0;
  let skipped = 0;
  let failed = 0;

  try {
    for (const source of sources) {
      if (!needsSync(source)) {
        if (debug) console.log(`[bankSync] Source #${source.id} (user ${source.user_id}): recently synced — skipping`);
        skipped++;
        continue;
      }

      // Stagger requests: pause only between two real sync attempts, so a
      // tail of recently-synced sources does not cost an idle delay.
      if (synced + failed > 0) await sleep(STAGGER_DELAY_MS);

      if (debug) console.log(`[bankSync] Source #${source.id} (user ${source.user_id}): starting sync`);
      try {
        const result = await syncDataSource(db, source.user_id, source.id, { debug });
        synced++;
        const extra = [
          result.transactionsPosted ? `${result.transactionsPosted} settled` : null,
          result.pendingCleared ? `${result.pendingCleared} pending cleared` : null,
        ].filter(Boolean).join(', ');
        console.log(`[bankSync] Source #${source.id}: OK — ${result.accountsUpserted} account(s), ${result.transactionsNew} new, ${result.transactionsSkip} skipped${extra ? `, ${extra}` : ''}${result.errlist ? ` [partial: ${result.errlist}]` : ''}`);
      } catch (err) {
        failed++;
        // Tolerate non-Error rejections (e.g. `throw null`) so that logging a
        // failure can never itself abort the remaining sources.
        const reason = err != null && err.message !== undefined ? err.message : String(err);
        console.error(`[bankSync] Source #${source.id}: FAILED — ${reason}`);
      }
    }
    console.log(`[bankSync] Cycle complete — ${synced} synced, ${failed} failed, ${skipped} skipped`);
  } finally {
    // Always release the guard, even on an unexpected throw, otherwise every
    // later cycle would be silently skipped.
    running = false;
  }
}
/**
 * Arm the timer for the next sync cycle and record when it is due.
 *
 * When the timer fires it runs one cycle and, once that cycle settles,
 * re-arms itself — but only if the handle that fired is still the module's
 * current `timer`. If `timer` was cleared or replaced while the cycle was in
 * flight, re-arming here would either resurrect a stopped worker or leave two
 * independent timer chains running, so this chain simply ends.
 */
function scheduleNext() {
  const ms = intervalMs();
  nextRunAt = new Date(Date.now() + ms).toISOString();

  const handle = setTimeout(() => {
    runCycle()
      .catch(err => {
        console.error('[bankSync] Worker cycle error:', err.message);
        // runCycle rejected; make sure the overlap guard is not left set.
        running = false;
      })
      .finally(() => {
        // Stopped or restarted while the cycle ran — this chain is obsolete.
        if (timer !== handle) return;
        try {
          scheduleNext();
        } catch (err) {
          // A throw here would surface as an unhandled rejection. Log it and
          // clear the handle so a later start() is able to re-arm the worker.
          console.error('[bankSync] Failed to schedule next cycle:', err.message);
          timer = null;
        }
      });
  }, ms);
  timer = handle;

  // Don't hold the event loop open if the server is shutting down
  if (handle.unref) handle.unref();
}
/**
 * Start the auto-sync worker.
 *
 * Does nothing when a timer handle is already held; otherwise arms the first
 * timer and logs the configured interval.
 */
function start() {
  const alreadyStarted = Boolean(timer);
  if (!alreadyStarted) {
    scheduleNext();
    const { sync_interval_hours: hours } = getBankSyncConfig();
    console.log(`[bankSync] Auto-sync worker started (interval: ${hours}h)`);
  }
}
/**
 * Stop the auto-sync worker by cancelling the pending timer.
 *
 * Safe to call when the worker was never started. A cycle that is already
 * executing is not interrupted by this call.
 */
function stop() {
  // clearTimeout ignores null, so no guard is needed here.
  clearTimeout(timer);
  timer = null;
  // Nothing is scheduled any more; don't let getStatus() report a stale time.
  nextRunAt = null;
}
/**
 * Snapshot of the worker's state for status reporting.
 *
 * @returns {{ running: boolean, interval_hours: *, last_run_at: (string|null), next_run_at: (string|null) }}
 *   `running` is true while a cycle is processing sources, `interval_hours`
 *   is the current `sync_interval_hours` config value, and the two
 *   timestamps are ISO-8601 strings or null when not yet set.
 */
function getStatus() {
  const { sync_interval_hours: intervalHours } = getBankSyncConfig();
  const status = {
    running,
    interval_hours: intervalHours,
    last_run_at: lastRunAt,
    next_run_at: nextRunAt,
  };
  return status;
}
module.exports = { start, stop, getStatus };