#!/usr/bin/env python3
"""
Ingestion script for Congressional Bioguide profiles.
Creates SQLite database and FAISS semantic search index.
"""
import json
import os
import pickle
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
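
# Third-party requirements (package names are an assumption; FAISS is commonly
# installed as the CPU-only wheel, which matches the CPU encoding done below):
#   pip install faiss-cpu numpy sentence-transformers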


class BioguideIngester:
    def __init__(self, data_dir: str = "BioguideProfiles", db_path: str = "congress.db"):
        self.data_dir = Path(data_dir)
        self.db_path = db_path
        self.model = None  # Load model only when needed for FAISS indexing

    def create_database_schema(self):
        """Create SQLite database schema for Congressional profiles."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Main members table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS members (
                bio_id TEXT PRIMARY KEY,
                family_name TEXT,
                given_name TEXT,
                middle_name TEXT,
                honorific_prefix TEXT,
                unaccented_family_name TEXT,
                unaccented_given_name TEXT,
                unaccented_middle_name TEXT,
                birth_date TEXT,
                birth_circa INTEGER,
                death_date TEXT,
                death_circa INTEGER,
                profile_text TEXT,
                full_name TEXT GENERATED ALWAYS AS (
                    COALESCE(honorific_prefix || ' ', '') ||
                    COALESCE(given_name, '') || ' ' ||
                    COALESCE(middle_name || ' ', '') ||
                    COALESCE(family_name, '')
                ) STORED
            )
        """)

        # Images table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS images (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bio_id TEXT,
                content_url TEXT,
                caption TEXT,
                FOREIGN KEY (bio_id) REFERENCES members(bio_id)
            )
        """)

        # Job positions table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS job_positions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bio_id TEXT,
                job_name TEXT,
                job_type TEXT,
                start_date TEXT,
                start_circa INTEGER,
                end_date TEXT,
                end_circa INTEGER,
                congress_number INTEGER,
                congress_name TEXT,
                party TEXT,
                caucus TEXT,
                region_type TEXT,
                region_code TEXT,
                note TEXT,
                FOREIGN KEY (bio_id) REFERENCES members(bio_id)
            )
        """)

        # Relationships table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS relationships (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bio_id TEXT,
                related_bio_id TEXT,
                relationship_type TEXT,
                FOREIGN KEY (bio_id) REFERENCES members(bio_id),
                FOREIGN KEY (related_bio_id) REFERENCES members(bio_id)
            )
        """)

        # Creative works table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS creative_works (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bio_id TEXT,
                citation_text TEXT,
                FOREIGN KEY (bio_id) REFERENCES members(bio_id)
            )
        """)

        # Assets table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS assets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                bio_id TEXT,
                name TEXT,
                asset_type TEXT,
                content_url TEXT,
                credit_line TEXT,
                accession_number TEXT,
                upload_date TEXT,
                FOREIGN KEY (bio_id) REFERENCES members(bio_id)
            )
        """)

        # Create indexes for common queries
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_family_name ON members(unaccented_family_name)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_given_name ON members(unaccented_given_name)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_birth_date ON members(birth_date)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_death_date ON members(death_date)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_congress ON job_positions(congress_number)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_party ON job_positions(party)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_region ON job_positions(region_code)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_job_type ON job_positions(job_name)")

        conn.commit()
        conn.close()
        print("✓ Database schema created")

    def extract_data_field(self, data: Dict[str, Any], key: str, default=None):
        """Safely extract data from nested 'data' field if it exists."""
        if 'data' in data:
            return data['data'].get(key, default)
        return data.get(key, default)
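
    # The helper accepts either profile shape below (illustrative IDs and values,
    # not verbatim Bioguide records):
    #   {"data": {"usCongressBioId": "A000001", "familyName": "Adams", ...}}
    #   {"usCongressBioId": "A000001", "familyName": "Adams", ...}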

    def ingest_profiles(self):
        """Ingest all JSON profiles into SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        profile_files = list(self.data_dir.glob("*.json"))
        total = len(profile_files)
        print(f"Ingesting {total} profiles...")

        for idx, profile_file in enumerate(profile_files, 1):
            if idx % 1000 == 0:
                print(f" Processed {idx}/{total} profiles...")
            try:
                with open(profile_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)

                # Handle nested 'data' structure
                bio_id = self.extract_data_field(data, 'usCongressBioId')
                if not bio_id:
                    print(f" Skipping {profile_file}: no bio_id found")
                    continue

                # Insert member data
                cursor.execute("""
                    INSERT OR REPLACE INTO members (
                        bio_id, family_name, given_name, middle_name, honorific_prefix,
                        unaccented_family_name, unaccented_given_name, unaccented_middle_name,
                        birth_date, birth_circa, death_date, death_circa, profile_text
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    bio_id,
                    self.extract_data_field(data, 'familyName'),
                    self.extract_data_field(data, 'givenName'),
                    self.extract_data_field(data, 'middleName'),
                    self.extract_data_field(data, 'honorificPrefix'),
                    self.extract_data_field(data, 'unaccentedFamilyName'),
                    self.extract_data_field(data, 'unaccentedGivenName'),
                    self.extract_data_field(data, 'unaccentedMiddleName'),
                    self.extract_data_field(data, 'birthDate'),
                    1 if self.extract_data_field(data, 'birthCirca') else 0,
                    self.extract_data_field(data, 'deathDate'),
                    1 if self.extract_data_field(data, 'deathCirca') else 0,
                    self.extract_data_field(data, 'profileText')
                ))

                # Insert images
                images = self.extract_data_field(data, 'image', [])
                for img in images:
                    cursor.execute("""
                        INSERT INTO images (bio_id, content_url, caption)
                        VALUES (?, ?, ?)
                    """, (bio_id, img.get('contentUrl'), img.get('caption')))

                # Insert job positions
                job_positions = self.extract_data_field(data, 'jobPositions', [])
                for job_pos in job_positions:
                    job = job_pos.get('job', {})
                    congress_aff = job_pos.get('congressAffiliation', {})
                    congress = congress_aff.get('congress', {})
                    party_list = congress_aff.get('partyAffiliation', [])
                    caucus_list = congress_aff.get('caucusAffiliation', [])
                    represents = congress_aff.get('represents', {})
                    notes = congress_aff.get('note', [])
                    note_text = notes[0].get('content') if notes else None

                    cursor.execute("""
                        INSERT INTO job_positions (
                            bio_id, job_name, job_type, start_date, start_circa,
                            end_date, end_circa, congress_number, congress_name,
                            party, caucus, region_type, region_code, note
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        bio_id,
                        job.get('name'),
                        job.get('jobType'),
                        job_pos.get('startDate'),
                        1 if job_pos.get('startCirca') else 0,
                        job_pos.get('endDate'),
                        1 if job_pos.get('endCirca') else 0,
                        congress.get('congressNumber'),
                        congress.get('name'),
                        party_list[0].get('party', {}).get('name') if party_list else None,
                        caucus_list[0].get('party', {}).get('name') if caucus_list else None,
                        represents.get('regionType'),
                        represents.get('regionCode'),
                        note_text
                    ))

                # Insert relationships
                relationships = self.extract_data_field(data, 'relationship', [])
                for rel in relationships:
                    related = rel.get('relatedTo', {})
                    cursor.execute("""
                        INSERT INTO relationships (bio_id, related_bio_id, relationship_type)
                        VALUES (?, ?, ?)
                    """, (bio_id, related.get('usCongressBioId'), rel.get('relationshipType')))

                # Insert creative works
                creative_works = self.extract_data_field(data, 'creativeWork', [])
                for work in creative_works:
                    cursor.execute("""
                        INSERT INTO creative_works (bio_id, citation_text)
                        VALUES (?, ?)
                    """, (bio_id, work.get('freeFormCitationText')))

                # Insert assets
                assets = self.extract_data_field(data, 'asset', [])
                for asset in assets:
                    cursor.execute("""
                        INSERT INTO assets (
                            bio_id, name, asset_type, content_url, credit_line,
                            accession_number, upload_date
                        ) VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, (
                        bio_id,
                        asset.get('name'),
                        asset.get('assetType'),
                        asset.get('contentUrl'),
                        asset.get('creditLine'),
                        asset.get('accessionNumber'),
                        asset.get('uploadDate')
                    ))
            except Exception as e:
                print(f" Error processing {profile_file}: {e}")
                continue

        conn.commit()
        conn.close()
        print(f"✓ Ingested {total} profiles into database")

    def build_faiss_index(self):
        """Build FAISS index for semantic search on profile biographies."""
        print("\n" + "=" * 60)
        print("BUILDING FAISS INDEX FOR SEMANTIC SEARCH")
        print("=" * 60)
        try:
            # Load model
            print("\n1. Loading sentence transformer model...")
            start_time = time.time()

            # Disable all parallelism to avoid Python 3.14 issues
            os.environ['TOKENIZERS_PARALLELISM'] = 'false'
            os.environ['OMP_NUM_THREADS'] = '1'
            os.environ['MKL_NUM_THREADS'] = '1'
            os.environ['OPENBLAS_NUM_THREADS'] = '1'
            import torch
            torch.set_num_threads(1)

            self.model = SentenceTransformer('all-MiniLM-L6-v2')
            print(f" ✓ Model loaded in {time.time() - start_time:.3f}s")

            # Load biographies from database
            print("\n2. Loading biographies from database...")
            start_time = time.time()
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute("SELECT bio_id, profile_text FROM members WHERE profile_text IS NOT NULL")
            profiles = cursor.fetchall()
            conn.close()
            print(f" ✓ Loaded {len(profiles):,} biographies in {time.time() - start_time:.3f}s")

            if len(profiles) == 0:
                print("\n✗ ERROR: No profiles with text found in database!")
                return False

            # Prepare data
            print("\n3. Preparing data for encoding...")
            start_time = time.time()
            bio_ids = [p[0] for p in profiles]
            texts = [p[1] if p[1] else "" for p in profiles]
            print(f" ✓ Prepared {len(bio_ids):,} texts")
            print(f" ✓ Time: {time.time() - start_time:.3f}s")

            # Generate embeddings in batches
            print("\n4. Generating embeddings...")
            start_time = time.time()
            batch_size = 32
            embeddings = []
            for i in range(0, len(texts), batch_size):
                batch = texts[i:i + batch_size]
                batch_embeddings = self.model.encode(
                    batch,
                    show_progress_bar=False,
                    convert_to_numpy=True,
                    normalize_embeddings=False,
                    device='cpu'  # Explicit CPU to avoid GPU issues
                )
                embeddings.extend(batch_embeddings)

                # Progress update every 100 batches
                if (i // batch_size + 1) % 100 == 0:
                    elapsed = time.time() - start_time
                    rate = (i + len(batch)) / elapsed
                    print(f" Encoded {i + len(batch):,}/{len(texts):,} ({rate:.0f} texts/sec)")

            embeddings = np.array(embeddings, dtype=np.float32)
            elapsed = time.time() - start_time
            print(f" ✓ Generated {len(embeddings):,} embeddings in {elapsed:.3f}s")
            print(f" ✓ Shape: {embeddings.shape}")

            # Build FAISS index
            print("\n5. Building FAISS index...")
            start_time = time.time()
            dimension = embeddings.shape[1]
            print(f" Dimension: {dimension}")

            # Use IndexFlatIP for exact cosine similarity search
            index = faiss.IndexFlatIP(dimension)

            # Normalize embeddings so inner product equals cosine similarity
            faiss.normalize_L2(embeddings)

            # Add to index
            index.add(embeddings)
            print(f" ✓ Index built in {time.time() - start_time:.3f}s")
            print(f" ✓ Total vectors in index: {index.ntotal:,}")

            # Save FAISS index
            print("\n6. Saving FAISS index to disk...")
            start_time = time.time()
            faiss.write_index(index, "congress_faiss.index")
            print(" ✓ Index saved to: congress_faiss.index")
            print(f" ✓ Time: {time.time() - start_time:.3f}s")

            # Save bio ID mapping (row position in the index -> bio_id)
            print("\n7. Saving bio ID mapping...")
            start_time = time.time()
            with open("congress_bio_ids.pkl", "wb") as f:
                pickle.dump(bio_ids, f)
            print(" ✓ Mapping saved to: congress_bio_ids.pkl")
            print(f" ✓ Time: {time.time() - start_time:.3f}s")

            # Get file sizes
            index_size_mb = Path("congress_faiss.index").stat().st_size / (1024**2)
            mapping_size_mb = Path("congress_bio_ids.pkl").stat().st_size / (1024**2)

            print("\n" + "=" * 60)
            print("FAISS INDEX BUILD COMPLETE")
            print("=" * 60)
            print(f"Total embeddings indexed: {len(bio_ids):,}")
            print(f"Index file size: {index_size_mb:.2f} MB")
            print(f"Mapping file size: {mapping_size_mb:.2f} MB")
            print(f"Total size: {index_size_mb + mapping_size_mb:.2f} MB")
            print("\nThe MCP server will load this index on startup for fast searches.")
            return True
        except Exception as e:
            print(f"\n✗ ERROR building FAISS index: {e}")
            print(" This may be due to Python 3.14 compatibility issues.")
            print(" The database is still usable, but semantic search will not work.")
            print(" Consider using Python 3.11 or 3.12 for full functionality.")
            import traceback
            traceback.print_exc()
            return False
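
    # Sketch of how a consumer (e.g. the MCP server mentioned above) might load and
    # query these artifacts; the query text and top-k value are illustrative only:
    #   index = faiss.read_index("congress_faiss.index")
    #   with open("congress_bio_ids.pkl", "rb") as f:
    #       bio_ids = pickle.load(f)
    #   query = model.encode(["civil rights legislation"], convert_to_numpy=True).astype(np.float32)
    #   faiss.normalize_L2(query)
    #   scores, rows = index.search(query, 5)
    #   matches = [bio_ids[r] for r in rows[0]]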

    def run(self):
        """Run the complete ingestion pipeline."""
        print("Starting Congressional Bioguide ingestion...")
        print("=" * 60)
        try:
            self.create_database_schema()
            self.ingest_profiles()
            faiss_success = self.build_faiss_index()

            print("\n" + "=" * 60)
            print("INGESTION COMPLETE")
            print("=" * 60)
            print(f"Database: {self.db_path}")
            if faiss_success:
                print("FAISS index: congress_faiss.index ✓")
                print("ID mapping: congress_bio_ids.pkl ✓")
                print("\nAll features available, including semantic search!")
            else:
                print("FAISS index: ✗ (failed to build)")
                print("\nDatabase is ready, but semantic search is unavailable.")
                print("All other MCP tools will work normally.")
            return faiss_success
        except Exception as e:
            print(f"\n✗ FATAL ERROR: {e}")
            import traceback
            traceback.print_exc()
            return False


def main():
    ingester = BioguideIngester()
    ingester.run()


if __name__ == "__main__":
    main()
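
# Example invocation (the script's file name here is an assumption; adjust as needed):
#   python ingest_bioguide.py
# Expects a BioguideProfiles/ directory of *.json profiles in the working directory and
# writes congress.db, congress_faiss.index, and congress_bio_ids.pkl alongside it.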