""" Data preprocessing script for LexiMind. Transforms raw datasets into standardized JSONL splits for training. Handles summarization, emotion classification, topic classification, and book paragraph extraction with text cleaning. Author: Oliver Perrin Date: December 2025 """ from __future__ import annotations import argparse import csv import json import sys from pathlib import Path from typing import Dict, Iterable, Iterator, Sequence, Tuple from sklearn.model_selection import train_test_split PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from src.data.preprocessing import BasicTextCleaner from src.utils.config import load_yaml def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Preprocess datasets configured for LexiMind") parser.add_argument( "--config", default="configs/data/datasets.yaml", help="Path to data configuration YAML.", ) parser.add_argument( "--val-ratio", type=float, default=0.1, help="Validation split size for topic dataset when no validation split is present.", ) parser.add_argument( "--seed", type=int, default=17, help="Random seed for deterministic splitting." ) return parser.parse_args() def _resolve_csv(base: Path, filename: str) -> Path | None: primary = base / filename if primary.exists(): return primary nested = base / "cnn_dailymail" / filename if nested.exists(): return nested return None def _write_jsonl(records: Iterable[Dict[str, object]], destination: Path) -> None: destination.parent.mkdir(parents=True, exist_ok=True) with destination.open("w", encoding="utf-8") as handle: for record in records: handle.write(json.dumps(record, ensure_ascii=False) + "\n") def _read_jsonl(path: Path) -> Iterator[Dict[str, object]]: with path.open("r", encoding="utf-8") as handle: for line in handle: row = line.strip() if not row: continue yield json.loads(row) def preprocess_books( raw_dir: Path, processed_dir: Path, cleaner: BasicTextCleaner, *, min_tokens: int = 30, ) -> None: if not raw_dir.exists(): print(f"Skipping book preprocessing (missing directory: {raw_dir})") return processed_dir.mkdir(parents=True, exist_ok=True) index: list[Dict[str, object]] = [] for book_path in sorted(raw_dir.glob("*.txt")): text = book_path.read_text(encoding="utf-8").lstrip("\ufeff") normalized = text.replace("\r\n", "\n") paragraphs = [ paragraph.strip() for paragraph in normalized.split("\n\n") if paragraph.strip() ] records: list[Dict[str, object]] = [] for paragraph_id, paragraph in enumerate(paragraphs): cleaned = cleaner.transform([paragraph])[0] tokens = cleaned.split() if len(tokens) < min_tokens: continue record = { "book": book_path.stem, "title": book_path.stem.replace("_", " ").title(), "paragraph_id": paragraph_id, "text": paragraph, "clean_text": cleaned, "token_count": len(tokens), "char_count": len(paragraph), } records.append(record) if not records: print(f"No suitably sized paragraphs found in {book_path}; skipping.") continue output_path = processed_dir / f"{book_path.stem}.jsonl" print(f"Writing book segments for '{book_path.stem}' to {output_path}") _write_jsonl(records, output_path) index.append( { "book": book_path.stem, "title": records[0]["title"], "paragraphs": len(records), "source": str(book_path), "output": str(output_path), } ) if index: index_path = processed_dir / "index.json" with index_path.open("w", encoding="utf-8") as handle: json.dump(index, handle, ensure_ascii=False, indent=2) print(f"Book index written to {index_path}") def preprocess_summarization(raw_dir: Path, processed_dir: Path) -> None: if not raw_dir.exists(): print(f"Skipping summarization preprocessing (missing directory: {raw_dir})") return for split in ("train", "validation", "test"): # Check for JSONL first (from new download script), then CSV (legacy) jsonl_path = raw_dir / f"{split}.jsonl" csv_path = _resolve_csv(raw_dir, f"{split}.csv") if jsonl_path.exists(): source_path = jsonl_path is_jsonl = True elif csv_path is not None: source_path = csv_path is_jsonl = False else: print(f"Skipping summarization split '{split}' (file not found)") continue output_path = processed_dir / f"{split}.jsonl" output_path.parent.mkdir(parents=True, exist_ok=True) print(f"Writing summarization split '{split}' to {output_path}") with output_path.open("w", encoding="utf-8") as sink: if is_jsonl: # Process JSONL format (from new download script) for row in _read_jsonl(source_path): source = str(row.get("source") or row.get("article") or "") summary = str(row.get("summary") or row.get("highlights") or "") if source and summary: payload = {"source": source.strip(), "summary": summary.strip()} sink.write(json.dumps(payload, ensure_ascii=False) + "\n") else: # Process CSV format (legacy) with source_path.open("r", encoding="utf-8", newline="") as source_handle: reader = csv.DictReader(source_handle) for row in reader: article = str(row.get("article") or row.get("Article") or "") highlights = str(row.get("highlights") or row.get("summary") or "") payload = {"source": article.strip(), "summary": highlights.strip()} sink.write(json.dumps(payload, ensure_ascii=False) + "\n") def preprocess_emotion(raw_dir: Path, processed_dir: Path, cleaner: BasicTextCleaner) -> None: if not raw_dir.exists(): print(f"Skipping emotion preprocessing (missing directory: {raw_dir})") return split_aliases: Dict[str, Sequence[str]] = { "train": ("train",), "val": ("val", "validation"), "test": ("test",), } for split, aliases in split_aliases.items(): source_path: Path | None = None for alias in aliases: for extension in ("jsonl", "txt", "csv"): candidate = raw_dir / f"{alias}.{extension}" if candidate.exists(): source_path = candidate break if source_path is not None: break if source_path is None: print(f"Skipping emotion split '{split}' (file not found)") continue assert source_path is not None path = source_path def iter_records(path: Path = path) -> Iterator[Dict[str, object]]: if path.suffix == ".jsonl": for row in _read_jsonl(path): raw_text = str(row.get("text", "")) text = cleaner.transform([raw_text])[0] labels = row.get("emotions") or row.get("labels") or [] if isinstance(labels, str): labels = [label.strip() for label in labels.split(",") if label.strip()] elif isinstance(labels, Sequence): labels = [str(label) for label in labels] else: labels = [str(labels)] if labels else [] if not labels: labels = ["neutral"] yield {"text": text, "emotions": labels} else: delimiter = ";" if path.suffix == ".txt" else "," with path.open("r", encoding="utf-8", newline="") as handle: reader = csv.reader(handle, delimiter=delimiter) for csv_row in reader: if not csv_row: continue raw_text = str(csv_row[0]) text = cleaner.transform([raw_text])[0] raw_labels = csv_row[1] if len(csv_row) > 1 else "" labels = [label.strip() for label in raw_labels.split(",") if label.strip()] if not labels: labels = ["neutral"] yield {"text": text, "emotions": labels} output_path = processed_dir / f"{split}.jsonl" print(f"Writing emotion split '{split}' to {output_path}") _write_jsonl(iter_records(), output_path) def preprocess_topic( raw_dir: Path, processed_dir: Path, cleaner: BasicTextCleaner, val_ratio: float, seed: int, ) -> None: if not raw_dir.exists(): print(f"Skipping topic preprocessing (missing directory: {raw_dir})") return def locate(*names: str) -> Path | None: for name in names: candidate = raw_dir / name if candidate.exists(): return candidate return None train_path = locate("train.jsonl", "train.csv") if train_path is None: print(f"Skipping topic preprocessing (missing train split in {raw_dir})") return assert train_path is not None def load_topic_rows(path: Path) -> list[Tuple[str, str]]: rows: list[Tuple[str, str]] = [] if path.suffix == ".jsonl": for record in _read_jsonl(path): text = str(record.get("text") or record.get("content") or "") topic = record.get("topic") or record.get("label") cleaned_text = cleaner.transform([text])[0] rows.append((cleaned_text, str(topic).strip())) else: with path.open("r", encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle) for row in reader: topic = row.get("Class Index") or row.get("topic") or row.get("label") title = str(row.get("Title") or "") description = str(row.get("Description") or row.get("text") or "") text = " ".join(filter(None, (title, description))) cleaned_text = cleaner.transform([text])[0] rows.append((cleaned_text, str(topic).strip())) return rows train_rows = load_topic_rows(train_path) if not train_rows: print("No topic training rows found; skipping topic preprocessing.") return texts = [row[0] for row in train_rows] topics = [row[1] for row in train_rows] validation_path = locate("val.jsonl", "validation.jsonl", "val.csv", "validation.csv") has_validation = validation_path is not None if has_validation and validation_path: val_rows = load_topic_rows(validation_path) train_records = train_rows else: train_texts, val_texts, train_topics, val_topics = train_test_split( texts, topics, test_size=val_ratio, random_state=seed, stratify=topics, ) train_records = list(zip(train_texts, train_topics, strict=False)) val_rows = list(zip(val_texts, val_topics, strict=False)) def to_records(pairs: Sequence[Tuple[str, str]]) -> Iterator[Dict[str, object]]: for text, topic in pairs: yield {"text": text, "topic": topic} print(f"Writing topic train split to {processed_dir / 'train.jsonl'}") _write_jsonl(to_records(train_records), processed_dir / "train.jsonl") print(f"Writing topic val split to {processed_dir / 'val.jsonl'}") _write_jsonl(to_records(val_rows), processed_dir / "val.jsonl") test_path = locate("test.jsonl", "test.csv") if test_path is not None: test_rows = load_topic_rows(test_path) print(f"Writing topic test split to {processed_dir / 'test.jsonl'}") _write_jsonl(to_records(test_rows), processed_dir / "test.jsonl") else: print(f"Skipping topic test split (missing test split in {raw_dir})") def main() -> None: args = parse_args() config = load_yaml(args.config).data raw_cfg = config.get("raw", {}) processed_cfg = config.get("processed", {}) books_raw = Path(raw_cfg.get("books", "data/raw/books")) summarization_raw = Path(raw_cfg.get("summarization", "data/raw/summarization")) emotion_raw = Path(raw_cfg.get("emotion", "data/raw/emotion")) topic_raw = Path(raw_cfg.get("topic", "data/raw/topic")) books_processed = Path(processed_cfg.get("books", "data/processed/books")) summarization_processed = Path( processed_cfg.get("summarization", "data/processed/summarization") ) emotion_processed = Path(processed_cfg.get("emotion", "data/processed/emotion")) topic_processed = Path(processed_cfg.get("topic", "data/processed/topic")) cleaner = BasicTextCleaner() preprocess_books(books_raw, books_processed, cleaner) preprocess_summarization(summarization_raw, summarization_processed) preprocess_emotion(emotion_raw, emotion_processed, cleaner) preprocess_topic(topic_raw, topic_processed, cleaner, val_ratio=args.val_ratio, seed=args.seed) print("Preprocessing complete.") if __name__ == "__main__": main()