File size: 13,936 Bytes
590a604
 
 
 
 
 
 
 
 
 
ee1a8a3
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18e93d
 
 
 
 
 
 
 
 
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18e93d
 
 
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6bae907
 
 
 
 
 
 
 
 
 
 
1fbc47b
 
 
 
 
 
6bae907
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18e93d
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18e93d
 
1fbc47b
a18e93d
1fbc47b
a18e93d
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60f8a12
 
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18e93d
 
 
1fbc47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""
Data preprocessing script for LexiMind.

Transforms raw datasets into standardized JSONL splits for training. Handles
summarization, emotion classification, topic classification, and book paragraph
extraction with text cleaning.

Author: Oliver Perrin
Date: December 2025
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
from pathlib import Path
from typing import Dict, Iterable, Iterator, Sequence, Tuple

from sklearn.model_selection import train_test_split

# Put the directory one level above this file's folder (presumably the
# repository root) at the front of ``sys.path`` so the ``src.*`` imports below
# resolve when this file is executed directly as a script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.data.preprocessing import BasicTextCleaner
from src.utils.config import load_yaml


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the preprocessing script.

    Returns:
        Namespace with ``config`` (str), ``val_ratio`` (float) and ``seed`` (int).
    """
    cli = argparse.ArgumentParser(description="Preprocess datasets configured for LexiMind")

    # Each entry is (flag, keyword arguments forwarded to ``add_argument``).
    option_table = (
        (
            "--config",
            {
                "default": "configs/data/datasets.yaml",
                "help": "Path to data configuration YAML.",
            },
        ),
        (
            "--val-ratio",
            {
                "type": float,
                "default": 0.1,
                "help": (
                    "Validation split size for topic dataset when no validation split is present."
                ),
            },
        ),
        (
            "--seed",
            {
                "type": int,
                "default": 17,
                "help": "Random seed for deterministic splitting.",
            },
        ),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    return cli.parse_args()


def _resolve_csv(base: Path, filename: str) -> Path | None:
    """Find *filename* under *base*, falling back to ``base/cnn_dailymail``.

    Returns the first location that exists (the direct path is preferred), or
    ``None`` when the file is in neither place.
    """
    search_order = (base / filename, base / "cnn_dailymail" / filename)
    return next((location for location in search_order if location.exists()), None)


def _write_jsonl(records: Iterable[Dict[str, object]], destination: Path) -> None:
    """Serialize *records* to *destination*, one JSON object per line.

    Missing parent directories are created, an existing file is overwritten,
    and non-ASCII characters are written verbatim (UTF-8) rather than escaped.
    """
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as sink:
        sink.writelines(json.dumps(entry, ensure_ascii=False) + "\n" for entry in records)


def _read_jsonl(path: Path) -> Iterator[Dict[str, object]]:
    """Lazily yield the JSON value decoded from each non-blank line of *path*.

    Lines are stripped of surrounding whitespace first; lines that are empty
    after stripping are ignored.
    """
    with path.open("r", encoding="utf-8") as stream:
        trimmed_lines = (raw_line.strip() for raw_line in stream)
        yield from (json.loads(payload) for payload in trimmed_lines if payload)


def preprocess_books(
    raw_dir: Path,
    processed_dir: Path,
    cleaner: BasicTextCleaner,
    *,
    min_tokens: int = 30,
) -> None:
    """Split every ``*.txt`` book in *raw_dir* into cleaned paragraph records.

    For each book, paragraphs (separated by blank lines) are cleaned with
    ``cleaner.transform``; those whose cleaned text has at least *min_tokens*
    whitespace-separated tokens are written to ``<processed_dir>/<stem>.jsonl``.
    When at least one book produced output, a summary ``index.json`` is written
    alongside.

    Args:
        raw_dir: Directory holding the raw ``.txt`` books.
        processed_dir: Destination directory for per-book JSONL files and the index.
        cleaner: Object whose ``transform`` takes a list of strings and returns
            the cleaned strings in the same order.
        min_tokens: Minimum cleaned-token count for a paragraph to be kept.
    """
    if not raw_dir.exists():
        print(f"Skipping book preprocessing (missing directory: {raw_dir})")
        return

    processed_dir.mkdir(parents=True, exist_ok=True)
    catalog: list[Dict[str, object]] = []

    for source_file in sorted(raw_dir.glob("*.txt")):
        stem = source_file.stem
        display_title = stem.replace("_", " ").title()

        # Drop a leading BOM and unify Windows line endings before splitting.
        body = source_file.read_text(encoding="utf-8").lstrip("\ufeff").replace("\r\n", "\n")
        chunks = [chunk for chunk in map(str.strip, body.split("\n\n")) if chunk]

        segments: list[Dict[str, object]] = []
        for position, chunk in enumerate(chunks):
            scrubbed = cleaner.transform([chunk])[0]
            word_count = len(scrubbed.split())
            if word_count >= min_tokens:
                segments.append(
                    {
                        "book": stem,
                        "title": display_title,
                        "paragraph_id": position,
                        "text": chunk,
                        "clean_text": scrubbed,
                        "token_count": word_count,
                        "char_count": len(chunk),
                    }
                )

        if segments:
            target = processed_dir / f"{stem}.jsonl"
            print(f"Writing book segments for '{stem}' to {target}")
            _write_jsonl(segments, target)
            catalog.append(
                {
                    "book": stem,
                    "title": display_title,
                    "paragraphs": len(segments),
                    "source": str(source_file),
                    "output": str(target),
                }
            )
        else:
            print(f"No suitably sized paragraphs found in {source_file}; skipping.")

    if not catalog:
        return

    index_path = processed_dir / "index.json"
    index_path.write_text(json.dumps(catalog, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Book index written to {index_path}")


def preprocess_summarization(raw_dir: Path, processed_dir: Path) -> None:
    """Convert raw summarization splits into ``{"source", "summary"}`` JSONL files.

    For each of the ``train``, ``validation`` and ``test`` splits the raw data
    is read from ``<split>.jsonl`` when present, otherwise from the legacy
    ``<split>.csv`` (looked up via ``_resolve_csv``). Both fields are stripped,
    and a row is written only when both are non-empty afterwards. The same rule
    applies to JSONL and CSV inputs, so empty or whitespace-only examples never
    reach the processed files.

    Args:
        raw_dir: Directory containing the raw split files.
        processed_dir: Directory receiving ``<split>.jsonl`` outputs.
    """
    if not raw_dir.exists():
        print(f"Skipping summarization preprocessing (missing directory: {raw_dir})")
        return

    def iter_pairs(source_path: Path, is_jsonl: bool) -> Iterator[Tuple[str, str]]:
        """Yield raw ``(article, summary)`` strings from one split file."""
        if is_jsonl:
            # JSONL format (from new download script)
            for row in _read_jsonl(source_path):
                yield (
                    str(row.get("source") or row.get("article") or ""),
                    str(row.get("summary") or row.get("highlights") or ""),
                )
            return
        # CSV format (legacy); newline="" lets the csv module handle embedded newlines.
        with source_path.open("r", encoding="utf-8", newline="") as source_handle:
            for row in csv.DictReader(source_handle):
                yield (
                    str(row.get("article") or row.get("Article") or ""),
                    str(row.get("highlights") or row.get("summary") or ""),
                )

    for split in ("train", "validation", "test"):
        # Check for JSONL first (from new download script), then CSV (legacy)
        jsonl_path = raw_dir / f"{split}.jsonl"
        if jsonl_path.exists():
            source_path, is_jsonl = jsonl_path, True
        else:
            csv_path = _resolve_csv(raw_dir, f"{split}.csv")
            if csv_path is None:
                print(f"Skipping summarization split '{split}' (file not found)")
                continue
            source_path, is_jsonl = csv_path, False

        output_path = processed_dir / f"{split}.jsonl"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"Writing summarization split '{split}' to {output_path}")

        with output_path.open("w", encoding="utf-8") as sink:
            for raw_source, raw_summary in iter_pairs(source_path, is_jsonl):
                source = raw_source.strip()
                summary = raw_summary.strip()
                # Strip before testing so whitespace-only fields count as empty.
                if not (source and summary):
                    continue
                payload = {"source": source, "summary": summary}
                sink.write(json.dumps(payload, ensure_ascii=False) + "\n")


def preprocess_emotion(raw_dir: Path, processed_dir: Path, cleaner: BasicTextCleaner) -> None:
    """Normalise the emotion dataset into ``{"text", "emotions"}`` JSONL splits.

    For each output split (``train``, ``val``, ``test``) the first existing raw
    file is chosen by trying every alias with the extensions ``jsonl``, ``txt``
    and ``csv`` in that order. Text is cleaned with ``cleaner.transform`` and
    labels are normalised to a list of strings, defaulting to ``["neutral"]``
    when none are present.

    Args:
        raw_dir: Directory containing the raw emotion files.
        processed_dir: Directory receiving ``<split>.jsonl`` outputs.
        cleaner: Object whose ``transform`` maps a list of strings to cleaned strings.
    """
    if not raw_dir.exists():
        print(f"Skipping emotion preprocessing (missing directory: {raw_dir})")
        return

    def normalise_labels(value: object) -> list[str]:
        """Coerce a raw label field into a non-empty list of label strings."""
        if isinstance(value, str):
            parsed = [piece.strip() for piece in value.split(",") if piece.strip()]
        elif isinstance(value, Sequence):
            parsed = [str(item) for item in value]
        elif value:
            parsed = [str(value)]
        else:
            parsed = []
        return parsed or ["neutral"]

    def stream_jsonl(location: Path) -> Iterator[Dict[str, object]]:
        """Yield records from a JSONL file with ``text`` and ``emotions``/``labels`` keys."""
        for entry in _read_jsonl(location):
            cleaned = cleaner.transform([str(entry.get("text", ""))])[0]
            raw_labels = entry.get("emotions") or entry.get("labels") or []
            yield {"text": cleaned, "emotions": normalise_labels(raw_labels)}

    def stream_delimited(location: Path) -> Iterator[Dict[str, object]]:
        """Yield records from a ``text<sep>labels`` file (``;`` for .txt, ``,`` otherwise)."""
        separator = ";" if location.suffix == ".txt" else ","
        with location.open("r", encoding="utf-8", newline="") as handle:
            for fields in csv.reader(handle, delimiter=separator):
                if not fields:
                    continue
                cleaned = cleaner.transform([str(fields[0])])[0]
                label_field = fields[1] if len(fields) > 1 else ""
                yield {"text": cleaned, "emotions": normalise_labels(label_field)}

    split_sources = (
        ("train", ("train",)),
        ("val", ("val", "validation")),
        ("test", ("test",)),
    )

    for split, aliases in split_sources:
        candidates = (
            raw_dir / f"{alias}.{extension}"
            for alias in aliases
            for extension in ("jsonl", "txt", "csv")
        )
        chosen = next((candidate for candidate in candidates if candidate.exists()), None)
        if chosen is None:
            print(f"Skipping emotion split '{split}' (file not found)")
            continue

        reader = stream_jsonl if chosen.suffix == ".jsonl" else stream_delimited
        output_path = processed_dir / f"{split}.jsonl"
        print(f"Writing emotion split '{split}' to {output_path}")
        _write_jsonl(reader(chosen), output_path)


def preprocess_topic(
    raw_dir: Path,
    processed_dir: Path,
    cleaner: BasicTextCleaner,
    val_ratio: float,
    seed: int,
) -> None:
    """Clean the topic dataset and write ``train``/``val``/``test`` JSONL splits.

    Rows are loaded from JSONL (``text``/``content`` plus ``topic``/``label``)
    or CSV (``Title`` + ``Description``/``text`` plus ``Class Index``/``topic``/
    ``label``). Labels are selected with explicit ``None``/blank checks, so a
    falsy but valid label such as the integer ``0`` is kept. Rows carrying no
    label at all are skipped rather than written with the literal topic
    ``"None"``.

    When no validation file exists, the training rows are split with a
    stratified ``train_test_split`` using *val_ratio* and *seed*.

    Args:
        raw_dir: Directory containing the raw topic files.
        processed_dir: Directory receiving the processed JSONL splits.
        cleaner: Object whose ``transform`` maps a list of strings to cleaned strings.
        val_ratio: Fraction of training rows held out when no validation file exists.
        seed: Random seed passed to ``train_test_split``.
    """
    if not raw_dir.exists():
        print(f"Skipping topic preprocessing (missing directory: {raw_dir})")
        return

    def locate(*names: str) -> Path | None:
        """Return the first existing ``raw_dir / name`` or ``None``."""
        for name in names:
            candidate = raw_dir / name
            if candidate.exists():
                return candidate
        return None

    def pick_label(row: Dict[str, object], *keys: str) -> str | None:
        """Return the first non-blank label among *keys*, stripped, else ``None``."""
        for key in keys:
            value = row.get(key)
            if value is None:
                continue
            label = str(value).strip()
            if label:
                return label
        return None

    def load_topic_rows(path: Path) -> list[Tuple[str, str]]:
        """Load ``(cleaned_text, topic)`` pairs from *path*, skipping unlabeled rows."""
        rows: list[Tuple[str, str]] = []
        unlabeled = 0
        if path.suffix == ".jsonl":
            for record in _read_jsonl(path):
                topic = pick_label(record, "topic", "label")
                if topic is None:
                    unlabeled += 1
                    continue
                text = str(record.get("text") or record.get("content") or "")
                rows.append((cleaner.transform([text])[0], topic))
        else:
            with path.open("r", encoding="utf-8", newline="") as handle:
                for row in csv.DictReader(handle):
                    topic = pick_label(row, "Class Index", "topic", "label")
                    if topic is None:
                        unlabeled += 1
                        continue
                    title = str(row.get("Title") or "")
                    description = str(row.get("Description") or row.get("text") or "")
                    text = " ".join(filter(None, (title, description)))
                    rows.append((cleaner.transform([text])[0], topic))
        if unlabeled:
            print(f"Ignored {unlabeled} topic rows without a label in {path}")
        return rows

    train_path = locate("train.jsonl", "train.csv")
    if train_path is None:
        print(f"Skipping topic preprocessing (missing train split in {raw_dir})")
        return

    train_rows = load_topic_rows(train_path)
    if not train_rows:
        print("No topic training rows found; skipping topic preprocessing.")
        return

    validation_path = locate("val.jsonl", "validation.jsonl", "val.csv", "validation.csv")
    if validation_path is not None:
        val_rows = load_topic_rows(validation_path)
        train_records = train_rows
    else:
        # No validation file: hold out a stratified slice of the training rows.
        texts = [text for text, _ in train_rows]
        topics = [topic for _, topic in train_rows]
        train_texts, val_texts, train_topics, val_topics = train_test_split(
            texts,
            topics,
            test_size=val_ratio,
            random_state=seed,
            stratify=topics,
        )
        train_records = list(zip(train_texts, train_topics))
        val_rows = list(zip(val_texts, val_topics))

    def to_records(pairs: Sequence[Tuple[str, str]]) -> Iterator[Dict[str, object]]:
        """Turn ``(text, topic)`` pairs into output dictionaries."""
        for text, topic in pairs:
            yield {"text": text, "topic": topic}

    print(f"Writing topic train split to {processed_dir / 'train.jsonl'}")
    _write_jsonl(to_records(train_records), processed_dir / "train.jsonl")
    print(f"Writing topic val split to {processed_dir / 'val.jsonl'}")
    _write_jsonl(to_records(val_rows), processed_dir / "val.jsonl")

    test_path = locate("test.jsonl", "test.csv")
    if test_path is not None:
        test_rows = load_topic_rows(test_path)
        print(f"Writing topic test split to {processed_dir / 'test.jsonl'}")
        _write_jsonl(to_records(test_rows), processed_dir / "test.jsonl")
    else:
        print(f"Skipping topic test split (missing test split in {raw_dir})")


def main() -> None:
    """Run every preprocessing stage using the directories named in the config.

    The YAML file given by ``--config`` may define ``raw`` and ``processed``
    mappings from task name to directory; any missing entry falls back to
    ``data/raw/<task>`` or ``data/processed/<task>`` respectively.
    """
    args = parse_args()
    settings = load_yaml(args.config).data

    raw_cfg = settings.get("raw", {})
    processed_cfg = settings.get("processed", {})

    tasks = ("books", "summarization", "emotion", "topic")
    raw_dirs = {task: Path(raw_cfg.get(task, f"data/raw/{task}")) for task in tasks}
    processed_dirs = {
        task: Path(processed_cfg.get(task, f"data/processed/{task}")) for task in tasks
    }

    cleaner = BasicTextCleaner()

    preprocess_books(raw_dirs["books"], processed_dirs["books"], cleaner)
    preprocess_summarization(raw_dirs["summarization"], processed_dirs["summarization"])
    preprocess_emotion(raw_dirs["emotion"], processed_dirs["emotion"], cleaner)
    preprocess_topic(
        raw_dirs["topic"],
        processed_dirs["topic"],
        cleaner,
        val_ratio=args.val_ratio,
        seed=args.seed,
    )

    print("Preprocessing complete.")


# Run the full preprocessing pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()