Closer/seed/improve_question_variety_q1.py

373 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Q1 - improve question-bank variety.
This one-off migration edits BOTH the source JSON files and the shipped asset
DB. It only changes row content:
* delete fully duplicated question rows where text + answer payload match
* rewrite remaining duplicate visible texts so each prompt is unique
* rewrite over-cap template stems into varied, topic-specific prompts
Room schema and identity hash are untouched. build_db.py is NOT run.
"""
from __future__ import annotations
import json
import sqlite3
from collections import defaultdict
from pathlib import Path
from typing import Any
from validate_question_variety import MAX_TEMPLATE_STEM_ROWS, TEMPLATE_STEMS, normalize_text
# Parent of the directory that contains this script (resolved, absolute).
ROOT = Path(__file__).resolve().parents[1]
# Directory whose *.json files load_json_files() reads and save_json_files() rewrites.
QUESTIONS_DIR = ROOT / "seed" / "questions"
# SQLite database file opened by migrate_db() and main().
DB_PATH = ROOT / "app" / "src" / "main" / "assets" / "database" / "app.db"
# Raw-text prefixes recognized by template_rewrite(); both end with a trailing
# space so the remainder of the prompt starts directly at the topic/condition.
UNDERSTOOD_STEM = "What do you wish I understood about "
NEED_STEM = "What do you need from me when "
# Exact-match replacements that clean_topic() applies to a stripped topic;
# topics that are not listed here pass through unchanged.
UNDERSTOOD_TOPIC_FIXES = {
    "your growth we have made": "the growth we have made",
    "your forgiveness": "the forgiveness you offer",
    "your support through stress": "the support you give through stress",
    "your shared resilience": "our shared resilience",
    "your ways we repair": "the ways we repair",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
    "your a past hard season": "a past hard season",
    "your future gratitude": "future gratitude",
}
# Replacement prompts for UNDERSTOOD_STEM questions. template_rewrite() picks
# one by occurrence index modulo the tuple length and fills in {topic}.
UNDERSTOOD_VARIANTS = (
    "What is one thing you want me to understand about {topic}?",
    "What feels hardest to explain about {topic}?",
    "What would help me see {topic} with more care?",
    "What do you want me to notice about {topic}?",
    "Where do you feel least understood around {topic}?",
    "What part of {topic} do you rarely get to say out loud?",
    "How can I better honor what {topic} brings up for you?",
    "What question do you wish I would ask about {topic}?",
)
# Replacement prompts for NEED_STEM questions whose condition did NOT start
# with "we are trying "; {condition} receives the cleaned condition text.
NEED_WHEN_VARIANTS = (
    "When {condition}, what support from me would help most?",
    "When {condition}, what should I do more of?",
    "When {condition}, what should I avoid doing?",
    "When {condition}, what reassurance helps most?",
    "When {condition}, what would help you feel less alone?",
    "When {condition}, what small action from me would matter?",
)
# Replacement prompts for NEED_STEM questions whose condition started with
# "we are trying "; {topic} receives the condition with that prefix removed.
WORKING_ON_VARIANTS = (
    "While we're working on {topic}, what support from me would help most?",
    "While we're working on {topic}, what should I do more of?",
    "While we're working on {topic}, what should I avoid doing?",
    "While we're working on {topic}, what reassurance helps most?",
    "While we're working on {topic}, what would help us stay connected?",
    "While we're working on {topic}, what small action from me would matter?",
)
# Phrases inserted before the closing "?" by unique_candidate() and
# duplicate_text_rewrite() to tell apart prompts that would otherwise collide.
GENERIC_DUPLICATE_SUFFIXES = (
    "right now",
    "these days",
    "in this season",
    "when it matters most",
    "without overthinking it",
)
# Substring replacements that polish_text() applies with str.replace, in
# insertion order, to every question text.
POLISH_TEXT_REPLACEMENTS = {
    "consistent actions feels fragile": "consistent actions feel fragile",
    "boundaries after rupture feels fragile": "boundaries after rupture feel fragile",
    "triggered moments feels fragile": "triggered moments feel fragile",
    "trust timelines feels fragile": "trust timelines feel fragile",
    "questions after betrayal feels fragile": "questions after betrayal feel fragile",
    "trust deposits feels fragile": "trust deposits feel fragile",
    "broken promises feels fragile": "broken promises feel fragile",
    "relationship agreements feels fragile": "relationship agreements feel fragile",
    "what your support through stress brings up": "what the support you give through stress brings up",
    "your feeling chosen": "feeling chosen",
    "your mutual effort": "our mutual effort",
}
def canonical_json(value: Any) -> str:
    """Serialize *value* as compact JSON text with object keys sorted.

    Separators carry no whitespace and non-ASCII characters are written
    as-is, so equal values always produce the same string.
    """
    compact_separators = (",", ":")
    return json.dumps(
        value,
        ensure_ascii=False,
        separators=compact_separators,
        sort_keys=True,
    )
def question_payload(q: dict[str, Any]) -> str:
    """Return the canonical JSON string of a question's answer payload.

    The payload consists of the question's ``type`` (default ``"written"``),
    ``answer_config`` (default ``{}``) and ``options`` (default ``[]``).
    """
    field_defaults = (
        ("type", "written"),
        ("answer_config", {}),
        ("options", []),
    )
    payload = {key: q.get(key, fallback) for key, fallback in field_defaults}
    return canonical_json(payload)
def load_json_files() -> tuple[dict[Path, dict[str, Any]], dict[str, tuple[Path, dict[str, Any]]]]:
    """Load every ``*.json`` file in QUESTIONS_DIR.

    Returns:
        A pair ``(files, by_id)``. ``files`` maps each path (visited in sorted
        order) to its parsed JSON document. ``by_id`` maps a question id to
        ``(path, question_dict)``; if an id occurs more than once, the entry
        loaded last wins.
    """
    files: dict[Path, dict[str, Any]] = {}
    by_id: dict[str, tuple[Path, dict[str, Any]]] = {}
    for path in sorted(QUESTIONS_DIR.glob("*.json")):
        # Pin UTF-8: the files are written with ensure_ascii=False, so the
        # locale-dependent default encoding could fail or mis-decode them.
        with path.open(encoding="utf-8") as f:
            data = json.load(f)
        files[path] = data
        for q in data.get("questions", []):
            by_id[q["id"]] = (path, q)
    return files, by_id
def save_json_files(files: dict[Path, dict[str, Any]]) -> None:
    """Write each document in *files* back to its path as indented JSON.

    Output uses a two-space indent, keeps non-ASCII characters unescaped and
    ends with a single trailing newline.
    """
    for path, data in files.items():
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # must be explicit rather than whatever the platform locale defaults to.
        with path.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            f.write("\n")
def clean_topic(topic: str) -> str:
    """Strip *topic* and swap it for its UNDERSTOOD_TOPIC_FIXES entry, if any."""
    stripped = topic.strip()
    if stripped in UNDERSTOOD_TOPIC_FIXES:
        return UNDERSTOOD_TOPIC_FIXES[stripped]
    return stripped
def clean_condition(condition: str) -> tuple[str, bool]:
    """Strip *condition* and detect the ``"we are trying "`` lead-in.

    Returns:
        ``(text, True)`` with the lead-in removed when the stripped condition
        starts with it, otherwise ``(stripped_condition, False)``.
    """
    lead_in = "we are trying "
    stripped = condition.strip()
    if not stripped.startswith(lead_in):
        return stripped, False
    remainder = stripped[len(lead_in):]
    return remainder.strip(), True
def trim_question_mark(text: str) -> str:
    """Drop trailing whitespace and at most one final ``?``, then strip."""
    trimmed = text.rstrip()
    if trimmed.endswith("?"):
        trimmed = trimmed[:-1]
    return trimmed.strip()
def option_texts(q: dict[str, Any]) -> list[str]:
    """Collect the visible option texts of a question.

    Lookup order:
      1. top-level ``q["options"]`` (entries without text are skipped);
      2. ``options`` inside the answer config (entries without text skipped);
      3. the ``optionA`` / ``optionB`` pair inside the answer config.

    The answer config is ``answer_config["config"]`` when that key holds a
    value, otherwise ``answer_config`` itself.

    Returns:
        The list of option texts, or ``[]`` when none are found.
    """
    top_level = q.get("options")
    if top_level:
        return [opt.get("text", "") for opt in top_level if opt.get("text")]

    answer_config = q.get("answer_config") or {}
    config = answer_config.get("config")
    if config is None:
        # A missing key and an explicit JSON null both fall back to the
        # answer_config itself; previously a null value raised AttributeError.
        config = answer_config

    options = config.get("options") or []
    if options:
        return [opt.get("text", "") for opt in options if opt.get("text")]

    option_a = config.get("optionA")
    option_b = config.get("optionB")
    if option_a and option_b:
        return [option_a.get("text", ""), option_b.get("text", "")]
    return []
def short_options(options: list[str]) -> str:
    """Summarize option texts as a short lowercase phrase.

    Blank entries are ignored. Two or more usable options give
    ``"<first> or <second>"``, one gives that option alone, and none gives
    the placeholder ``"the choices"``.
    """
    usable: list[str] = []
    for option in options:
        stripped = option.strip()
        if stripped:
            usable.append(stripped.lower())

    if not usable:
        return "the choices"
    if len(usable) == 1:
        return usable[0]
    first, second = usable[0], usable[1]
    return f"{first} or {second}"
def unique_candidate(base: str, taken: set[str]) -> str:
    """Return a variation of *base* whose normalized form is not in *taken*.

    *base* is returned unchanged when it is already free. Otherwise each
    GENERIC_DUPLICATE_SUFFIXES entry is tried in order, and after that a
    numbered ``(n)`` marker starting at 2 until a free text is found.
    *taken* is only read, never modified.
    """
    if normalize_text(base) not in taken:
        return base

    stem = trim_question_mark(base)
    for suffix in GENERIC_DUPLICATE_SUFFIXES:
        attempt = f"{stem} {suffix}?"
        if normalize_text(attempt) not in taken:
            return attempt

    # Every suffix is taken: fall back to numbering.
    counter = 2
    attempt = f"{stem} ({counter})?"
    while normalize_text(attempt) in taken:
        counter += 1
        attempt = f"{stem} ({counter})?"
    return attempt
def template_rewrite(q: dict[str, Any], occurrence_index: int) -> str | None:
    """Build a varied prompt for a question that uses a known template stem.

    Args:
        q: Question dict; its ``"text"`` is matched against UNDERSTOOD_STEM
            and NEED_STEM (case-sensitive prefix match on the raw text).
        occurrence_index: Selects the variant, modulo the variant count.

    Returns:
        The rewritten prompt, or ``None`` when the text starts with neither stem.
    """
    text = q["text"]

    if text.startswith(UNDERSTOOD_STEM):
        remainder = text[len(UNDERSTOOD_STEM):].rstrip("?")
        template = UNDERSTOOD_VARIANTS[occurrence_index % len(UNDERSTOOD_VARIANTS)]
        return template.format(topic=clean_topic(remainder))

    if not text.startswith(NEED_STEM):
        return None

    remainder = text[len(NEED_STEM):].rstrip("?")
    condition, working_on = clean_condition(remainder)
    if working_on:
        templates, field = WORKING_ON_VARIANTS, "topic"
    else:
        templates, field = NEED_WHEN_VARIANTS, "condition"
    template = templates[occurrence_index % len(templates)]
    return template.format(**{field: condition})
def duplicate_text_rewrite(q: dict[str, Any], duplicate_index: int) -> str:
    """Propose a replacement prompt for a question whose text is duplicated.

    Questions with options get a prompt that names the first options;
    questions without options get their own text plus a generic suffix
    chosen by *duplicate_index*.
    """
    text = q["text"]
    options = option_texts(q)

    if not options:
        suffix = GENERIC_DUPLICATE_SUFFIXES[duplicate_index % len(GENERIC_DUPLICATE_SUFFIXES)]
        return f"{trim_question_mark(text)} {suffix}?"

    qtype = q.get("type", "written")
    hint = short_options(options)

    if qtype == "this_or_that" and len(options) >= 2:
        # Alternate between two phrasings on odd/even duplicate indexes.
        if duplicate_index % 2:
            return f"Which would you choose first here: {hint}?"
        return f"Which fits this choice better for you: {hint}?"

    if qtype == "multi_choice":
        return f"Which answers fit best here: {hint}?"
    return f"Which answer fits best here: {hint}?"
def delete_full_duplicates(files: dict[Path, dict[str, Any]]) -> set[str]:
    """Remove questions that duplicate another one in both text and payload.

    Questions are grouped by (normalized text, canonical answer payload).
    Within each group the row with the smallest id is kept and the others are
    removed from *files* in place. Questions with empty text are ignored.

    Returns:
        The ids of the removed questions (empty when nothing was removed).
    """
    buckets: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text", "")
            if not text:
                continue
            key = (normalize_text(text), question_payload(q))
            buckets[key].append(q)

    delete_ids: set[str] = set()
    for rows in buckets.values():
        if len(rows) > 1:
            _keeper, *extras = sorted(rows, key=lambda item: item["id"])
            delete_ids.update(extra["id"] for extra in extras)

    if delete_ids:
        for data in files.values():
            survivors = [q for q in data.get("questions", []) if q["id"] not in delete_ids]
            data["questions"] = survivors
    return delete_ids
def polish_text(text: str) -> str:
    """Apply every POLISH_TEXT_REPLACEMENTS substitution to *text*, in order."""
    polished = text
    for needle in POLISH_TEXT_REPLACEMENTS:
        polished = polished.replace(needle, POLISH_TEXT_REPLACEMENTS[needle])
    return polished
def polish_all_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Polish every non-empty question text in *files*, in place.

    Returns:
        A mapping of question id to new text for each question that changed.
    """
    rewrites: dict[str, str] = {}
    all_questions = (q for data in files.values() for q in data.get("questions", []))
    for q in all_questions:
        original = q.get("text")
        if original:
            polished = polish_text(original)
            if polished != original:
                q["text"] = polished
                rewrites[q["id"]] = polished
    return rewrites
