BillTracker/services/csvTransactionImportService.js

557 lines
18 KiB
JavaScript
Raw Permalink Normal View History

2026-05-16 20:26:09 -05:00
'use strict';
const crypto = require('crypto');
const { getDb } = require('../db/database');
const { decorateTransaction, ensureManualDataSource } = require('./transactionService');
// Preview sessions stay loadable for one day; expired rows are removed by pruneExpiredSessions.
const SESSION_TTL_MS = 1000 * 60 * 60 * 24;
// Upper bound on data rows taken from a single CSV; extra rows are dropped with a warning.
const MAX_ROWS = 25000;
// How many parsed rows are echoed back to the client in a preview.
const SAMPLE_SIZE = 10;
// Human-readable label for each field a CSV column can be mapped onto. Its keys are
// also the set of mapping fields accepted by validateMapping.
const FIELD_LABELS = {
  posted_date: 'Posted date',
  transacted_at: 'Transaction date/time',
  amount: 'Amount',
  debit_amount: 'Debit amount',
  credit_amount: 'Credit amount',
  description: 'Description',
  payee: 'Payee',
  memo: 'Memo',
  category: 'Category',
  account: 'Account',
  transaction_id: 'Transaction ID',
  transaction_type: 'Transaction type',
  currency: 'Currency',
};
/**
 * Create an Error decorated with the metadata the import endpoints report.
 *
 * @param {number} status - HTTP-style status code to attach.
 * @param {string} message - human-readable error message.
 * @param {string} code - machine-readable error code (e.g. 'CSV_PARSE_ERROR').
 * @param {Array<Object>} [details=[]] - optional per-row / per-field detail entries.
 * @returns {Error} the error with `status`, `code` and `details` set.
 */
function importError(status, message, code, details = []) {
  return Object.assign(new Error(message), { status, code, details });
}
/**
 * Generate an unguessable import-session id: 16 random bytes rendered as hex.
 *
 * @returns {string} 32 lowercase hexadecimal characters.
 */
function makeSessionId() {
  const entropy = crypto.randomBytes(16);
  return entropy.toString('hex');
}
/**
 * Reduce an uploaded filename to a conservative character set for storage.
 *
 * Keeps ASCII letters, digits, '.', '_', '-' and whitespace, then trims and
 * caps the result at 255 characters.
 *
 * @param {*} value - raw filename from the client (may be missing).
 * @returns {string|null} the sanitized name, or null when `value` is falsy.
 */
function cleanFilename(value) {
  if (!value) {
    return null;
  }
  const safeChars = String(value).replace(/[^a-zA-Z0-9._\-\s]/g, '');
  return safeChars.trim().slice(0, 255);
}
/**
 * Trim a raw header cell, substituting "Column N" (1-based) for blank cells.
 * Written to be used directly as an Array#map callback.
 *
 * @param {*} value - raw header cell.
 * @param {number} index - zero-based column index.
 * @returns {string} non-empty header name.
 */
function normalizeHeader(value, index) {
  const trimmed = String(value ?? '').trim();
  if (trimmed) {
    return trimmed;
  }
  return `Column ${index + 1}`;
}
/**
 * Parse CSV text into rows of raw (untrimmed) cell strings.
 *
 * - Double quotes open a quoted section (even mid-cell); inside it, `""` is a
 *   literal quote and commas / line breaks are kept as cell content.
 * - LF, CRLF and a bare CR all terminate a row.
 * - Rows whose cells are all blank after trimming are dropped.
 *
 * @param {string} text - decoded CSV content.
 * @returns {string[][]} parsed rows.
 * @throws {Error} CSV_PARSE_ERROR (status 400) when a quoted cell is never closed.
 */
function parseCsv(text) {
  const records = [];
  let fields = [];
  let field = '';
  let quoted = false;

  const endField = () => {
    fields.push(field);
    field = '';
  };
  const endRecord = () => {
    endField();
    records.push(fields);
    fields = [];
  };

  let pos = 0;
  while (pos < text.length) {
    const ch = text[pos];
    const lookahead = text[pos + 1];
    if (quoted) {
      if (ch !== '"') {
        field += ch;
      } else if (lookahead === '"') {
        // Escaped quote: keep one and step over its partner.
        field += '"';
        pos += 1;
      } else {
        quoted = false;
      }
    } else {
      switch (ch) {
        case '"':
          quoted = true;
          break;
        case ',':
          endField();
          break;
        case '\r':
          // In a CRLF pair the '\n' ends the row; a lone CR ends it here.
          if (lookahead !== '\n') endRecord();
          break;
        case '\n':
          endRecord();
          break;
        default:
          field += ch;
      }
    }
    pos += 1;
  }

  if (quoted) {
    throw importError(400, 'CSV has an unterminated quoted field.', 'CSV_PARSE_ERROR');
  }
  // Flush a final row that was not followed by a line break.
  if (field !== '' || fields.length > 0) {
    endRecord();
  }
  return records.filter((record) => !record.every((value) => String(value ?? '').trim() === ''));
}
/**
 * Decode an uploaded CSV buffer as UTF-8 and strip a leading byte-order mark.
 *
 * @param {Buffer} buffer - raw upload.
 * @returns {string} decoded text without a leading U+FEFF.
 * @throws {Error} CSV_REQUIRED (status 400) when `buffer` is not a non-empty Buffer.
 */
