feat(backup): add BackupManager (incremental append + compaction) and BackupRestoreManager (download + decrypt + upsert)
This commit is contained in:
parent
f94fccfc61
commit
4ac2c8f841
|
|
@ -0,0 +1,114 @@
|
||||||
|
package app.closer.data.backup
|
||||||
|
|
||||||
|
import android.util.Log
|
||||||
|
import app.closer.crypto.CoupleEncryptionManager
|
||||||
|
import app.closer.data.remote.FirestoreBackupDataSource
|
||||||
|
import app.closer.data.remote.FirestoreConversationDataSource
|
||||||
|
import app.closer.domain.model.BackupCursor
|
||||||
|
import app.closer.domain.model.BackupManifest
|
||||||
|
import app.closer.domain.model.BackupMessageRecord
|
||||||
|
import app.closer.domain.repository.AuthRepository
|
||||||
|
import app.closer.domain.repository.CoupleRepository
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
import javax.inject.Inject
|
||||||
|
import javax.inject.Singleton
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Orchestrates the E2EE conversation backup: cheap **incremental** appends (new messages since the
|
||||||
|
* manifest cursor) plus periodic **compaction** into a full snapshot. Compaction re-reads current
|
||||||
|
* message state, so it also captures mutations (deletes/reactions) that a pure append-log would miss.
|
||||||
|
*
|
||||||
|
* Both partners run this against the same couple backup; convergence is by message-id dedupe (restore)
|
||||||
|
* + manifest `generation` CAS (here). All content is couple-key ciphertext — this class never logs it.
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
|
class BackupManager @Inject constructor(
|
||||||
|
private val authRepository: AuthRepository,
|
||||||
|
private val coupleRepository: CoupleRepository,
|
||||||
|
private val encryptionManager: CoupleEncryptionManager,
|
||||||
|
private val conversationDataSource: FirestoreConversationDataSource,
|
||||||
|
private val backupDataSource: FirestoreBackupDataSource
|
||||||
|
) {
|
||||||
|
private val mutex = Mutex()
|
||||||
|
@Volatile private var lastRunAt = 0L
|
||||||
|
|
||||||
|
private val cursorComparator = Comparator<BackupCursor> { a, b ->
|
||||||
|
if (a.createdAt != b.createdAt) a.createdAt.compareTo(b.createdAt) else a.messageId.compareTo(b.messageId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Opportunistic backup — safe to call frequently (e.g. app background / Home load). Throttled and
|
||||||
|
* single-flighted; a missing couple key or no couple is a graceful no-op. Never throws.
|
||||||
|
*/
|
||||||
|
suspend fun backupNow(force: Boolean = false) {
|
||||||
|
val now = System.currentTimeMillis()
|
||||||
|
if (!force && now - lastRunAt < MIN_INTERVAL_MS) return
|
||||||
|
if (mutex.isLocked) return
|
||||||
|
mutex.withLock {
|
||||||
|
runCatching {
|
||||||
|
val uid = authRepository.currentUserId ?: return
|
||||||
|
val couple = coupleRepository.getCoupleForUser(uid) ?: return
|
||||||
|
val coupleId = couple.id
|
||||||
|
if (encryptionManager.aeadFor(coupleId) == null) return // key not present on this device
|
||||||
|
lastRunAt = now
|
||||||
|
|
||||||
|
appendIncremental(coupleId, uid)
|
||||||
|
|
||||||
|
val manifest = backupDataSource.getManifest(coupleId)
|
||||||
|
val chunkCount = backupDataSource.getChunks(coupleId).size
|
||||||
|
if (manifest?.snapshotUrl == null || chunkCount >= COMPACT_CHUNK_THRESHOLD) {
|
||||||
|
compact(coupleId, uid)
|
||||||
|
}
|
||||||
|
}.onFailure { Log.w(TAG, "backup run failed", it) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Reads new messages since the backed-up cursor and appends them as one encrypted chunk. */
|
||||||
|
private suspend fun appendIncremental(coupleId: String, uid: String) {
|
||||||
|
repeat(MAX_CAS_RETRIES) {
|
||||||
|
val manifest = backupDataSource.getManifest(coupleId) ?: BackupManifest()
|
||||||
|
val cursor = manifest.snapshotThroughCursor
|
||||||
|
val records = collectRecords(coupleId, afterCreatedAt = cursor.createdAt)
|
||||||
|
.filter { cursorComparator.compare(BackupCursor(it.createdAt, it.messageId), cursor) > 0 }
|
||||||
|
if (records.isEmpty()) return
|
||||||
|
val newCursor = records.map { BackupCursor(it.createdAt, it.messageId) }.maxWith(cursorComparator)
|
||||||
|
if (backupDataSource.appendChunk(coupleId, uid, records, newCursor, records.size)) return
|
||||||
|
// CAS lost (partner appended concurrently) → re-read + retry.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Full-state re-read → single snapshot blob → fold chunks. Captures deletes/reactions. */
|
||||||
|
private suspend fun compact(coupleId: String, uid: String) {
|
||||||
|
repeat(MAX_CAS_RETRIES) {
|
||||||
|
val manifest = backupDataSource.getManifest(coupleId) ?: BackupManifest()
|
||||||
|
val records = collectRecords(coupleId, afterCreatedAt = 0L)
|
||||||
|
if (records.isEmpty() && manifest.snapshotUrl == null) return // nothing to snapshot yet
|
||||||
|
val through = records.map { BackupCursor(it.createdAt, it.messageId) }
|
||||||
|
.maxWithOrNull(cursorComparator) ?: BackupCursor.ZERO
|
||||||
|
val foldedSeqs = backupDataSource.getChunks(coupleId).map { it.seq }
|
||||||
|
val result = backupDataSource.writeSnapshot(
|
||||||
|
coupleId = coupleId,
|
||||||
|
userId = uid,
|
||||||
|
records = records,
|
||||||
|
throughCursor = through,
|
||||||
|
expectedGeneration = manifest.generation,
|
||||||
|
foldedChunkSeqs = foldedSeqs
|
||||||
|
)
|
||||||
|
if (result != null) return // committed
|
||||||
|
// CAS lost → re-read + retry.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun collectRecords(coupleId: String, afterCreatedAt: Long): List<BackupMessageRecord> =
|
||||||
|
conversationDataSource.getConversationsForBackup(coupleId).flatMap { (id, type) ->
|
||||||
|
conversationDataSource.getBackupRecords(coupleId, id, type, afterCreatedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
private companion object {
|
||||||
|
const val TAG = "BackupManager"
|
||||||
|
const val MIN_INTERVAL_MS = 5 * 60 * 1000L // throttle opportunistic runs to ~once per 5 min
|
||||||
|
const val COMPACT_CHUNK_THRESHOLD = 15 // fold into a snapshot once enough chunks accrue
|
||||||
|
const val MAX_CAS_RETRIES = 4
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,95 @@
|
||||||
|
package app.closer.data.backup
|
||||||
|
|
||||||
|
import android.util.Log
|
||||||
|
import app.closer.crypto.CoupleEncryptionManager
|
||||||
|
import app.closer.data.local.ConversationCacheDao
|
||||||
|
import app.closer.data.local.entity.ConversationCacheEntity
|
||||||
|
import app.closer.data.remote.BackupCodec
|
||||||
|
import app.closer.data.remote.FirestoreBackupDataSource
|
||||||
|
import app.closer.domain.model.BackupMessageRecord
|
||||||
|
import app.closer.domain.repository.AuthRepository
|
||||||
|
import app.closer.domain.repository.CoupleRepository
|
||||||
|
import javax.inject.Inject
|
||||||
|
import javax.inject.Singleton
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Restores the couple's conversation history from the E2EE backup into the local durable cache.
|
||||||
|
*
|
||||||
|
* Requires the couple key to already be present (recovered via the phrase or partner-assist). The
|
||||||
|
* snapshot blob + incremental chunks are decrypted with the couple key, merged (chunks override the
|
||||||
|
* snapshot per message id), integrity-checked, and upserted into [ConversationCacheDao] (idempotent →
|
||||||
|
* safe to re-run / resume). Content is never logged.
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
|
class BackupRestoreManager @Inject constructor(
|
||||||
|
private val authRepository: AuthRepository,
|
||||||
|
private val coupleRepository: CoupleRepository,
|
||||||
|
private val encryptionManager: CoupleEncryptionManager,
|
||||||
|
private val backupDataSource: FirestoreBackupDataSource,
|
||||||
|
private val cacheDao: ConversationCacheDao
|
||||||
|
) {
|
||||||
|
|
||||||
|
sealed interface RestoreResult {
|
||||||
|
data class Success(val restored: Int, val manifestMessageCount: Int) : RestoreResult
|
||||||
|
data object NothingToRestore : RestoreResult
|
||||||
|
data class Unavailable(val reason: String) : RestoreResult // no key / no couple
|
||||||
|
data class Failed(val reason: String) : RestoreResult
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun restore(): RestoreResult {
|
||||||
|
val uid = authRepository.currentUserId ?: return RestoreResult.Unavailable("not signed in")
|
||||||
|
val couple = coupleRepository.getCoupleForUser(uid) ?: return RestoreResult.Unavailable("no couple")
|
||||||
|
val coupleId = couple.id
|
||||||
|
if (encryptionManager.aeadFor(coupleId) == null) return RestoreResult.Unavailable("couple key missing")
|
||||||
|
|
||||||
|
return runCatching {
|
||||||
|
val manifest = backupDataSource.getManifest(coupleId) ?: return RestoreResult.NothingToRestore
|
||||||
|
|
||||||
|
val byId = LinkedHashMap<String, BackupMessageRecord>()
|
||||||
|
|
||||||
|
// 1) Snapshot (full current state), integrity-checked.
|
||||||
|
manifest.snapshotUrl?.let { url ->
|
||||||
|
val ciphertext = backupDataSource.downloadSnapshotCiphertext(url)
|
||||||
|
val records = backupDataSource.decodeCiphertext(coupleId, ciphertext)
|
||||||
|
val plainForChecksum = BackupCodec.encode(records)
|
||||||
|
if (manifest.snapshotChecksum != null && manifest.snapshotChecksum != BackupCodec.checksum(plainForChecksum)) {
|
||||||
|
// Corruption/tamper or a partial write — skip the snapshot, still restore from chunks.
|
||||||
|
Log.w(TAG, "snapshot checksum mismatch — restoring from chunks only")
|
||||||
|
} else {
|
||||||
|
records.forEach { byId[it.messageId] = it }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2) Incremental chunks (newer messages) override the snapshot per id.
|
||||||
|
backupDataSource.getChunks(coupleId).forEach { chunk ->
|
||||||
|
backupDataSource.decodeCiphertext(coupleId, chunk.payload).forEach { byId[it.messageId] = it }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (byId.isEmpty()) return RestoreResult.NothingToRestore
|
||||||
|
|
||||||
|
cacheDao.upsertAll(byId.values.map { it.toEntity() })
|
||||||
|
RestoreResult.Success(restored = byId.size, manifestMessageCount = manifest.messageCount)
|
||||||
|
}.getOrElse {
|
||||||
|
Log.w(TAG, "restore failed", it)
|
||||||
|
RestoreResult.Failed(it.message ?: "restore failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun BackupMessageRecord.toEntity() = ConversationCacheEntity(
|
||||||
|
messageId = messageId,
|
||||||
|
conversationId = conversationId,
|
||||||
|
conversationType = conversationType,
|
||||||
|
authorUserId = authorUserId,
|
||||||
|
type = type,
|
||||||
|
encText = encText,
|
||||||
|
mediaUrl = mediaUrl,
|
||||||
|
durationMs = durationMs,
|
||||||
|
createdAt = createdAt,
|
||||||
|
deleted = deleted,
|
||||||
|
reactionsJson = BackupCodec.reactionsToJson(reactions)
|
||||||
|
)
|
||||||
|
|
||||||
|
private companion object {
|
||||||
|
const val TAG = "BackupRestoreManager"
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue