Closer/seed/build_db.py

277 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Build SQLite database from question JSON files for Room asset loading.
This script reads JSON question files and creates a pre-seeded SQLite database
that can be bundled with the APK and loaded by Room via createFromAsset().
"""
import json
import sqlite3
import os
from pathlib import Path
from typing import Dict, List, Any
from validate_question_variety import load_json_records, validate_records
def load_json_file(filepath: str) -> Dict[str, Any]:
    """Read the UTF-8 text file at *filepath* and return its decoded JSON.

    Raises whatever ``open`` raises for an unreadable path and
    ``json.JSONDecodeError`` when the content is not valid JSON.
    """
    with open(filepath, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def get_category_id_from_filename(filename: str) -> str:
    """Derive a category id from a question file name.

    The part of the base name before the first ``_v2---`` (v2 files) or the
    first ``---`` (v1 files) is taken, and a leading ``questions_`` prefix is
    removed from it. Examples::

        questions_communication---abc.json     -> communication
        questions_communication_v2---abc.json  -> communication

    Args:
        filename: File name or path; only the base name is inspected.

    Returns:
        The extracted category id, or ``'unknown'`` when the base name
        contains no ``---`` separator.
    """
    prefix = 'questions_'
    basename = os.path.basename(filename)

    # Check the v2 marker first: a v2 name also contains '---', and splitting
    # on that alone would leave the '_v2' suffix attached to the id.
    if '_v2---' in basename:
        stem = basename.split('_v2---')[0]
    elif '---' in basename:
        stem = basename.split('---')[0]
    else:
        return 'unknown'

    # Strip the prefix only where it leads the name. A blanket str.replace
    # would also eat 'questions_' occurring inside the category id itself
    # (e.g. 'big_questions_life' would become 'big_life').
    if stem.startswith(prefix):
        stem = stem[len(prefix):]
    return stem
def _build_answer_config(
    question_type: str,
    answer_config: Dict[str, Any],
    options: List[Any],
) -> Dict[str, Any]:
    """Return the ``{'type': ..., 'config': ...}`` payload for one question.

    Args:
        question_type: Value of the question's ``type`` field.
        answer_config: The question's ``answer_config`` value (may be empty).
        options: The question's top-level ``options`` value, an alternative
            location for choice data (may be empty).

    Returns:
        The payload dict. It is empty for an unrecognised type, and for a
        choice / this_or_that question that supplies neither ``options`` nor
        ``answer_config``.
    """
    if question_type == 'written':
        return {
            'type': 'written',
            'config': answer_config if answer_config else {
                'minLength': 1,
                'maxLength': 1000,
                'placeholder': 'Write your answer...'
            }
        }

    if question_type in ('single_choice', 'multi_choice'):
        # Top-level options take precedence over an explicit answer_config.
        if options:
            return {'type': question_type, 'config': {'options': options}}
        if answer_config:
            return {'type': question_type, 'config': answer_config}
        return {}

    if question_type == 'scale':
        return {
            'type': 'scale',
            'config': answer_config if answer_config else {
                'minScale': 1,
                'maxScale': 5,
                'minLabel': 'Disagree',
                'maxLabel': 'Agree'
            }
        }

    if question_type == 'this_or_that':
        if options:
            # options is non-empty here, so only the second slot may need a
            # placeholder.
            return {
                'type': 'this_or_that',
                'config': {
                    'optionA': options[0],
                    'optionB': options[1] if len(options) > 1 else {'id': 'b', 'text': ''}
                }
            }
        if answer_config:
            return {'type': 'this_or_that', 'config': answer_config}
        return {}

    return {}


def build_database(json_dir: str, output_path: str) -> None:
    """Build the SQLite database at *output_path* from the JSON files in *json_dir*.

    The records are first run through the variety validator. Any existing
    file at *output_path* is then deleted, and a fresh database with the
    ``Question`` and ``QuestionCategory`` tables is created and filled.

    Files are processed in sorted path order so the result is reproducible:
    the first file to mention a category id defines that category
    (``INSERT OR IGNORE``), and the last file to mention a question id
    defines that question (``INSERT OR REPLACE``). A file that cannot be
    read or parsed is reported and skipped.

    Args:
        json_dir: Directory containing the ``*.json`` question files.
        output_path: Path of the database file to create. Missing parent
            directories are created.

    Raises:
        ValueError: If the variety validator reports any errors.
        sqlite3.Error: If a statement fails (for example a question without
            an ``id``). The connection is closed before the error propagates.
    """
    variety_errors = validate_records(load_json_records(Path(json_dir)), "JSON")
    if variety_errors:
        raise ValueError(
            "Question variety check failed:\n" + "\n".join(variety_errors)
        )

    # A bare file name has an empty dirname, and os.makedirs('') raises.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Always start from an empty database file.
    if os.path.exists(output_path):
        os.remove(output_path)

    total_questions = 0
    categories_processed = set()

    conn = sqlite3.connect(output_path)
    try:
        cursor = conn.cursor()

        # Schema intended to be Room-compatible (must match the app's
        # entities; this script cannot verify that).
        cursor.execute('''
            CREATE TABLE Question (
                id TEXT NOT NULL,
                text TEXT NOT NULL,
                category_id TEXT NOT NULL,
                depth_level INTEGER NOT NULL,
                is_premium INTEGER NOT NULL,
                type TEXT NOT NULL,
                tags TEXT NOT NULL,
                answer_config TEXT NOT NULL,
                pack_id TEXT,
                created_at INTEGER NOT NULL,
                status TEXT NOT NULL,
                sex TEXT,
                PRIMARY KEY (id)
            )
        ''')
        cursor.execute('''
            CREATE TABLE QuestionCategory (
                id TEXT NOT NULL,
                display_name TEXT NOT NULL,
                description TEXT NOT NULL,
                access TEXT NOT NULL,
                icon_name TEXT NOT NULL,
                PRIMARY KEY (id)
            )
        ''')
        cursor.execute('''
            CREATE INDEX idx_question_category_id ON Question(category_id)
        ''')

        # glob() order depends on the filesystem; sort so that duplicate ids
        # are resolved the same way on every machine.
        json_files = sorted(Path(json_dir).glob('*.json'))

        for json_file in json_files:
            print(f"\nProcessing: {json_file.name}")
            try:
                data = load_json_file(str(json_file))
            except json.JSONDecodeError as e:
                print(f" ❌ Invalid JSON: {e}")
                continue
            except Exception as e:
                # Deliberate best-effort: one unreadable file must not abort
                # the whole build.
                print(f" ❌ Error reading file: {e}")
                continue

            # Category metadata, falling back to the file name for the id.
            category_data = data.get('category', {})
            category_id = category_data.get('id', get_category_id_from_filename(str(json_file)))
            category_display_name = category_data.get('display_name', category_id)
            category_description = category_data.get('description', '')
            category_access = category_data.get('access', 'free')
            category_icon = category_data.get('icon_name', 'question')

            cursor.execute('''
                INSERT OR IGNORE INTO QuestionCategory
                (id, display_name, description, access, icon_name)
                VALUES (?, ?, ?, ?, ?)
            ''', (category_id, category_display_name, category_description, category_access, category_icon))

            if category_id not in categories_processed:
                categories_processed.add(category_id)
                print(f" Category: {category_display_name} (id: {category_id})")

            # The file's modification time (whole seconds) is used as
            # created_at for every question in the file; read it once.
            created_at = int(json_file.stat().st_mtime)

            questions = data.get('questions', [])
            questions_inserted = 0
            for q in questions:
                question_type = q.get('type', 'written')
                ac_json = _build_answer_config(
                    question_type,
                    q.get('answer_config', {}),
                    q.get('options', []),
                )
                is_premium = q.get('access', 'free') == 'premium'

                # pack_id is not in the column list, so it is stored as NULL.
                cursor.execute('''
                    INSERT OR REPLACE INTO Question
                    (id, text, category_id, depth_level, is_premium, type, tags, answer_config,
                    created_at, status, sex)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    q.get('id'),
                    q.get('text', ''),
                    q.get('category_id', category_id),
                    # 'depth' wins over 'depth_level' when both are present.
                    q.get('depth', q.get('depth_level', 1)),
                    1 if is_premium else 0,
                    question_type,
                    json.dumps(q.get('tags', []), separators=(',', ':')),
                    json.dumps(ac_json, separators=(',', ':')),
                    created_at,
                    'active',
                    # Optional field (used for Desire Sync filtering).
                    q.get('sex'),
                ))
                questions_inserted += 1
                total_questions += 1

            free_count = sum(1 for q in questions if q.get('access', 'free') == 'free')
            premium_count = sum(1 for q in questions if q.get('access', 'free') == 'premium')
            print(f" Questions: {questions_inserted} ({free_count} free, {premium_count} premium)")

        conn.commit()
    finally:
        # Release the file handle even when an insert fails part-way.
        conn.close()

    print(f"\n{'='*60}")
    print(f"Database built: {output_path}")
    print(f"Categories: {len(categories_processed)}")
    print(f"Total questions: {total_questions}")
    print(f"{'='*60}")
def main():
    """Resolve the default locations and build the bundled question database.

    Questions are read from the ``questions`` directory next to this script.
    The database is written to ``app/src/main/assets/database/app.db`` under
    the script's parent directory. Prints a message and returns without
    building when the question directory is missing or holds no ``*.json``
    files.
    """
    here = Path(__file__).parent
    json_dir = here / 'questions'
    output_path = here.parent.joinpath(
        'app', 'src', 'main', 'assets', 'database', 'app.db'
    )

    print("Building SQLite database from question JSON files...")
    print(f"JSON directory: {json_dir}")
    print(f"Output path: {output_path}")

    # Guard clauses: bail out early when there is nothing to build from.
    if not json_dir.exists():
        print(f"❌ JSON directory not found: {json_dir}")
        return None

    has_question_files = any(json_dir.glob('*.json'))
    if not has_question_files:
        print(f"❌ No question JSON files found in: {json_dir}")
        return None

    build_database(str(json_dir), str(output_path))
# Run the build only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()