function csvBufferToText(buffer) {
  const usable = Buffer.isBuffer(buffer) && buffer.length > 0;
  if (!usable) {
    throw importError(400, 'CSV file is required.', 'CSV_REQUIRED');
  }
  const decoded = buffer.toString('utf8');
  return decoded.replace(/^\uFEFF/, '');
}
/**
 * Turn one parsed CSV row into an object keyed by header name.
 * Missing trailing cells become '' and every value is trimmed; cells beyond
 * the header count are ignored.
 *
 * @param {string[]} headers - normalized header names.
 * @param {Array<*>} row - raw cells for one row.
 * @returns {Object<string, string>} header -> trimmed cell text.
 */
function rowToObject(headers, row) {
  return headers.reduce((record, header, column) => {
    record[header] = String(row[column] ?? '').trim();
    return record;
  }, {});
}
/**
 * Canonicalize a header for fuzzy matching: lowercase, collapse every run of
 * non-alphanumeric characters to one space, and trim.
 *
 * @param {*} value - header text (falsy values become '').
 * @returns {string} normalized token, e.g. "Posted_Date!" -> "posted date".
 */
function normalizeHeaderToken(value) {
  const lowered = String(value || '').toLowerCase();
  return lowered.replace(/[^a-z0-9]+/g, ' ').trim();
}
/**
 * Check whether a header's normalized token matches any candidate pattern.
 * String patterns must equal the token exactly; RegExp patterns are tested
 * against it.
 *
 * @param {string} header - CSV header text.
 * @param {Array<string|RegExp>} patterns - candidate patterns.
 * @returns {boolean} true on the first matching pattern.
 */
function headerMatches(header, patterns) {
  const token = normalizeHeaderToken(header);
  for (const pattern of patterns) {
    const hit = pattern instanceof RegExp ? pattern.test(token) : pattern === token;
    if (hit) {
      return true;
    }
  }
  return false;
}
// Return the first unclaimed header matching the highest-priority pattern, or undefined.
function findHeaderByPriority(headers, patterns, claimed) {
  for (const pattern of patterns) {
    const match = headers.find(header => !claimed.has(header) && headerMatches(header, [pattern]));
    if (match) return match;
  }
  return undefined;
}

/**
 * Guess which CSV column should feed each import field.
 *
 * Fields are resolved in the order listed below, and each header is claimed by
 * at most one field. Within a field the candidate patterns are ordered
 * most-specific first, and the earliest pattern with an unclaimed matching
 * header wins regardless of column position. For example, a "Description"
 * column beats a "Name" column for `description`, leaving "Name" for `payee`,
 * and "Post Date" beats "Transaction Date" for `posted_date`.
 *
 * @param {string[]} headers - normalized CSV header names.
 * @returns {Object<string, string>} field name -> header for every detected field.
 */
function suggestMapping(headers) {
  const candidates = [
    ['posted_date', [/^date$/, 'posted date', 'post date', 'posting date', 'transaction date', 'trans date']],
    ['transacted_at', ['authorized date', 'authorization date', 'datetime', 'date time', 'timestamp']],
    ['transaction_id', ['transaction id', 'transaction number', 'trans id', 'id', 'fitid', 'reference', 'reference number']],
    ['description', ['description', 'transaction description', 'name', 'details', 'details description']],
    // 'name' also appears here so that a Name column left over after
    // description has been assigned becomes the payee.
    ['payee', ['payee', 'merchant', 'merchant name', 'vendor', 'name']],
    ['memo', ['memo', 'notes', 'note']],
    ['amount', [/^amount$/, 'transaction amount', 'net amount']],
    ['debit_amount', ['debit', 'debits', 'withdrawal', 'withdrawals', 'charge', 'charges']],
    ['credit_amount', ['credit', 'credits', 'deposit', 'deposits']],
    ['category', ['category', 'transaction category']],
    ['account', ['account', 'account name', 'account number']],
    ['transaction_type', ['type', 'transaction type']],
    ['currency', ['currency', 'currency code']],
  ];
  const mapping = {};
  const claimed = new Set();
  for (const [field, patterns] of candidates) {
    const found = findHeaderByPriority(headers, patterns, claimed);
    if (found) {
      mapping[field] = found;
      claimed.add(found);
    }
  }
  return mapping;
}
/**
 * Decode and parse an uploaded CSV into headers, keyed data rows, a suggested
 * column mapping and a list of advisory warnings. Does not touch the database.
 *
 * At most MAX_ROWS data rows are kept. The returned `errors` entries are
 * warnings (missing date/amount/description columns, truncation), not failures.
 *
 * @param {Buffer} buffer - raw CSV upload.
 * @param {{original_filename?: string}} [options]
 * @returns {{headers: string[], rows: Object[], rowCount: number, sampleRows: Object[],
 *   suggestedMapping: Object, errors: Object[], original_filename: string|null}}
 * @throws {Error} CSV_REQUIRED / CSV_PARSE_ERROR from decoding and parsing,
 *   CSV_EMPTY when there is no data row, CSV_DUPLICATE_HEADERS on repeated headers.
 */
