#!/usr/bin/env python3
"""
Q1 - improve question-bank variety.

This one-off migration edits BOTH the source JSON files and the shipped asset DB.
It only changes row content:
  * delete fully duplicated question rows where text + answer payload match
  * rewrite remaining duplicate visible texts so each prompt is unique
  * rewrite over-cap template stems into varied, topic-specific prompts

Room schema and identity hash are untouched. build_db.py is NOT run.
"""

from __future__ import annotations

import json
import sqlite3
from collections import defaultdict
from pathlib import Path
from typing import Any

from validate_question_variety import MAX_TEMPLATE_STEM_ROWS, TEMPLATE_STEMS, normalize_text

ROOT = Path(__file__).resolve().parents[1]
QUESTIONS_DIR = ROOT / "seed" / "questions"
DB_PATH = ROOT / "app" / "src" / "main" / "assets" / "database" / "app.db"

# Literal prefixes that template_rewrite() recognises and rewrites.
UNDERSTOOD_STEM = "What do you wish I understood about "
NEED_STEM = "What do you need from me when "

# Exact-match replacements applied to the topic extracted after UNDERSTOOD_STEM.
UNDERSTOOD_TOPIC_FIXES = {
    "your growth we have made": "the growth we have made",
    "your forgiveness": "the forgiveness you offer",
    "your support through stress": "the support you give through stress",
    "your shared resilience": "our shared resilience",
    "your ways we repair": "the ways we repair",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
    "your a past hard season": "a past hard season",
    "your future gratitude": "future gratitude",
}

# Replacement prompts for UNDERSTOOD_STEM rows; picked by occurrence index modulo length.
UNDERSTOOD_VARIANTS = (
    "What is one thing you want me to understand about {topic}?",
    "What feels hardest to explain about {topic}?",
    "What would help me see {topic} with more care?",
    "What do you want me to notice about {topic}?",
    "Where do you feel least understood around {topic}?",
    "What part of {topic} do you rarely get to say out loud?",
    "How can I better honor what {topic} brings up for you?",
    "What question do you wish I would ask about {topic}?",
)

# Replacement prompts for NEED_STEM rows whose condition does not start with "we are trying ".
NEED_WHEN_VARIANTS = (
    "When {condition}, what support from me would help most?",
    "When {condition}, what should I do more of?",
    "When {condition}, what should I avoid doing?",
    "When {condition}, what reassurance helps most?",
    "When {condition}, what would help you feel less alone?",
    "When {condition}, what small action from me would matter?",
)

# Replacement prompts for NEED_STEM rows whose condition starts with "we are trying ".
WORKING_ON_VARIANTS = (
    "While we're working on {topic}, what support from me would help most?",
    "While we're working on {topic}, what should I do more of?",
    "While we're working on {topic}, what should I avoid doing?",
    "While we're working on {topic}, what reassurance helps most?",
    "While we're working on {topic}, what would help us stay connected?",
    "While we're working on {topic}, what small action from me would matter?",
)

# Suffixes appended (before the "?") to make an otherwise duplicated prompt unique.
GENERIC_DUPLICATE_SUFFIXES = (
    "right now",
    "these days",
    "in this season",
    "when it matters most",
    "without overthinking it",
)

# Substring replacements applied to every question text by polish_text().
POLISH_TEXT_REPLACEMENTS = {
    "consistent actions feels fragile": "consistent actions feel fragile",
    "boundaries after rupture feels fragile": "boundaries after rupture feel fragile",
    "triggered moments feels fragile": "triggered moments feel fragile",
    "trust timelines feels fragile": "trust timelines feel fragile",
    "questions after betrayal feels fragile": "questions after betrayal feel fragile",
    "trust deposits feels fragile": "trust deposits feel fragile",
    "broken promises feels fragile": "broken promises feel fragile",
    "relationship agreements feels fragile": "relationship agreements feel fragile",
    "what your support through stress brings up": "what the support you give through stress brings up",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
}