def rewrite_template_overflow(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Rewrite questions that exceed the per-stem cap for template prompts.

    Questions whose normalized text starts with one of TEMPLATE_STEMS are
    grouped by the first matching stem and ordered by id. The first
    MAX_TEMPLATE_STEM_ROWS rows of each group keep their text; each later row
    is replaced, in place, with a unique variant from template_rewrite().
    Rows for which template_rewrite() returns nothing are left unchanged.

    Returns:
        A mapping of question id to new text for each rewritten question.
    """
    rewrites: dict[str, str] = {}
    by_stem: dict[str, list[dict[str, Any]]] = defaultdict(list)
    # How many rows currently hold each normalized text. A text may only be
    # released for reuse once the last row holding it has been rewritten.
    usage: dict[str, int] = defaultdict(int)

    for data in files.values():
        for q in data.get("questions", []):
            text = q.get("text")
            if not text:
                continue
            normalized = normalize_text(text)
            usage[normalized] += 1
            for stem in TEMPLATE_STEMS:
                if normalized.startswith(stem):
                    by_stem[stem].append(q)
                    break

    taken = set(usage)

    for rows in by_stem.values():
        ordered = sorted(rows, key=lambda item: item["id"])
        for index, q in enumerate(ordered):
            if index < MAX_TEMPLATE_STEM_ROWS:
                continue
            candidate = template_rewrite(q, index)
            if not candidate:
                # The row keeps its text, so that text must stay reserved.
                continue

            previous = normalize_text(q["text"])
            usage[previous] -= 1
            if usage[previous] <= 0:
                taken.discard(previous)

            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate

            replacement = normalize_text(candidate)
            usage[replacement] += 1
            taken.add(replacement)
    return rewrites
def rewrite_remaining_duplicate_texts(files: dict[Path, dict[str, Any]]) -> dict[str, str]:
    """Rewrite questions that share a normalized text with another question.

    Within each group of identical normalized texts, the row with the smallest
    id keeps its text and every other row is given a unique replacement, in
    place.

    Returns:
        A mapping of question id to new text for each rewritten question.
    """
    rewrites: dict[str, str] = {}
    rows = [q for data in files.values() for q in data.get("questions", []) if q.get("text")]

    groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for q in rows:
        groups[normalize_text(q["text"])].append(q)

    # Every group key is held by that group's first row throughout, so it is
    # never removed from `taken`; dropping it would let a later candidate
    # collide with the text that the first row keeps.
    taken = set(groups)

    for group in groups.values():
        if len(group) <= 1:
            continue
        ordered = sorted(group, key=lambda item: item["id"])
        for duplicate_index, q in enumerate(ordered[1:], start=1):
            candidate = duplicate_text_rewrite(q, duplicate_index)
            candidate = unique_candidate(candidate, taken)
            q["text"] = candidate
            rewrites[q["id"]] = candidate
            taken.add(normalize_text(candidate))
    return rewrites
def migrate_db(delete_ids: set[str], rewrites: dict[str, str]) -> int:
    """Apply the deletions and text rewrites to the ``question`` table.

    Deletes run first (ids in sorted order), then updates (sorted by id), and
    everything is committed once at the end. The connection is always closed.

    Returns:
        The sum of the row counts reported for all statements.
    """
    connection = sqlite3.connect(DB_PATH)
    try:
        deleted = sum(
            connection.execute("DELETE FROM question WHERE id=?", (qid,)).rowcount
            for qid in sorted(delete_ids)
        )
        updated = sum(
            connection.execute("UPDATE question SET text=? WHERE id=?", (text, qid)).rowcount
            for qid, text in sorted(rewrites.items())
        )
        connection.commit()
        return deleted + updated
    finally:
        connection.close()
def main() -> None:
    """Run the migration end to end and print a summary.

    Steps: load the JSON files, delete full duplicates, rewrite over-cap
    template prompts, rewrite remaining duplicate texts, polish all texts,
    save the JSON files, apply the same deletions/rewrites to the database,
    then read back the Room identity hash and the active-question count.

    Raises:
        FileNotFoundError: If DB_PATH is not an existing file. This is checked
            before anything is modified.
    """
    # sqlite3.connect() creates an empty database when the file is missing.
    # Without this check the JSON files would already be rewritten before the
    # migration failed on the missing tables, leaving JSON and DB out of sync.
    if not DB_PATH.is_file():
        raise FileNotFoundError(f"Asset database not found: {DB_PATH}")

    files, _ = load_json_files()
    delete_ids = delete_full_duplicates(files)
    template_rewrites = rewrite_template_overflow(files)
    duplicate_rewrites = rewrite_remaining_duplicate_texts(files)
    polish_rewrites = polish_all_texts(files)
    # Later passes see the output of earlier ones, so on an id clash the
    # entry from the later pass is the final text.
    rewrites = {**template_rewrites, **duplicate_rewrites, **polish_rewrites}

    save_json_files(files)
    db_changed = migrate_db(delete_ids, rewrites)

    print(f"Deleted fully duplicated rows: {len(delete_ids)}")
    print(f"Template-stem rewrites: {len(template_rewrites)}")
    print(f"Duplicate-text rewrites: {len(duplicate_rewrites)}")
    print(f"Polish rewrites: {len(polish_rewrites)}")
    print(f"DB rows changed: {db_changed}")

    con = sqlite3.connect(DB_PATH)
    try:
        room_hash = con.execute("SELECT identity_hash FROM room_master_table").fetchone()[0]
        active_count = con.execute("SELECT COUNT(*) FROM question WHERE status='active'").fetchone()[0]
    finally:
        con.close()
    print(f"Room hash: {room_hash}")
    print(f"Active questions: {active_count}")


if __name__ == "__main__":
    main()