function parseCsvPreview(buffer, options = {}) {
  const table = parseCsv(csvBufferToText(buffer));
  if (table.length < 2) {
    throw importError(400, 'CSV must include a header row and at least one data row.', 'CSV_EMPTY');
  }
  const [headerRow, ...bodyRows] = table;
  const headers = headerRow.map(normalizeHeader);

  // Headers become row-object keys; reject repeats, compared case-insensitively.
  const seenKeys = new Set();
  const duplicateHeaders = [];
  for (const header of headers) {
    const key = header.toLowerCase();
    if (!seenKeys.has(key)) {
      seenKeys.add(key);
      continue;
    }
    duplicateHeaders.push(header);
  }
  if (duplicateHeaders.length > 0) {
    throw importError(400, `CSV contains duplicate headers: ${duplicateHeaders.join(', ')}`, 'CSV_DUPLICATE_HEADERS');
  }

  const dataRows = bodyRows.slice(0, MAX_ROWS).map((cells) => rowToObject(headers, cells));
  const truncated = bodyRows.length > MAX_ROWS;
  const suggestedMapping = suggestMapping(headers);
  const has = (field) => Boolean(suggestedMapping[field]);

  const errors = [];
  if (!has('posted_date')) {
    errors.push({ field: 'posted_date', message: 'Could not detect a posted date column.' });
  }
  if (!has('amount') && !has('debit_amount') && !has('credit_amount')) {
    errors.push({ field: 'amount', message: 'Could not detect an amount column.' });
  }
  if (!has('description') && !has('payee') && !has('memo')) {
    errors.push({ field: 'description', message: 'No description, payee, or memo column was detected. Dedupe will be less useful.' });
  }
  if (truncated) {
    errors.push({ field: 'file', message: `Only the first ${MAX_ROWS} rows will be imported from this CSV.` });
  }

  return {
    headers,
    rows: dataRows,
    rowCount: dataRows.length,
    sampleRows: dataRows.slice(0, SAMPLE_SIZE),
    suggestedMapping,
    errors,
    original_filename: cleanFilename(options.original_filename),
  };
}
/**
 * Delete every import session (for all users) whose expiry time has passed.
 *
 * @param {*} db - database handle exposing `prepare(sql).run(...)`.
 */
function pruneExpiredSessions(db) {
  const statement = db.prepare('DELETE FROM import_sessions WHERE expires_at <= ?');
  statement.run(new Date().toISOString());
}
/**
 * Persist preview data as a new import session owned by `userId`.
 *
 * The session expires SESSION_TTL_MS after creation; `sessionData` is stored
 * as JSON in `preview_json`.
 *
 * @param {*} db - database handle exposing `prepare(sql).run(...)`.
 * @param {*} userId - owner of the session.
 * @param {Object} sessionData - JSON-serializable preview payload.
 * @returns {string} the generated session id.
 */
function saveImportSession(db, userId, sessionData) {
  const sessionId = makeSessionId();
  const createdAt = new Date().toISOString();
  const expiresAt = new Date(Date.now() + SESSION_TTL_MS).toISOString();
  const statement = db.prepare(`
INSERT INTO import_sessions (id, user_id, created_at, expires_at, preview_json)
VALUES (?, ?, ?, ?, ?)
`);
  statement.run(sessionId, userId, createdAt, expiresAt, JSON.stringify(sessionData));
  return sessionId;
}
/**
 * Load the stored preview payload for a session that belongs to `userId` and
 * has not yet expired.
 *
 * @param {*} db - database handle exposing `prepare(sql).get(...)`.
 * @param {*} userId - expected owner.
 * @param {string} sessionId - id returned by saveImportSession.
 * @returns {Object} the parsed `preview_json` payload.
 * @throws {Error} CSV_SESSION_NOT_FOUND (status 404) when no live session matches.
 */
