2026-06-19 00:35:00 -05:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""
|
|
|
|
|
Q1 - improve question-bank variety.
|
|
|
|
|
|
|
|
|
|
This one-off migration edits BOTH the source JSON files and the shipped asset
|
|
|
|
|
DB. It only changes row content:
|
|
|
|
|
* delete fully duplicated question rows where text + answer payload match
|
|
|
|
|
* rewrite remaining duplicate visible texts so each prompt is unique
|
|
|
|
|
* rewrite over-cap template stems into varied, topic-specific prompts
|
|
|
|
|
|
|
|
|
|
Room schema and identity hash are untouched. build_db.py is NOT run.
|
|
|
|
|
"""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import sqlite3
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
from validate_question_variety import MAX_TEMPLATE_STEM_ROWS, TEMPLATE_STEMS, normalize_text
|
|
|
|
|
|
|
|
|
|
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Folder whose *.json files are loaded and rewritten by this migration.
QUESTIONS_DIR = ROOT / "seed" / "questions"
# SQLite asset database that is updated in place.
DB_PATH = ROOT / "app" / "src" / "main" / "assets" / "database" / "app.db"
|
|
|
|
|
|
|
|
|
|
# Exact, case-sensitive prefixes (trailing space included) that
# template_rewrite() looks for at the start of a question text.
UNDERSTOOD_STEM = "What do you wish I understood about "
NEED_STEM = "What do you need from me when "
|
|
|
|
|
|
|
|
|
|
# Whole-topic substitutions applied by clean_topic(): an awkward topic phrase
# (key) is swapped for a grammatical one (value). Matching is on the full
# stripped topic, not on substrings.
UNDERSTOOD_TOPIC_FIXES = {
    "your growth we have made": "the growth we have made",
    "your forgiveness": "the forgiveness you offer",
    "your support through stress": "the support you give through stress",
    "your shared resilience": "our shared resilience",
    "your ways we repair": "the ways we repair",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
    "your a past hard season": "a past hard season",
    "your future gratitude": "future gratitude",
}
|
|
|
|
|
|
|
|
|
|
# Replacement prompts for questions that start with UNDERSTOOD_STEM.
# Each entry is a str.format template with a single {topic} placeholder.
UNDERSTOOD_VARIANTS = (
    "What is one thing you want me to understand about {topic}?",
    "What feels hardest to explain about {topic}?",
    "What would help me see {topic} with more care?",
    "What do you want me to notice about {topic}?",
    "Where do you feel least understood around {topic}?",
    "What part of {topic} do you rarely get to say out loud?",
    "How can I better honor what {topic} brings up for you?",
    "What question do you wish I would ask about {topic}?",
)
|
|
|
|
|
|
|
|
|
|
# Replacement prompts for questions that start with NEED_STEM.
# Each entry is a str.format template with a single {condition} placeholder.
NEED_WHEN_VARIANTS = (
    "When {condition}, what support from me would help most?",
    "When {condition}, what should I do more of?",
    "When {condition}, what should I avoid doing?",
    "When {condition}, what reassurance helps most?",
    "When {condition}, what would help you feel less alone?",
    "When {condition}, what small action from me would matter?",
)
|
|
|
|
|
|
|
|
|
|
# Replacement prompts used by template_rewrite() when clean_condition()
# reports a "we are trying ..." condition. Each entry is a str.format
# template with a single {topic} placeholder.
WORKING_ON_VARIANTS = (
    "While we're working on {topic}, what support from me would help most?",
    "While we're working on {topic}, what should I do more of?",
    "While we're working on {topic}, what should I avoid doing?",
    "While we're working on {topic}, what reassurance helps most?",
    "While we're working on {topic}, what would help us stay connected?",
    "While we're working on {topic}, what small action from me would matter?",
)
|
|
|
|
|
|
|
|
|
|
# Phrases appended (before the closing "?") to make an otherwise duplicated
# prompt distinct; tried in this order by unique_candidate().
GENERIC_DUPLICATE_SUFFIXES = (
    "right now",
    "these days",
    "in this season",
    "when it matters most",
    "without overthinking it",
)
|
|
|
|
|
|
|
|
|
|
# Substring fixes applied by polish_text() to every question text, in
# insertion order: subject/verb agreement ("feels" -> "feel") and awkward
# "your ..." phrasings.
POLISH_TEXT_REPLACEMENTS = {
    "consistent actions feels fragile": "consistent actions feel fragile",
    "boundaries after rupture feels fragile": "boundaries after rupture feel fragile",
    "triggered moments feels fragile": "triggered moments feel fragile",
    "trust timelines feels fragile": "trust timelines feel fragile",
    "questions after betrayal feels fragile": "questions after betrayal feel fragile",
    "trust deposits feels fragile": "trust deposits feel fragile",
    "broken promises feels fragile": "broken promises feel fragile",
    "relationship agreements feels fragile": "relationship agreements feel fragile",
    "what your support through stress brings up": "what the support you give through stress brings up",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def canonical_json(value: Any) -> str:
    """Serialize *value* to a compact JSON string with a stable key order.

    Keys are sorted and no whitespace is emitted, so two equal structures
    always yield the same string. Non-ASCII characters are kept as-is
    rather than escaped.
    """
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def question_payload(q: dict[str, Any]) -> str:
    """Return a canonical string for the answer-related part of a question.

    Only ``type`` (default ``"written"``), ``answer_config`` (default ``{}``)
    and ``options`` (default ``[]``) are included; the visible text and the
    id are deliberately left out so rows can be compared by answer payload.
    """
    payload = {
        "type": q.get("type", "written"),
        "answer_config": q.get("answer_config", {}),
        "options": q.get("options", []),
    }
    return canonical_json(payload)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_json_files() -> tuple[dict[Path, dict[str, Any]], dict[str, tuple[Path, dict[str, Any]]]]:
    """Load every ``*.json`` file in ``QUESTIONS_DIR``.

    Returns:
        A pair ``(files, by_id)``. ``files`` maps each path to its parsed
        document. ``by_id`` maps a question id to ``(path, question)``,
        where ``question`` is the same dict object stored inside ``files``.
        Paths are visited in sorted order, so when an id occurs more than
        once the last occurrence wins in ``by_id``.
    """
    files: dict[Path, dict[str, Any]] = {}
    by_id: dict[str, tuple[Path, dict[str, Any]]] = {}
    for path in sorted(QUESTIONS_DIR.glob("*.json")):
        # Read as UTF-8 explicitly: save_json_files() writes non-ASCII text
        # unescaped, so the platform default encoding must not be relied on.
        with path.open(encoding="utf-8") as f:
            data = json.load(f)
        files[path] = data
        for q in data.get("questions", []):
            by_id[q["id"]] = (path, q)
    return files, by_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_json_files(files: dict[Path, dict[str, Any]]) -> None:
    """Write each document in *files* back to its path, overwriting it.

    Output is indented by two spaces, keeps non-ASCII characters unescaped,
    and ends with a single trailing newline.
    """
    for path, data in files.items():
        # ensure_ascii=False emits raw non-ASCII characters, so the file
        # encoding is pinned to UTF-8 instead of the platform default.
        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_topic(topic: str) -> str:
    """Strip surrounding whitespace from *topic* and apply a known phrase fix.

    If the stripped topic is a key of ``UNDERSTOOD_TOPIC_FIXES`` the mapped
    replacement is returned; otherwise the stripped topic is returned as-is.
    """
    topic = topic.strip()
    return UNDERSTOOD_TOPIC_FIXES.get(topic, topic)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_condition(condition: str) -> tuple[str, bool]:
    """Normalize the condition part of a "need from me when ..." prompt.

    Returns:
        ``(text, working_on)``. When the stripped condition starts with
        ``"we are trying "`` that prefix is removed and ``working_on`` is
        True; otherwise the stripped condition is returned with False.
    """
    prefix = "we are trying "
    condition = condition.strip()
    if condition.startswith(prefix):
        return condition.removeprefix(prefix).strip(), True
    return condition, False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def trim_question_mark(text: str) -> str:
    """Return *text* without trailing whitespace and one trailing ``?``.

    Only a single question mark is removed; any whitespace left around the
    result afterwards is stripped as well.
    """
    return text.rstrip().removesuffix("?").strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def option_texts(q: dict[str, Any]) -> list[str]:
    """Collect the option labels of a question, wherever they are stored.

    Lookup order:
      1. top-level ``options`` (entries without a truthy ``text`` skipped);
      2. ``options`` inside ``answer_config["config"]``, or inside
         ``answer_config`` itself when it has no ``config`` key;
      3. the ``optionA`` / ``optionB`` pair in that same config, returned
         as a two-item list (a missing ``text`` becomes ``""``).

    Returns an empty list when none of these yields anything.
    """
    if q.get("options"):
        return [opt.get("text", "") for opt in q["options"] if opt.get("text")]

    answer_config = q.get("answer_config") or {}
    config = answer_config.get("config", answer_config)
    options = config.get("options") or []
    if options:
        return [opt.get("text", "") for opt in options if opt.get("text")]

    option_a = config.get("optionA")
    option_b = config.get("optionB")
    if option_a and option_b:
        return [option_a.get("text", ""), option_b.get("text", "")]

    return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def short_options(options: list[str]) -> str:
    """Build a short lowercase hint from the first one or two options.

    Blank entries are ignored. Two or more usable options give
    ``"<first> or <second>"``, exactly one gives that option, and none
    gives the fallback ``"the choices"``.
    """
    clean = [opt.strip().lower() for opt in options if opt.strip()]
    if len(clean) >= 2:
        return f"{clean[0]} or {clean[1]}"
    if clean:
        return clean[0]
    return "the choices"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def unique_candidate(base: str, taken: set[str]) -> str:
    """Return a variant of *base* whose normalized form is not in *taken*.

    *base* itself is returned when it is already free. Otherwise each entry
    of ``GENERIC_DUPLICATE_SUFFIXES`` is appended in order (before a closing
    ``?``); if all of those are taken too, a numeric marker ``(2)``,
    ``(3)``, ... is appended until a free text is found.

    *taken* is only read; the caller is responsible for recording the
    returned text.
    """
    if normalize_text(base) not in taken:
        return base

    stem = trim_question_mark(base)  # same for every attempt below
    for suffix in GENERIC_DUPLICATE_SUFFIXES:
        candidate = f"{stem} {suffix}?"
        if normalize_text(candidate) not in taken:
            return candidate

    n = 2
    while True:
        candidate = f"{stem} ({n})?"
        if normalize_text(candidate) not in taken:
            return candidate
        n += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def template_rewrite(q: dict[str, Any], occurrence_index: int) -> str | None:
    """Rewrite a template-stem question into one of the varied prompts.

    The match is an exact, case-sensitive prefix test on ``q["text"]``:

    * ``UNDERSTOOD_STEM``: the remainder (trailing ``?`` characters removed)
      is cleaned with ``clean_topic`` and inserted into an
      ``UNDERSTOOD_VARIANTS`` entry.
    * ``NEED_STEM``: the remainder is passed through ``clean_condition``
      and inserted into a ``WORKING_ON_VARIANTS`` entry when it was a
      "we are trying ..." condition, otherwise into a
      ``NEED_WHEN_VARIANTS`` entry.

    The variant is chosen by ``occurrence_index`` modulo the number of
    variants. Returns None when the text starts with neither stem.
    """
    text = q["text"]

    if text.startswith(UNDERSTOOD_STEM):
        topic = clean_topic(text.removeprefix(UNDERSTOOD_STEM).rstrip("?"))
        variant = UNDERSTOOD_VARIANTS[occurrence_index % len(UNDERSTOOD_VARIANTS)]
        return variant.format(topic=topic)

    if text.startswith(NEED_STEM):
        condition, working_on = clean_condition(text.removeprefix(NEED_STEM).rstrip("?"))
        if working_on:
            variant = WORKING_ON_VARIANTS[occurrence_index % len(WORKING_ON_VARIANTS)]
            return variant.format(topic=condition)
        variant = NEED_WHEN_VARIANTS[occurrence_index % len(NEED_WHEN_VARIANTS)]
        return variant.format(condition=condition)

    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def duplicate_text_rewrite(q: dict[str, Any], duplicate_index: int) -> str:
    """Propose a replacement text for a question whose prompt is duplicated.

    * ``this_or_that`` questions with at least two options get one of two
      "Which ...: <a> or <b>?" phrasings, alternating on the parity of
      *duplicate_index*.
    * Any other question with options gets "Which answer(s) fit(s) best
      here: ...?", using the plural form for ``multi_choice``.
    * Questions without options keep their text and gain one of the
      ``GENERIC_DUPLICATE_SUFFIXES``, chosen by *duplicate_index*.

    The result is only a proposal; it is not checked for uniqueness here.
    """
    text = q["text"]
    options = option_texts(q)
    qtype = q.get("type", "written")

    if qtype == "this_or_that" and len(options) >= 2:
        hint = short_options(options)
        if duplicate_index % 2:
            return f"Which would you choose first here: {hint}?"
        return f"Which fits this choice better for you: {hint}?"

    if options:
        option_hint = short_options(options)
        if qtype == "multi_choice":
            return f"Which answers fit best here: {option_hint}?"
        return f"Which answer fits best here: {option_hint}?"

    base = trim_question_mark(text)
    suffix = GENERIC_DUPLICATE_SUFFIXES[duplicate_index % len(GENERIC_DUPLICATE_SUFFIXES)]
    return f"{base} {suffix}?"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def delete_full_duplicates(files: dict[Path, dict[str, Any]]) -> set[str]:
    """Drop rows that duplicate another row in both text and answer payload.

    Rows are grouped by ``(normalize_text(text), question_payload(row))``
    across all files; rows with an empty or missing text are not grouped.
    In each group the row with the smallest id is kept and every other row
    is removed from its document's ``questions`` list (the documents in
    *files* are modified in place).

    Returns:
        The ids of the removed rows (empty when nothing was duplicated).
    """
    groups: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text", "")
            if text:
                groups[(normalize_text(text), question_payload(q))].append(q)

    delete_ids: set[str] = set()
    for rows in groups.values():
        if len(rows) <= 1:
            continue
        # Keep the first row by id; everything after it is a duplicate.
        for q in sorted(rows, key=lambda item: item["id"])[1:]:
            delete_ids.add(q["id"])

    if not delete_ids:
        return delete_ids

    for data in files.values():
        data["questions"] = [q for q in data.get("questions", []) if q["id"] not in delete_ids]
    return delete_ids
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def polish_text(text: str) -> str:
    """Apply every ``POLISH_TEXT_REPLACEMENTS`` substitution to *text*.

    Replacements are plain substring replacements applied one after the
    other in the mapping's order, each working on the previous result.
    """
    for old, new in POLISH_TEXT_REPLACEMENTS.items():
        text = text.replace(old, new)
    return text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def polish_all_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Run ``polish_text`` over every question text in *files*.

    Rows whose text is missing or empty are skipped. Rows whose text
    changes are updated in place.

    Returns:
        A mapping of question id to new text, containing only changed rows.
    """
    rewrites: dict[str, str] = {}
    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text")
            if not text:
                continue
            polished = polish_text(text)
            if polished != text:
                q["text"] = polished
                rewrites[q["id"]] = polished
    return rewrites
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rewrite_template_overflow(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Rewrite template-stem questions beyond the per-stem row cap.

    Each question is assigned to the first ``TEMPLATE_STEMS`` entry that its
    normalized text starts with. Within a stem, rows are ordered by id; the
    first ``MAX_TEMPLATE_STEM_ROWS`` rows are left alone and every later row
    is passed to ``template_rewrite`` (with its position in that ordering)
    and then made unique with ``unique_candidate``. Rows for which
    ``template_rewrite`` returns nothing keep their text.

    Rewritten rows are updated in place.

    Returns:
        A mapping of question id to new text for every rewritten row.
    """
    rewrites: dict[str, str] = {}
    by_stem: dict[str, list[dict[str, Any]]] = defaultdict(list)

    for data in files.values():
        for q in data.get("questions", []):
            normalized = normalize_text(q.get("text", ""))
            for stem in TEMPLATE_STEMS:
                if normalized.startswith(stem):
                    by_stem[stem].append(q)
                    break

    # Number of rows currently holding each normalized text. A text only
    # becomes free again once the last row holding it has been rewritten.
    holders: dict[str, int] = defaultdict(int)
    for data in files.values():
        for q in data.get("questions", []):
            if q.get("text"):
                holders[normalize_text(q["text"])] += 1
    taken = set(holders)

    for rows in by_stem.values():
        ordered = sorted(rows, key=lambda item: item["id"])
        for index, q in enumerate(ordered):
            if index < MAX_TEMPLATE_STEM_ROWS:
                continue
            candidate = template_rewrite(q, index)
            if not candidate:
                # Row keeps its text, so that text must stay reserved.
                continue

            # Release the old text only if no other row still uses it.
            old_key = normalize_text(q["text"])
            holders[old_key] -= 1
            if holders[old_key] <= 0:
                taken.discard(old_key)

            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate

            new_key = normalize_text(candidate)
            holders[new_key] += 1
            taken.add(new_key)

    return rewrites
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rewrite_remaining_duplicate_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Give every question that shares its visible text a distinct prompt.

    Rows with a non-empty text are grouped by normalized text. In each
    group the row with the smallest id keeps its text; every other row gets
    a proposal from ``duplicate_text_rewrite`` that is then made unique
    with ``unique_candidate``. Rewritten rows are updated in place.

    Returns:
        A mapping of question id to new text for every rewritten row.
    """
    rewrites: dict[str, str] = {}
    rows = [q for data in files.values() for q in data.get("questions", []) if q.get("text")]

    groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for q in rows:
        groups[normalize_text(q["text"])].append(q)

    # The shared text of a group is never released from this set: the first
    # row of the group keeps it, so no rewritten row may reuse it.
    taken = set(groups)

    for group in groups.values():
        if len(group) <= 1:
            continue
        ordered = sorted(group, key=lambda item: item["id"])
        for duplicate_index, q in enumerate(ordered[1:], start=1):
            candidate = duplicate_text_rewrite(q, duplicate_index)
            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate
            taken.add(normalize_text(candidate))

    return rewrites
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def migrate_db(delete_ids: set[str], rewrites: dict[str, str]) -> int:
    """Apply the deletions and text rewrites to the ``question`` table.

    Args:
        delete_ids: ids of rows to delete.
        rewrites: mapping of row id to its new ``text`` value.

    Returns:
        The sum of ``rowcount`` over all executed statements, i.e. the
        number of rows the database reports as deleted or updated.

    All statements run in one transaction on the database at ``DB_PATH``:
    it is committed when every statement succeeds and rolled back if any
    of them raises. The connection is always closed.
    """
    con = sqlite3.connect(DB_PATH)
    try:
        changed = 0
        # The connection context manager commits on success and rolls back
        # on an exception; it does not close the connection.
        with con:
            cur = con.cursor()
            for qid in sorted(delete_ids):
                cur.execute("DELETE FROM question WHERE id=?", (qid,))
                changed += cur.rowcount
            for qid, text in sorted(rewrites.items()):
                cur.execute("UPDATE question SET text=? WHERE id=?", (text, qid))
                changed += cur.rowcount
        return changed
    finally:
        con.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> None:
    """Run the migration end to end and print a summary.

    Steps, in order: load the JSON sources, delete fully duplicated rows,
    rewrite over-cap template stems, rewrite remaining duplicate texts,
    polish texts, save the JSON files, apply the same deletions and
    rewrites to the asset DB, then print the counts together with the
    Room identity hash and the number of active questions read back from
    the DB.

    Raises:
        FileNotFoundError: if the asset database does not exist. This is
            checked before anything is written, so the JSON sources are
            not rewritten without a DB to mirror the change into.
    """
    # sqlite3.connect() would silently create an empty database here.
    if not DB_PATH.is_file():
        raise FileNotFoundError(f"Asset database not found: {DB_PATH}")

    files, _ = load_json_files()
    delete_ids = delete_full_duplicates(files)
    template_rewrites = rewrite_template_overflow(files)
    duplicate_rewrites = rewrite_remaining_duplicate_texts(files)
    polish_rewrites = polish_all_texts(files)
    # Later passes win for rows touched more than once, so each id ends up
    # with the text produced by the last pass that changed it.
    rewrites = {**template_rewrites, **duplicate_rewrites, **polish_rewrites}

    save_json_files(files)
    db_changed = migrate_db(delete_ids, rewrites)

    print(f"Deleted fully duplicated rows: {len(delete_ids)}")
    print(f"Template-stem rewrites: {len(template_rewrites)}")
    print(f"Duplicate-text rewrites: {len(duplicate_rewrites)}")
    print(f"Polish rewrites: {len(polish_rewrites)}")
    print(f"DB rows changed: {db_changed}")

    con = sqlite3.connect(DB_PATH)
    try:
        room_hash = con.execute("SELECT identity_hash FROM room_master_table").fetchone()[0]
        active_count = con.execute("SELECT COUNT(*) FROM question WHERE status='active'").fetchone()[0]
    finally:
        con.close()
    print(f"Room hash: {room_hash}")
    print(f"Active questions: {active_count}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run the migration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|