def canonical_json(value: Any) -> str:
    """Serialise *value* to compact JSON with sorted keys, so equal values compare equal as strings."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def question_payload(q: dict[str, Any]) -> str:
    """Return a canonical string of the question's type, answer_config and options.

    Missing keys default to ``"written"``, ``{}`` and ``[]`` respectively.
    """
    payload = {
        "type": q.get("type", "written"),
        "answer_config": q.get("answer_config", {}),
        "options": q.get("options", []),
    }
    return canonical_json(payload)


def load_json_files() -> tuple[dict[Path, dict[str, Any]], dict[str, tuple[Path, dict[str, Any]]]]:
    """Load every ``*.json`` file in QUESTIONS_DIR.

    Returns:
        ``(files, by_id)`` where ``files`` maps each path to its parsed document and
        ``by_id`` maps each question id to ``(path, question_dict)``. The question
        dicts in ``by_id`` are the same objects held inside ``files``.
    """
    files: dict[Path, dict[str, Any]] = {}
    by_id: dict[str, tuple[Path, dict[str, Any]]] = {}
    for path in sorted(QUESTIONS_DIR.glob("*.json")):
        # Explicit UTF-8: the files are written with ensure_ascii=False, so relying
        # on the locale's default encoding can mis-decode non-ASCII text.
        with path.open(encoding="utf-8") as f:
            data = json.load(f)
        files[path] = data
        for q in data.get("questions", []):
            by_id[q["id"]] = (path, q)
    return files, by_id


def save_json_files(files: dict[Path, dict[str, Any]]) -> None:
    """Write every document in *files* back to its path as indented JSON plus a trailing newline."""
    for path, data in files.items():
        # ensure_ascii=False emits raw non-ASCII characters, so the file must be
        # opened as UTF-8 rather than with the platform default encoding.
        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")


def clean_topic(topic: str) -> str:
    """Strip *topic* and apply the exact-match fix from UNDERSTOOD_TOPIC_FIXES, if any."""
    topic = topic.strip()
    return UNDERSTOOD_TOPIC_FIXES.get(topic, topic)


def clean_condition(condition: str) -> tuple[str, bool]:
    """Strip *condition* and detect the "we are trying " prefix.

    Returns:
        ``(condition, working_on)``. When the stripped condition starts with
        ``"we are trying "`` the prefix is removed and ``working_on`` is True.
    """
    condition = condition.strip()
    if condition.startswith("we are trying "):
        return condition.removeprefix("we are trying ").strip(), True
    return condition, False


def trim_question_mark(text: str) -> str:
    """Return *text* without trailing whitespace and without one trailing ``?``."""
    return text.rstrip().removesuffix("?").strip()


def option_texts(q: dict[str, Any]) -> list[str]:
    """Collect the non-empty option texts of a question.

    Looks, in order, at ``q["options"]``, then ``options`` inside the answer config,
    then the ``optionA``/``optionB`` pair inside the answer config. The answer
    config is ``q["answer_config"]["config"]`` when present, else ``q["answer_config"]``.
    Returns an empty list when none of these yields options.
    """
    if q.get("options"):
        return [opt.get("text", "") for opt in q["options"] if opt.get("text")]
    answer_config = q.get("answer_config") or {}
    config = answer_config.get("config", answer_config)
    if config is None:
        # An explicit null "config" would otherwise crash on .get(); treat it like a missing key.
        config = answer_config
    options = config.get("options") or []
    if options:
        return [opt.get("text", "") for opt in options if opt.get("text")]
    option_a = config.get("optionA")
    option_b = config.get("optionB")
    if option_a and option_b:
        return [option_a.get("text", ""), option_b.get("text", "")]
    return []


def short_options(options: list[str]) -> str:
    """Summarise options as ``"a or b"`` (first two, lower-cased), a single option, or ``"the choices"``."""
    clean = [opt.strip().lower() for opt in options if opt.strip()]
    if len(clean) >= 2:
        return f"{clean[0]} or {clean[1]}"
    if clean:
        return clean[0]
    return "the choices"


def unique_candidate(base: str, taken: set[str]) -> str:
    """Return *base*, or a suffixed variant of it, whose normalised form is not in *taken*.

    Tries *base* unchanged, then each GENERIC_DUPLICATE_SUFFIXES entry, then
    numbered variants ``(2)``, ``(3)``, ... until one is free. *taken* is not modified.
    """
    normalized = normalize_text(base)
    if normalized not in taken:
        return base
    stem = trim_question_mark(base)
    for suffix in GENERIC_DUPLICATE_SUFFIXES:
        candidate = f"{stem} {suffix}?"
        if normalize_text(candidate) not in taken:
            return candidate
    n = 2
    while True:
        candidate = f"{stem} ({n})?"
        if normalize_text(candidate) not in taken:
            return candidate
        n += 1


def template_rewrite(q: dict[str, Any], occurrence_index: int) -> str | None:
    """Rewrite a templated prompt into one of its variants.

    Args:
        q: Question dict; ``q["text"]`` is matched against UNDERSTOOD_STEM and NEED_STEM
            (case-sensitive prefix match).
        occurrence_index: Selects the variant, modulo the number of variants.

    Returns:
        The rewritten prompt, or None when the text starts with neither stem.
    """
    text = q["text"]
    if text.startswith(UNDERSTOOD_STEM):
        topic = clean_topic(text.removeprefix(UNDERSTOOD_STEM).rstrip("?"))
        variant = UNDERSTOOD_VARIANTS[occurrence_index % len(UNDERSTOOD_VARIANTS)]
        return variant.format(topic=topic)
    if text.startswith(NEED_STEM):
        condition, working_on = clean_condition(text.removeprefix(NEED_STEM).rstrip("?"))
        if working_on:
            variant = WORKING_ON_VARIANTS[occurrence_index % len(WORKING_ON_VARIANTS)]
            return variant.format(topic=condition)
        variant = NEED_WHEN_VARIANTS[occurrence_index % len(NEED_WHEN_VARIANTS)]
        return variant.format(condition=condition)
    return None


def duplicate_text_rewrite(q: dict[str, Any], duplicate_index: int) -> str:
    """Build a replacement prompt for a question whose visible text duplicates another row.

    Questions with options get a prompt built from their first two options;
    questions without options get their own text plus a generic suffix chosen
    by *duplicate_index*.
    """
    text = q["text"]
    options = option_texts(q)
    qtype = q.get("type", "written")
    if qtype == "this_or_that" and len(options) >= 2:
        # Alternate between two phrasings so consecutive duplicates differ.
        candidate = f"Which fits this choice better for you: {short_options(options)}?"
        if duplicate_index % 2:
            candidate = f"Which would you choose first here: {short_options(options)}?"
        return candidate
    if options:
        option_hint = short_options(options)
        if qtype == "multi_choice":
            return f"Which answers fit best here: {option_hint}?"
        return f"Which answer fits best here: {option_hint}?"
    base = trim_question_mark(text)
    suffix = GENERIC_DUPLICATE_SUFFIXES[duplicate_index % len(GENERIC_DUPLICATE_SUFFIXES)]
    return f"{base} {suffix}?"
def delete_full_duplicates(files: dict[Path, dict[str, Any]]) -> set[str]:
    """Remove rows whose normalised text AND answer payload both match an earlier row.

    Within each group of full duplicates the row with the smallest id is kept.
    The ``questions`` lists in *files* are replaced in place (only when something
    is actually deleted).

    Returns:
        The ids of the removed rows.
    """
    groups: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text", "")
            if text:
                groups[(normalize_text(text), question_payload(q))].append(q)

    delete_ids: set[str] = set()
    for rows in groups.values():
        if len(rows) <= 1:
            continue
        # Keep the first row by id; every later one is a full duplicate.
        for q in sorted(rows, key=lambda item: item["id"])[1:]:
            delete_ids.add(q["id"])

    if not delete_ids:
        return delete_ids

    for data in files.values():
        data["questions"] = [q for q in data.get("questions", []) if q["id"] not in delete_ids]
    return delete_ids


def polish_text(text: str) -> str:
    """Apply every POLISH_TEXT_REPLACEMENTS substring replacement to *text*."""
    for old, new in POLISH_TEXT_REPLACEMENTS.items():
        text = text.replace(old, new)
    return text


def polish_all_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Polish every question text in place.

    Returns:
        Mapping of question id to new text, for the rows that actually changed.
    """
    rewrites: dict[str, str] = {}
    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text")
            if not text:
                continue
            polished = polish_text(text)
            if polished != text:
                q["text"] = polished
                rewrites[q["id"]] = polished
    return rewrites


def rewrite_template_overflow(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Rewrite rows beyond the per-stem cap into varied prompts.

    Rows are grouped by the first TEMPLATE_STEMS entry their normalised text starts
    with. Within each group, sorted by id, the first MAX_TEMPLATE_STEM_ROWS rows are
    left alone and the rest are rewritten via template_rewrite(), made unique with
    unique_candidate(). Rows for which template_rewrite() returns None are left as is.

    Returns:
        Mapping of question id to new text for every rewritten row.
    """
    rewrites: dict[str, str] = {}
    by_stem: dict[str, list[dict[str, Any]]] = defaultdict(list)
    # How many rows currently hold each normalised text; a text is only free
    # again once no row uses it.
    usage: dict[str, int] = defaultdict(int)
    for data in files.values():
        for q in data.get("questions", []):
            normalized = normalize_text(q.get("text", ""))
            if q.get("text"):
                usage[normalized] += 1
            for stem in TEMPLATE_STEMS:
                if normalized.startswith(stem):
                    by_stem[stem].append(q)
                    break

    taken = set(usage)

    for stem, rows in by_stem.items():
        rows = sorted(rows, key=lambda item: item["id"])
        for index, q in enumerate(rows):
            if index < MAX_TEMPLATE_STEM_ROWS:
                continue
            candidate = template_rewrite(q, index)
            if not candidate:
                # No rewrite available: the row keeps its text, so the text
                # must stay reserved in `taken`.
                continue
            old_key = normalize_text(q["text"])
            usage[old_key] -= 1
            if usage[old_key] <= 0:
                # Release the old text only when no other row still shows it.
                taken.discard(old_key)
            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate
            new_key = normalize_text(candidate)
            taken.add(new_key)
            usage[new_key] += 1
    return rewrites


def rewrite_remaining_duplicate_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Give every row that shares its normalised text with another row a unique prompt.

    Within each group of same-text rows, sorted by id, the first row keeps its
    text and every later row is rewritten via duplicate_text_rewrite() and
    unique_candidate().

    Returns:
        Mapping of question id to new text for every rewritten row.
    """
    rewrites: dict[str, str] = {}
    rows = [q for data in files.values() for q in data.get("questions", []) if q.get("text")]
    groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for q in rows:
        groups[normalize_text(q["text"])].append(q)

    taken = set(groups)
    for group in groups.values():
        if len(group) <= 1:
            continue
        ordered = sorted(group, key=lambda item: item["id"])
        for duplicate_index, q in enumerate(ordered[1:], start=1):
            # The shared text is deliberately NOT released from `taken`: the
            # first row of the group keeps it, so no rewrite may reuse it.
            candidate = duplicate_text_rewrite(q, duplicate_index)
            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate
            taken.add(normalize_text(candidate))
    return rewrites


def migrate_db(delete_ids: set[str], rewrites: dict[str, str]) -> int:
    """Apply the deletions and text rewrites to the ``question`` table of DB_PATH.

    All statements run before a single commit; the connection is always closed.

    Returns:
        The summed rowcount of all DELETE and UPDATE statements.
    """
    con = sqlite3.connect(DB_PATH)
    try:
        cur = con.cursor()
        changed = 0
        for qid in sorted(delete_ids):
            cur.execute("DELETE FROM question WHERE id=?", (qid,))
            changed += cur.rowcount
        for qid, text in sorted(rewrites.items()):
            cur.execute("UPDATE question SET text=? WHERE id=?", (text, qid))
            changed += cur.rowcount
        con.commit()
        return changed
    finally:
        con.close()


def main() -> None:
    """Run the migration on the JSON files and the asset DB, then print a summary."""
    # sqlite3.connect() silently creates a missing database file. Check up front
    # so a wrong path cannot leave the JSON rewritten and an empty DB behind.
    if not DB_PATH.is_file():
        raise SystemExit(f"Database not found: {DB_PATH}")

    files, _ = load_json_files()
    delete_ids = delete_full_duplicates(files)
    template_rewrites = rewrite_template_overflow(files)
    duplicate_rewrites = rewrite_remaining_duplicate_texts(files)
    polish_rewrites = polish_all_texts(files)
    # Later passes win: each mapping holds the text as of the end of its pass.
    rewrites = {**template_rewrites, **duplicate_rewrites, **polish_rewrites}

    save_json_files(files)
    db_changed = migrate_db(delete_ids, rewrites)

    print(f"Deleted fully duplicated rows: {len(delete_ids)}")
    print(f"Template-stem rewrites: {len(template_rewrites)}")
    print(f"Duplicate-text rewrites: {len(duplicate_rewrites)}")
    print(f"Polish rewrites: {len(polish_rewrites)}")
    print(f"DB rows changed: {db_changed}")

    con = sqlite3.connect(DB_PATH)
    try:
        room_hash = con.execute("SELECT identity_hash FROM room_master_table").fetchone()[0]
        active_count = con.execute("SELECT COUNT(*) FROM question WHERE status='active'").fetchone()[0]
    finally:
        con.close()
    print(f"Room hash: {room_hash}")
    print(f"Active questions: {active_count}")


if __name__ == "__main__":
    main()