function loadImportSession(db, userId, sessionId) {
  const lookup = db.prepare(`
SELECT preview_json
FROM import_sessions
WHERE id = ? AND user_id = ? AND expires_at > ?
`);
  const found = lookup.get(sessionId, userId, new Date().toISOString());
  if (found) {
    return JSON.parse(found.preview_json);
  }
  throw importError(404, 'CSV import session not found or expired. Please re-upload the file.', 'CSV_SESSION_NOT_FOUND');
}
/**
 * Remove an import session by id. No ownership check is made here; callers
 * are expected to have loaded the session for the user first.
 *
 * @param {*} db - database handle exposing `prepare(sql).run(...)`.
 * @param {string} sessionId - session to delete.
 */
function deleteImportSession(db, sessionId) {
  const statement = db.prepare('DELETE FROM import_sessions WHERE id = ?');
  statement.run(sessionId);
}
/**
 * Parse an uploaded CSV, store the full preview as an import session, and
 * return the subset the client needs to confirm a column mapping.
 * Expired sessions (for all users) are pruned first.
 *
 * @param {*} userId - uploading user.
 * @param {Buffer} buffer - raw CSV upload.
 * @param {{original_filename?: string}} [options]
 * @returns {{import_session_id: string, headers: string[], sampleRows: Object[],
 *   rowCount: number, suggestedMapping: Object, errors: Object[], fields: Object}}
 */
function previewCsvTransactions(userId, buffer, options = {}) {
  const db = getDb();
  pruneExpiredSessions(db);
  const preview = parseCsvPreview(buffer, options);
  const sessionId = saveImportSession(db, userId, { kind: 'csv_transactions', ...preview });
  const { headers, sampleRows, rowCount, suggestedMapping, errors } = preview;
  return {
    import_session_id: sessionId,
    headers,
    sampleRows,
    rowCount,
    suggestedMapping,
    errors,
    fields: FIELD_LABELS,
  };
}
/**
 * Read the trimmed cell that `mapping` assigns to `field`.
 *
 * @param {Object<string, string>} row - header -> cell text.
 * @param {Object<string, string>|null|undefined} mapping - field -> header.
 * @param {string} field - import field name.
 * @returns {string} trimmed cell text, or '' when the field is unmapped or the cell is missing.
 */
function headerValue(row, mapping, field) {
  const column = mapping?.[field];
  return column ? String(row[column] ?? '').trim() : '';
}
/**
 * Parse a required calendar date into YYYY-MM-DD.
 *
 * Accepted forms:
 * - ISO `YYYY-M-D` (1–2 digit month/day), optionally followed by a time part
 *   introduced by 'T' or whitespace; only the date portion is returned.
 * - `M/D/YY` or `M/D/YYYY` (month first); two-digit years are read as 20YY.
 * Impossible dates (e.g. Feb 30) are rejected.
 *
 * @param {*} value - raw cell text.
 * @param {string} field - field name, used for labels in error messages.
 * @param {number} rowNumber - CSV row number reported in error details.
 * @returns {string} normalized YYYY-MM-DD date.
 * @throws {Error} CSV_ROW_VALIDATION (status 400) when the value is empty or not a valid date.
 */
function parseDateValue(value, field, rowNumber) {
  const label = FIELD_LABELS[field] || field;
  const text = String(value || '').trim();
  if (!text) {
    throw importError(400, `${label} is required`, 'CSV_ROW_VALIDATION', [
      { row: rowNumber, field, message: `${label} is required` },
    ]);
  }
  // The day must end the string or be followed by a time separator; without
  // this lookahead "2024-01-150" would be silently read as 2024-01-15.
  const iso = /^(\d{4})-(\d{1,2})-(\d{1,2})(?=$|[T\s])/.exec(text);
  if (iso) {
    const normalized = `${iso[1]}-${iso[2].padStart(2, '0')}-${iso[3].padStart(2, '0')}`;
    if (isRealDate(normalized)) return normalized;
  }
  const slash = /^(\d{1,2})\/(\d{1,2})\/(\d{2,4})$/.exec(text);
  if (slash) {
    const year = slash[3].length === 2 ? `20${slash[3]}` : slash[3];
    const normalized = `${year}-${slash[1].padStart(2, '0')}-${slash[2].padStart(2, '0')}`;
    if (isRealDate(normalized)) return normalized;
  }
  throw importError(400, `${label} must be a valid date`, 'CSV_ROW_VALIDATION', [
    { row: rowNumber, field, value: text, message: `${label} must be YYYY-MM-DD or MM/DD/YYYY` },
  ]);
}
/**
 * Check that a "YYYY-MM-DD" string names an existing calendar day by
 * round-tripping it through a UTC Date. Dates that roll over (Feb 30) and
 * years 0–99 (which Date.UTC maps to 19xx) fail the round-trip.
 *
 * @param {string} value - date string split on '-'.
 * @returns {boolean} true when year, month and day survive unchanged.
 */
