| """ | |
| Data preprocessing script for LexiMind. | |
| Transforms raw datasets into standardized JSONL splits for training. Handles | |
| summarization, emotion classification, topic classification, and book paragraph | |
| extraction with text cleaning. | |
| Author: Oliver Perrin | |
| Date: December 2025 | |
| """ | |
from __future__ import annotations

import argparse
import csv
import json
import sys
from pathlib import Path
from typing import Dict, Iterable, Iterator, Sequence, Tuple

from sklearn.model_selection import train_test_split

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.data.preprocessing import BasicTextCleaner
from src.utils.config import load_yaml
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Preprocess datasets configured for LexiMind")
    parser.add_argument(
        "--config",
        default="configs/data/datasets.yaml",
        help="Path to data configuration YAML.",
    )
    parser.add_argument(
        "--val-ratio",
        type=float,
        default=0.1,
        help="Validation split size for the topic dataset when no validation split is present.",
    )
    parser.add_argument(
        "--seed", type=int, default=17, help="Random seed for deterministic splitting."
    )
    return parser.parse_args()
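# The helper below resolves a CSV either directly under the raw directory or
# nested one level down (e.g. raw_dir/cnn_dailymail/train.csv), since downloads
# of the summarization corpus may use either layout.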
def _resolve_csv(base: Path, filename: str) -> Path | None:
    primary = base / filename
    if primary.exists():
        return primary
    nested = base / "cnn_dailymail" / filename
    if nested.exists():
        return nested
    return None
def _write_jsonl(records: Iterable[Dict[str, object]], destination: Path) -> None:
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")

def _read_jsonl(path: Path) -> Iterator[Dict[str, object]]:
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            row = line.strip()
            if not row:
                continue
            yield json.loads(row)
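# Each book paragraph emitted by preprocess_books is one JSONL record shaped like
# the dict built inside its loop; the values here are purely illustrative:
#   {"book": "pride_and_prejudice", "title": "Pride And Prejudice",
#    "paragraph_id": 12, "text": "...", "clean_text": "...",
#    "token_count": 87, "char_count": 512}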
def preprocess_books(
    raw_dir: Path,
    processed_dir: Path,
    cleaner: BasicTextCleaner,
    *,
    min_tokens: int = 30,
) -> None:
    if not raw_dir.exists():
        print(f"Skipping book preprocessing (missing directory: {raw_dir})")
        return
    processed_dir.mkdir(parents=True, exist_ok=True)
    index: list[Dict[str, object]] = []
    for book_path in sorted(raw_dir.glob("*.txt")):
        text = book_path.read_text(encoding="utf-8").lstrip("\ufeff")
        normalized = text.replace("\r\n", "\n")
        paragraphs = [
            paragraph.strip() for paragraph in normalized.split("\n\n") if paragraph.strip()
        ]
        records: list[Dict[str, object]] = []
        for paragraph_id, paragraph in enumerate(paragraphs):
            cleaned = cleaner.transform([paragraph])[0]
            tokens = cleaned.split()
            if len(tokens) < min_tokens:
                continue
            record = {
                "book": book_path.stem,
                "title": book_path.stem.replace("_", " ").title(),
                "paragraph_id": paragraph_id,
                "text": paragraph,
                "clean_text": cleaned,
                "token_count": len(tokens),
                "char_count": len(paragraph),
            }
            records.append(record)
        if not records:
            print(f"No suitably sized paragraphs found in {book_path}; skipping.")
            continue
        output_path = processed_dir / f"{book_path.stem}.jsonl"
        print(f"Writing book segments for '{book_path.stem}' to {output_path}")
        _write_jsonl(records, output_path)
        index.append(
            {
                "book": book_path.stem,
                "title": records[0]["title"],
                "paragraphs": len(records),
                "source": str(book_path),
                "output": str(output_path),
            }
        )
    if index:
        index_path = processed_dir / "index.json"
        with index_path.open("w", encoding="utf-8") as handle:
            json.dump(index, handle, ensure_ascii=False, indent=2)
        print(f"Book index written to {index_path}")
def preprocess_summarization(raw_dir: Path, processed_dir: Path) -> None:
    if not raw_dir.exists():
        print(f"Skipping summarization preprocessing (missing directory: {raw_dir})")
        return
    for split in ("train", "validation", "test"):
        # Check for JSONL first (from new download script), then CSV (legacy)
        jsonl_path = raw_dir / f"{split}.jsonl"
        csv_path = _resolve_csv(raw_dir, f"{split}.csv")
        if jsonl_path.exists():
            source_path = jsonl_path
            is_jsonl = True
        elif csv_path is not None:
            source_path = csv_path
            is_jsonl = False
        else:
            print(f"Skipping summarization split '{split}' (file not found)")
            continue
        output_path = processed_dir / f"{split}.jsonl"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"Writing summarization split '{split}' to {output_path}")
        with output_path.open("w", encoding="utf-8") as sink:
            if is_jsonl:
                # Process JSONL format (from new download script)
                for row in _read_jsonl(source_path):
                    source = str(row.get("source") or row.get("article") or "")
                    summary = str(row.get("summary") or row.get("highlights") or "")
                    if source and summary:
                        payload = {"source": source.strip(), "summary": summary.strip()}
                        sink.write(json.dumps(payload, ensure_ascii=False) + "\n")
            else:
                # Process CSV format (legacy)
                with source_path.open("r", encoding="utf-8", newline="") as source_handle:
                    reader = csv.DictReader(source_handle)
                    for row in reader:
                        article = str(row.get("article") or row.get("Article") or "")
                        highlights = str(row.get("highlights") or row.get("summary") or "")
                        payload = {"source": article.strip(), "summary": highlights.strip()}
                        sink.write(json.dumps(payload, ensure_ascii=False) + "\n")
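# preprocess_emotion looks for train/val/test files as .jsonl, .txt, or .csv.
# JSONL rows are expected to carry "text" plus "emotions" or "labels"; delimited
# files are assumed to be "text;label" (.txt) or "text,label" (.csv) pairs, as in
# common emotion-dataset exports. Rows without labels fall back to "neutral".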
def preprocess_emotion(raw_dir: Path, processed_dir: Path, cleaner: BasicTextCleaner) -> None:
    if not raw_dir.exists():
        print(f"Skipping emotion preprocessing (missing directory: {raw_dir})")
        return
    split_aliases: Dict[str, Sequence[str]] = {
        "train": ("train",),
        "val": ("val", "validation"),
        "test": ("test",),
    }
    for split, aliases in split_aliases.items():
        source_path: Path | None = None
        for alias in aliases:
            for extension in ("jsonl", "txt", "csv"):
                candidate = raw_dir / f"{alias}.{extension}"
                if candidate.exists():
                    source_path = candidate
                    break
            if source_path is not None:
                break
        if source_path is None:
            print(f"Skipping emotion split '{split}' (file not found)")
            continue
        assert source_path is not None
        path = source_path

        def iter_records(path: Path = path) -> Iterator[Dict[str, object]]:
            if path.suffix == ".jsonl":
                for row in _read_jsonl(path):
                    raw_text = str(row.get("text", ""))
                    text = cleaner.transform([raw_text])[0]
                    labels = row.get("emotions") or row.get("labels") or []
                    if isinstance(labels, str):
                        labels = [label.strip() for label in labels.split(",") if label.strip()]
                    elif isinstance(labels, Sequence):
                        labels = [str(label) for label in labels]
                    else:
                        labels = [str(labels)] if labels else []
                    if not labels:
                        labels = ["neutral"]
                    yield {"text": text, "emotions": labels}
            else:
                delimiter = ";" if path.suffix == ".txt" else ","
                with path.open("r", encoding="utf-8", newline="") as handle:
                    reader = csv.reader(handle, delimiter=delimiter)
                    for csv_row in reader:
                        if not csv_row:
                            continue
                        raw_text = str(csv_row[0])
                        text = cleaner.transform([raw_text])[0]
                        raw_labels = csv_row[1] if len(csv_row) > 1 else ""
                        labels = [label.strip() for label in raw_labels.split(",") if label.strip()]
                        if not labels:
                            labels = ["neutral"]
                        yield {"text": text, "emotions": labels}

        output_path = processed_dir / f"{split}.jsonl"
        print(f"Writing emotion split '{split}' to {output_path}")
        _write_jsonl(iter_records(), output_path)
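# preprocess_topic handles train.jsonl ("text"/"content" plus "topic"/"label") or
# a CSV with "Class Index", "Title", and "Description" columns; those column names
# suggest an AG News-style export, but any file matching that schema works. When
# no validation file exists, a stratified split is carved out of the train rows.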
def preprocess_topic(
    raw_dir: Path,
    processed_dir: Path,
    cleaner: BasicTextCleaner,
    val_ratio: float,
    seed: int,
) -> None:
    if not raw_dir.exists():
        print(f"Skipping topic preprocessing (missing directory: {raw_dir})")
        return

    def locate(*names: str) -> Path | None:
        for name in names:
            candidate = raw_dir / name
            if candidate.exists():
                return candidate
        return None

    train_path = locate("train.jsonl", "train.csv")
    if train_path is None:
        print(f"Skipping topic preprocessing (missing train split in {raw_dir})")
        return
    assert train_path is not None

    def load_topic_rows(path: Path) -> list[Tuple[str, str]]:
        rows: list[Tuple[str, str]] = []
        if path.suffix == ".jsonl":
            for record in _read_jsonl(path):
                text = str(record.get("text") or record.get("content") or "")
                topic = record.get("topic") or record.get("label")
                cleaned_text = cleaner.transform([text])[0]
                rows.append((cleaned_text, str(topic).strip()))
        else:
            with path.open("r", encoding="utf-8", newline="") as handle:
                reader = csv.DictReader(handle)
                for row in reader:
                    topic = row.get("Class Index") or row.get("topic") or row.get("label")
                    title = str(row.get("Title") or "")
                    description = str(row.get("Description") or row.get("text") or "")
                    text = " ".join(filter(None, (title, description)))
                    cleaned_text = cleaner.transform([text])[0]
                    rows.append((cleaned_text, str(topic).strip()))
        return rows

    train_rows = load_topic_rows(train_path)
    if not train_rows:
        print("No topic training rows found; skipping topic preprocessing.")
        return
    texts = [row[0] for row in train_rows]
    topics = [row[1] for row in train_rows]
    validation_path = locate("val.jsonl", "validation.jsonl", "val.csv", "validation.csv")
    has_validation = validation_path is not None
    if has_validation and validation_path:
        val_rows = load_topic_rows(validation_path)
        train_records = train_rows
    else:
        train_texts, val_texts, train_topics, val_topics = train_test_split(
            texts,
            topics,
            test_size=val_ratio,
            random_state=seed,
            stratify=topics,
        )
        train_records = list(zip(train_texts, train_topics, strict=False))
        val_rows = list(zip(val_texts, val_topics, strict=False))

    def to_records(pairs: Sequence[Tuple[str, str]]) -> Iterator[Dict[str, object]]:
        for text, topic in pairs:
            yield {"text": text, "topic": topic}

    print(f"Writing topic train split to {processed_dir / 'train.jsonl'}")
    _write_jsonl(to_records(train_records), processed_dir / "train.jsonl")
    print(f"Writing topic val split to {processed_dir / 'val.jsonl'}")
    _write_jsonl(to_records(val_rows), processed_dir / "val.jsonl")
    test_path = locate("test.jsonl", "test.csv")
    if test_path is not None:
        test_rows = load_topic_rows(test_path)
        print(f"Writing topic test split to {processed_dir / 'test.jsonl'}")
        _write_jsonl(to_records(test_rows), processed_dir / "test.jsonl")
    else:
        print(f"Skipping topic test split (missing test split in {raw_dir})")
def main() -> None:
    args = parse_args()
    config = load_yaml(args.config).data
    raw_cfg = config.get("raw", {})
    processed_cfg = config.get("processed", {})
    books_raw = Path(raw_cfg.get("books", "data/raw/books"))
    summarization_raw = Path(raw_cfg.get("summarization", "data/raw/summarization"))
    emotion_raw = Path(raw_cfg.get("emotion", "data/raw/emotion"))
    topic_raw = Path(raw_cfg.get("topic", "data/raw/topic"))
    books_processed = Path(processed_cfg.get("books", "data/processed/books"))
    summarization_processed = Path(
        processed_cfg.get("summarization", "data/processed/summarization")
    )
    emotion_processed = Path(processed_cfg.get("emotion", "data/processed/emotion"))
    topic_processed = Path(processed_cfg.get("topic", "data/processed/topic"))
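    # The lookups above assume a datasets.yaml roughly shaped like the sketch
    # below (exact nesting depends on what load_yaml(...).data returns); every
    # key is optional, since each get() falls back to the default paths:
    #
    #   raw:
    #     books: data/raw/books
    #     summarization: data/raw/summarization
    #     emotion: data/raw/emotion
    #     topic: data/raw/topic
    #   processed:
    #     books: data/processed/books
    #     summarization: data/processed/summarization
    #     emotion: data/processed/emotion
    #     topic: data/processed/topic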
    cleaner = BasicTextCleaner()
    preprocess_books(books_raw, books_processed, cleaner)
    preprocess_summarization(summarization_raw, summarization_processed)
    preprocess_emotion(emotion_raw, emotion_processed, cleaner)
    preprocess_topic(topic_raw, topic_processed, cleaner, val_ratio=args.val_ratio, seed=args.seed)
    print("Preprocessing complete.")

if __name__ == "__main__":
    main()