#!/usr/bin/env python3
"""
Build SQLite database from question JSON files for Room asset loading.

This script reads JSON question files and creates a pre-seeded SQLite database
that can be bundled with the APK and loaded by Room via createFromAsset().
"""

import json
import sqlite3
import os
from pathlib import Path
from typing import Dict, List, Any

from validate_question_variety import load_json_records, validate_records


def load_json_file(filepath: str) -> Dict[str, Any]:
    """Load and parse a JSON file.

    Args:
        filepath: Path of the UTF-8 encoded JSON file to read.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened or read.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)


def get_category_id_from_filename(filename: str) -> str:
    """Extract category id from filename.

    Handles both v1 and v2 filenames, for example
    ``questions_communication---...json`` and
    ``questions_communication_v2---...json`` both yield ``communication``.

    Args:
        filename: File name or path; only the base name is inspected.

    Returns:
        The category id, or ``'unknown'`` when the base name contains no
        ``---`` separator.
    """
    basename = os.path.basename(filename)
    if '_v2---' in basename:
        stem = basename.split('_v2---')[0]
    elif '---' in basename:
        stem = basename.split('---')[0]
    else:
        return 'unknown'

    # Strip only the leading prefix. A blanket str.replace() would also
    # remove 'questions_' from the middle of a category id.
    prefix = 'questions_'
    if stem.startswith(prefix):
        stem = stem[len(prefix):]
    return stem


def _create_schema(cursor: sqlite3.Cursor) -> None:
    """Create the Question and QuestionCategory tables and their index."""
    # Question table
    cursor.execute('''
        CREATE TABLE Question (
            id TEXT NOT NULL,
            text TEXT NOT NULL,
            category_id TEXT NOT NULL,
            depth_level INTEGER NOT NULL,
            is_premium INTEGER NOT NULL,
            type TEXT NOT NULL,
            tags TEXT NOT NULL,
            answer_config TEXT NOT NULL,
            pack_id TEXT,
            created_at INTEGER NOT NULL,
            status TEXT NOT NULL,
            sex TEXT,
            PRIMARY KEY (id)
        )
    ''')

    # QuestionCategory table
    cursor.execute('''
        CREATE TABLE QuestionCategory (
            id TEXT NOT NULL,
            display_name TEXT NOT NULL,
            description TEXT NOT NULL,
            access TEXT NOT NULL,
            icon_name TEXT NOT NULL,
            PRIMARY KEY (id)
        )
    ''')

    # Create indexes
    cursor.execute('''
        CREATE INDEX idx_question_category_id ON Question(category_id)
    ''')


def _build_answer_config(
    question_type: str,
    answer_config: Dict[str, Any],
    options: List[Any],
) -> Dict[str, Any]:
    """Build the answer_config object stored alongside a question.

    Args:
        question_type: Value of the question's ``type`` field.
        answer_config: The question's ``answer_config`` value (may be empty).
        options: The question's ``options`` value, the alternative location
            for choice-type answers (may be empty).

    Returns:
        A dict with ``type`` and ``config`` keys, or an empty dict when the
        type is not recognised or a choice-type question supplies neither
        ``options`` nor ``answer_config``.
    """
    if question_type == 'written':
        return {
            'type': 'written',
            'config': answer_config or {
                'minLength': 1,
                'maxLength': 1000,
                'placeholder': 'Write your answer...',
            },
        }

    if question_type == 'scale':
        return {
            'type': 'scale',
            'config': answer_config or {
                'minScale': 1,
                'maxScale': 5,
                'minLabel': 'Disagree',
                'maxLabel': 'Agree',
            },
        }

    if question_type in ('single_choice', 'multi_choice'):
        # A top-level 'options' list takes precedence over 'answer_config'.
        if options:
            return {'type': question_type, 'config': {'options': options}}
        if answer_config:
            return {'type': question_type, 'config': answer_config}
        return {}

    if question_type == 'this_or_that':
        if options:
            # 'options' is non-empty here, so only the second entry can be
            # missing and need a placeholder.
            option_b = (
                options[1] if len(options) > 1 else {'id': 'b', 'text': ''}
            )
            return {
                'type': 'this_or_that',
                'config': {'optionA': options[0], 'optionB': option_b},
            }
        if answer_config:
            return {'type': 'this_or_that', 'config': answer_config}
        return {}

    return {}


def build_database(json_dir: str, output_path: str) -> None:
    """Build SQLite database from JSON files.

    Runs the question variety check first, then recreates the database at
    ``output_path`` from scratch and fills it from every ``*.json`` file in
    ``json_dir``. Files that cannot be read or parsed are reported and
    skipped.

    Args:
        json_dir: Directory containing the question JSON files.
        output_path: Path of the SQLite file to create. An existing file at
            this path is deleted first.

    Raises:
        ValueError: If the question variety check reports any errors.
    """
    variety_errors = validate_records(load_json_records(Path(json_dir)), "JSON")
    if variety_errors:
        raise ValueError(
            "Question variety check failed:\n" + "\n".join(variety_errors)
        )

    # Ensure output directory exists. dirname() is '' for a bare file name,
    # and os.makedirs('') raises, so only create it when there is one.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Remove existing database if present
    if os.path.exists(output_path):
        os.remove(output_path)

    # Sort the files so the build is deterministic: duplicate categories are
    # ignored and duplicate question ids are replaced, which makes the result
    # depend on processing order.
    json_files = sorted(Path(json_dir).glob('*.json'))

    total_questions = 0
    categories_processed = set()

    # Connect to new database; always close it, even if the build fails.
    conn = sqlite3.connect(output_path)
    try:
        cursor = conn.cursor()

        # Create tables with Room-compatible schema
        _create_schema(cursor)

        # Process each JSON file (support both prefixed and clean filenames)
        for json_file in json_files:
            print(f"\nProcessing: {json_file.name}")

            try:
                data = load_json_file(str(json_file))
            except json.JSONDecodeError as e:
                print(f" ❌ Invalid JSON: {e}")
                continue
            except Exception as e:
                print(f" ❌ Error reading file: {e}")
                continue

            # Extract category info ('or {}' tolerates "category": null)
            category_data = data.get('category') or {}
            category_id = category_data.get(
                'id', get_category_id_from_filename(str(json_file))
            )
            category_display_name = category_data.get('display_name', category_id)
            category_description = category_data.get('description', '')
            category_access = category_data.get('access', 'free')
            category_icon = category_data.get('icon_name', 'question')

            # Insert category (ignore duplicates)
            cursor.execute('''
                INSERT OR IGNORE INTO QuestionCategory
                    (id, display_name, description, access, icon_name)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                category_id,
                category_display_name,
                category_description,
                category_access,
                category_icon,
            ))

            if category_id not in categories_processed:
                categories_processed.add(category_id)
                print(f" Category: {category_display_name} (id: {category_id})")

            # Use file mtime as created_at; computed once per file rather
            # than once per question.
            created_at = int(json_file.stat().st_mtime)

            # Insert questions ('or []' tolerates "questions": null)
            questions = data.get('questions') or []
            questions_inserted = 0

            for q in questions:
                question_type = q.get('type', 'written')
                is_premium = q.get('access', 'free') == 'premium'

                # Handle answer_config - check both patterns
                ac_json = _build_answer_config(
                    question_type,
                    q.get('answer_config', {}),
                    q.get('options', []),  # Alternative location for choice types
                )

                # Convert tags and answer_config to compact JSON strings
                tags_json = json.dumps(q.get('tags', []), separators=(',', ':'))
                ac_json_str = json.dumps(ac_json, separators=(',', ':'))

                # Insert question. pack_id is not listed, so it stays NULL.
                cursor.execute('''
                    INSERT OR REPLACE INTO Question
                        (id, text, category_id, depth_level, is_premium, type,
                         tags, answer_config, created_at, status, sex)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    q.get('id'),
                    q.get('text', ''),
                    q.get('category_id', category_id),
                    q.get('depth', q.get('depth_level', 1)),
                    1 if is_premium else 0,
                    question_type,
                    tags_json,
                    ac_json_str,
                    created_at,
                    'active',
                    # Optional sex field (used for Desire Sync filtering)
                    q.get('sex'),
                ))

                questions_inserted += 1
                total_questions += 1

            free_count = sum(
                1 for q in questions if q.get('access', 'free') == 'free'
            )
            premium_count = sum(
                1 for q in questions if q.get('access', 'free') == 'premium'
            )
            print(
                f" Questions: {questions_inserted} "
                f"({free_count} free, {premium_count} premium)"
            )

        # Commit and close
        conn.commit()
    finally:
        conn.close()

    print(f"\n{'='*60}")
    print(f"Database built: {output_path}")
    print(f"Categories: {len(categories_processed)}")
    print(f"Total questions: {total_questions}")
    print(f"{'='*60}")


def main():
    """Main entry point.

    Resolves the JSON input directory and the database output path relative
    to this script, checks that question files exist, and builds the
    database. Prints a message and returns without building when the input
    directory or its JSON files are missing.
    """
    # Paths
    script_dir = Path(__file__).parent
    json_dir = script_dir / 'questions'
    output_dir = script_dir.parent / 'app' / 'src' / 'main' / 'assets' / 'database'
    output_path = output_dir / 'app.db'

    print("Building SQLite database from question JSON files...")
    print(f"JSON directory: {json_dir}")
    print(f"Output path: {output_path}")

    # Verify JSON directory exists
    if not json_dir.exists():
        print(f"❌ JSON directory not found: {json_dir}")
        return

    if not any(json_dir.glob('*.json')):
        print(f"❌ No question JSON files found in: {json_dir}")
        return

    # Build database
    build_database(str(json_dir), str(output_path))


if __name__ == '__main__':
    main()