function isRealDate(value) {
  const [y, m, d] = String(value).split('-').map(Number);
  const probe = new Date(Date.UTC(y, m - 1, d));
  if (probe.getUTCFullYear() !== y) return false;
  if (probe.getUTCMonth() !== m - 1) return false;
  return probe.getUTCDate() === d;
}
/**
 * Parse a currency amount into integer cents.
 *
 * Accepts an optional leading +/-, '$', thousands commas, whitespace, up to
 * four decimal places, and accounting-style parentheses for negatives.
 * Fractions of a cent are rounded half-up on the magnitude. The computation
 * is done on the digit strings, so "1.005" gives 101 instead of the 100 that
 * `Math.round(1.005 * 100)` yields because of binary floating point.
 *
 * @param {*} value - raw cell text.
 * @param {{negative?: boolean}} [options] - `negative: true` forces a negative
 *   result (used for debit columns that list positive numbers).
 * @returns {number|null} signed cents, or null when the value is empty or unparseable.
 */
function parseCents(value, { negative = false } = {}) {
  const text = String(value ?? '').trim();
  if (!text) return null;
  const parenNegative = /^\(.*\)$/.test(text);
  const cleaned = text.replace(/[,$\s]/g, '').replace(/^\((.*)\)$/, '$1');
  const match = /^([+-]?)(\d+)(?:\.(\d{1,4}))?$/.exec(cleaned);
  if (!match) return null;
  const [, signChar, whole, fraction = ''] = match;
  // Fraction expressed in 1/10000ths (0..9999). Dividing by 100 gives cents;
  // an exact .5 is representable, so Math.round rounds half-up reliably.
  const tenThousandths = Number(fraction.padEnd(4, '0'));
  const magnitude = Number(whole) * 100 + Math.round(tenThousandths / 100);
  if (!Number.isFinite(magnitude)) return null;
  const sign = signChar === '-' || parenNegative || negative ? -1 : 1;
  return magnitude * sign;
}
/**
 * Resolve a row's signed amount in cents from the mapped amount columns.
 *
 * A single `amount` column wins when it parses. Otherwise debit values are made
 * negative and credit values kept as-is. The first non-zero side is used,
 * because many bank exports fill the unused side with "0.00". If both sides
 * are zero or blank, the (zero) debit, else the credit, else null is returned,
 * so callers can report the row.
 *
 * @param {Object<string, string>} row - header -> cell text.
 * @param {Object<string, string>} mapping - field -> header.
 * @returns {number|null} signed cents, or null when no mapped amount parses.
 */
function parseMappedAmount(row, mapping) {
  const amount = parseCents(headerValue(row, mapping, 'amount'));
  if (amount !== null) return amount;
  const debit = parseCents(headerValue(row, mapping, 'debit_amount'), { negative: true });
  const credit = parseCents(headerValue(row, mapping, 'credit_amount'));
  // `!== 0` is also false for -0, which is what a "0.00" debit parses to.
  if (debit !== null && debit !== 0) return debit;
  if (credit !== null && credit !== 0) return credit;
  return debit ?? credit;
}
/**
 * Deterministic identifier derived from a list of values.
 *
 * Each part is stringified (falsy parts become ''), trimmed and lowercased,
 * then the parts are joined with U+001F and hashed with SHA-256. The result
 * is the first 48 hex characters of the digest.
 *
 * @param {Array<*>} parts - values to fingerprint.
 * @returns {string} 48-character lowercase hex string.
 */
function stableHash(parts) {
  const canonical = parts
    .map((part) => String(part || '').trim().toLowerCase())
    .join('\u001f');
  const digest = crypto.createHash('sha256').update(canonical).digest('hex');
  return digest.slice(0, 48);
}
/**
 * Return the user's "CSV Import" file_import data source, creating it
 * (status 'active') if none exists. `ensureManualDataSource` is always called
 * first. When several matching sources exist, the one with the lowest id is used.
 *
 * @param {*} db - database handle exposing `prepare(sql).get/run`.
 * @param {*} userId - owner.
 * @returns {Object} the data_sources row.
 */
function getOrCreateCsvDataSource(db, userId) {
  ensureManualDataSource(db, userId);
  const current = db.prepare(`
SELECT *
FROM data_sources
WHERE user_id = ? AND type = 'file_import' AND provider = 'csv' AND name = 'CSV Import'
ORDER BY id ASC
LIMIT 1
`).get(userId);
  if (current) {
    return current;
  }
  const { lastInsertRowid } = db.prepare(`
INSERT INTO data_sources (user_id, type, provider, name, status)
VALUES (?, 'file_import', 'csv', 'CSV Import', 'active')
`).run(userId);
  return db.prepare('SELECT * FROM data_sources WHERE id = ? AND user_id = ?').get(lastInsertRowid, userId);
}
/**
 * Find or create the financial account a CSV row refers to by name.
 *
 * The account is keyed by `stableHash([name])` truncated to 32 characters, so
 * names that differ only in case or surrounding whitespace share one account.
 * New accounts are created with account_type 'csv' and currency 'USD'.
 *
 * @param {*} db - database handle exposing `prepare(sql).get/run`.
 * @param {*} userId - owner.
 * @param {*} dataSourceId - data source the account belongs to.
 * @param {*} name - account name from the CSV (may be blank).
 * @returns {Object|null} the financial_accounts row, or null when `name` is blank.
 */
function getOrCreateAccount(db, userId, dataSourceId, name) {
  const accountName = String(name || '').trim();
  if (accountName === '') {
    return null;
  }
  const providerAccountId = stableHash([accountName]).slice(0, 32);
  const found = db.prepare(`
SELECT *
FROM financial_accounts
WHERE user_id = ? AND data_source_id = ? AND provider_account_id = ?
`).get(userId, dataSourceId, providerAccountId);
  if (found) {
    return found;
  }
  const { lastInsertRowid } = db.prepare(`
INSERT INTO financial_accounts
(user_id, data_source_id, provider_account_id, name, account_type, currency)
VALUES (?, ?, ?, ?, 'csv', 'USD')
`).run(userId, dataSourceId, providerAccountId, accountName);
  return db.prepare('SELECT * FROM financial_accounts WHERE id = ? AND user_id = ?').get(lastInsertRowid, userId);
}
/**
 * Validate a client-supplied column mapping against the session's headers.
 *
 * Requires a posted-date column and at least one of amount / debit / credit.
 * Every mapped field must be one of FIELD_LABELS' own keys, and every non-empty
 * mapped header must exist in `headers`.
 *
 * @param {string[]} headers - CSV headers from the preview session.
 * @param {Object<string, string>|null} [mapping={}] - field -> header. `null`
 *   is treated as an empty mapping.
 * @throws {Error} CSV_MAPPING_REQUIRED (400) when a required field is unmapped;
 *   CSV_MAPPING_INVALID (400) for unknown fields or headers not in the CSV.
 */
function validateMapping(headers, mapping = {}) {
  // An explicit null would otherwise crash with a TypeError (500) instead of a 400.
  const effective = mapping ?? {};
  const headerSet = new Set(headers);
  const required = [];
  if (!effective.posted_date) required.push('posted_date');
  if (!effective.amount && !(effective.debit_amount || effective.credit_amount)) required.push('amount');
  if (required.length) {
    throw importError(400, `Missing required mapping: ${required.map(f => FIELD_LABELS[f] || f).join(', ')}`, 'CSV_MAPPING_REQUIRED');
  }
  for (const [field, header] of Object.entries(effective)) {
    // Own-property check: a bare FIELD_LABELS[field] lookup would accept
    // inherited keys such as "toString" or "constructor".
    if (!Object.prototype.hasOwnProperty.call(FIELD_LABELS, field)) {
      throw importError(400, `Unsupported mapping field: ${field}`, 'CSV_MAPPING_INVALID');
    }
    if (header && !headerSet.has(header)) {
      throw importError(400, `Mapped column "${header}" for ${field} was not found in the CSV headers.`, 'CSV_MAPPING_INVALID');
    }
  }
}
/**
 * Validate an optional ISO date or date-time cell and return it unchanged.
 *
 * Accepts `YYYY-MM-DD`, optionally followed by 'T' or a space, `HH:MM`,
 * optional `:SS` with up to 9 fractional digits, and an optional `Z` or
 * `±HH[:]MM` offset. Besides the calendar check, clock and offset parts must
 * be in range (hour ≤ 23, minute/second ≤ 59, offset hour ≤ 23, offset
 * minute ≤ 59).
 *
 * @param {*} value - raw cell text.
 * @param {string} field - field name, used for labels in error messages.
 * @param {number} rowNumber - CSV row number reported in error details.
 * @returns {string|null} the trimmed text, or null when the cell is empty.
 * @throws {Error} CSV_ROW_VALIDATION (status 400) for malformed or impossible values.
 */
function parseOptionalDateTimeValue(value, field, rowNumber) {
  const text = String(value || '').trim();
  if (!text) return null;
  const match = /^(\d{4}-\d{2}-\d{2})(?:[T ](\d{2}):(\d{2})(?::(\d{2})(?:\.\d{1,9})?)?(?:Z|[+-](\d{2}):?(\d{2}))?)?$/.exec(text);
  // Captured time/offset parts are optional; an absent part is always in range.
  const atMost = (digits, max) => digits === undefined || Number(digits) <= max;
  const valid = match !== null
    && isRealDate(match[1])
    && atMost(match[2], 23) // hour
    && atMost(match[3], 59) // minute
    && atMost(match[4], 59) // second
    && atMost(match[5], 23) // offset hours
    && atMost(match[6], 59); // offset minutes
  if (!valid) {
    const label = FIELD_LABELS[field] || field;
    throw importError(400, `${label} must be a valid ISO date or date-time`, 'CSV_ROW_VALIDATION', [
      { row: rowNumber, field, value: text, message: `${label} must be YYYY-MM-DD or an ISO date-time` },
    ]);
  }
  return text;
}
/**
 * Convert one keyed CSV row into the transaction fields used for insertion.
 *
 * Validation order: posted date (required), transaction date/time (optional),
 * then amount (must be a non-zero safe integer of cents).
 * `provider_transaction_id` is `csv:id:<id>` when a transaction ID cell is
 * present. Otherwise it is `csv:hash:<stableHash of date, amount, description,
 * payee, account>`. Description falls back to payee, then memo; currency
 * defaults to 'USD'. The original row is kept as JSON in `raw_data`.
 *
 * @param {Object<string, string>} row - header -> cell text.
 * @param {Object<string, string>} mapping - field -> header.
 * @param {number} rowNumber - CSV row number reported in error details.
 * @returns {Object} normalized transaction fields.
 * @throws {Error} CSV_ROW_VALIDATION (status 400) for an invalid date or amount.
 */
function normalizeCsvTransaction(row, mapping, rowNumber) {
  const read = (field) => headerValue(row, mapping, field);

  const postedDate = parseDateValue(read('posted_date'), 'posted_date', rowNumber);
  const transactedAt = parseOptionalDateTimeValue(read('transacted_at'), 'transacted_at', rowNumber);
  const amount = parseMappedAmount(row, mapping);
  const amountIsUsable = Number.isSafeInteger(amount) && amount !== 0;
  if (!amountIsUsable) {
    const message = 'Amount must be a non-zero number.';
    throw importError(400, message, 'CSV_ROW_VALIDATION', [
      { row: rowNumber, field: 'amount', message },
    ]);
  }

  const description = read('description');
  const payee = read('payee');
  const memo = read('memo');
  const accountName = read('account');
  const transactionId = read('transaction_id');

  let providerTransactionId;
  if (transactionId) {
    providerTransactionId = `csv:id:${transactionId}`;
  } else {
    const fingerprint = stableHash([postedDate, amount, description, payee, accountName]);
    providerTransactionId = `csv:hash:${fingerprint}`;
  }

  return {
    provider_transaction_id: providerTransactionId,
    transaction_type: read('transaction_type') || null,
    posted_date: postedDate,
    transacted_at: transactedAt,
    amount,
    currency: read('currency') || 'USD',
    description: description || payee || memo || null,
    payee: payee || null,
    memo: memo || null,
    category: read('category') || null,
    account_name: accountName || null,
    raw_data: JSON.stringify(row),
  };
}
// Build a numberer that makes repeated content-hash ids within one import distinct.
function makeOccurrenceNumberer() {
  const seen = new Map();
  return (providerTransactionId) => {
    // Ids taken from a transaction ID column identify the same transaction
    // when repeated, so they are left alone.
    if (!providerTransactionId.startsWith('csv:hash:')) return providerTransactionId;
    const occurrence = (seen.get(providerTransactionId) ?? 0) + 1;
    seen.set(providerTransactionId, occurrence);
    return occurrence === 1 ? providerTransactionId : `${providerTransactionId}:${occurrence}`;
  };
}

/**
 * Import the rows of a previewed CSV session as transactions for `userId`.
 *
 * Each row is normalized with `mapping`. It is skipped when a transaction with
 * the same provider_transaction_id already exists for the CSV data source, and
 * inserted otherwise. Per-row failures are recorded in `details` instead of
 * aborting the import. All inserts plus one import_history row run inside a
 * single DB transaction; the session is deleted afterwards.
 *
 * Rows without a transaction ID are identified by a content hash. Identical
 * rows in one file (e.g. two same-priced purchases at the same merchant on the
 * same day) get ids numbered in file order (`…`, `…:2`, `…:3`). Both are then
 * imported, while re-importing the same file still dedupes every row.
 *
 * @param {*} userId - owner of the session and the imported transactions.
 * @param {string} importSessionId - id returned by previewCsvTransactions.
 * @param {Object<string, string>} mapping - field -> CSV header.
 * @param {Object} [options] - stored with the mapping in import_history.options_json.
 * @returns {{success: boolean, imported: number, skipped: number, failed: number, details: Object[]}}
 * @throws {Error} CSV_SESSION_NOT_FOUND, CSV_SESSION_INVALID, CSV_MAPPING_REQUIRED or CSV_MAPPING_INVALID.
 */
function commitCsvTransactions(userId, importSessionId, mapping, options = {}) {
  const db = getDb();
  const session = loadImportSession(db, userId, importSessionId);
  if (session.kind !== 'csv_transactions') {
    throw importError(400, 'Import session is not a CSV transaction preview.', 'CSV_SESSION_INVALID');
  }
  validateMapping(session.headers, mapping);
  const dataSource = getOrCreateCsvDataSource(db, userId);
  const details = [];
  const counts = { imported: 0, skipped: 0, failed: 0 };
  const insert = db.prepare(`
INSERT INTO transactions
(user_id, data_source_id, account_id, provider_transaction_id, source_type,
transaction_type, posted_date, transacted_at, amount, currency, description,
payee, memo, category, raw_data, match_status, ignored)
VALUES (?, ?, ?, ?, 'file_import', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'unmatched', 0)
`);
  const existing = db.prepare(`
SELECT id
FROM transactions
WHERE user_id = ? AND data_source_id = ? AND provider_transaction_id = ?
`);
  const numberProviderId = makeOccurrenceNumberer();
  const run = db.transaction(() => {
    session.rows.forEach((row, index) => {
      // +2: the header is file row 1 and rows are 1-based. Blank lines removed
      // during parsing (and multi-line quoted cells) make this approximate.
      const rowNumber = index + 2;
      try {
        const normalized = normalizeCsvTransaction(row, mapping, rowNumber);
        const tx = {
          ...normalized,
          provider_transaction_id: numberProviderId(normalized.provider_transaction_id),
        };
        if (existing.get(userId, dataSource.id, tx.provider_transaction_id)) {
          counts.skipped++;
          details.push({ row: rowNumber, result: 'skipped_duplicate', provider_transaction_id: tx.provider_transaction_id });
          return;
        }
        const account = getOrCreateAccount(db, userId, dataSource.id, tx.account_name);
        const result = insert.run(
          userId,
          dataSource.id,
          account?.id ?? null,
          tx.provider_transaction_id,
          tx.transaction_type,
          tx.posted_date,
          tx.transacted_at,
          tx.amount,
          tx.currency,
          tx.description,
          tx.payee,
          tx.memo,
          tx.category,
          tx.raw_data,
        );
        counts.imported++;
        details.push({
          row: rowNumber,
          result: 'imported',
          transaction: decorateTransaction({
            ...tx,
            id: result.lastInsertRowid,
            user_id: userId,
            data_source_id: dataSource.id,
            source_type: 'file_import',
            data_source_type: dataSource.type,
            data_source_provider: dataSource.provider,
            data_source_name: dataSource.name,
            data_source_status: dataSource.status,
            account_id: account?.id ?? null,
            account_name: account?.name ?? null,
            match_status: 'unmatched',
            ignored: 0,
          }),
        });
      } catch (err) {
        // Row-level problems (validation or insert errors) are reported, not fatal.
        counts.failed++;
        details.push({ row: rowNumber, result: 'failed', message: err.message, details: err.details || [] });
      }
    });
    db.prepare(`
INSERT INTO import_history (
user_id, imported_at, source_filename, file_type, sheet_name,
rows_parsed, rows_created, rows_updated, rows_skipped, rows_ambiguous,
rows_errored, options_json, summary_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
      userId,
      new Date().toISOString(),
      session.original_filename,
      'csv_transactions',
      null,
      session.rows.length,
      counts.imported,
      0,
      counts.skipped,
      0,
      counts.failed,
      JSON.stringify({ mapping, options }),
      // Only the first 500 per-row details are persisted in the history summary.
      JSON.stringify(details.slice(0, 500)),
    );
  });
  run();
  // Deleted only after a successful commit, so a failed run can be retried.
  deleteImportSession(db, importSessionId);
  return {
    success: true,
    imported: counts.imported,
    skipped: counts.skipped,
    failed: counts.failed,
    details,
  };
}
// Public API: field labels for the UI, plus the preview and commit entry points.
module.exports = { FIELD_LABELS, commitCsvTransactions, previewCsvTransactions };