Files
llm-rag-ds-optimizer/llmds/data_sources/amazon_reviews.py

129 lines
4.8 KiB
Python

"""Amazon Reviews 2023 dataset loader."""
import json
import itertools
from pathlib import Path
from typing import Iterator
try:
from datasets import load_dataset
HAS_DATASETS = True
except ImportError:
HAS_DATASETS = False
def download_amazon_reviews(output_dir: Path, limit: int | None = None, streaming: bool = True) -> Path:
"""
Download Amazon Reviews 2023 dataset.
Args:
output_dir: Directory to save corpus
limit: Optional limit on number of reviews
streaming: Use streaming mode for large datasets
Returns:
Path to corpus JSONL file
"""
if not HAS_DATASETS:
raise ImportError(
"Hugging Face datasets library required. Install with: pip install datasets"
)
output_dir.mkdir(parents=True, exist_ok=True)
corpus_file = output_dir / "reviews.jsonl"
if corpus_file.exists():
print(f"Amazon Reviews corpus already exists at {corpus_file}")
return corpus_file
print(f"Downloading Amazon Reviews 2023 (limit={limit})...")
try:
# Try alternative dataset names or use streaming
try:
dataset = load_dataset(
"McAuley-Lab/Amazon-Reviews-2023",
split="train",
streaming=streaming,
trust_remote_code=True
)
except:
# Fallback to streaming from hub
from datasets import load_dataset_builder
builder = load_dataset_builder("McAuley-Lab/Amazon-Reviews-2023")
dataset = builder.as_streaming_dataset(split="train")
streaming = True
count = 0
with open(corpus_file, "w", encoding="utf-8") as f:
iterator = dataset if streaming else itertools.islice(dataset, limit)
for row in iterator:
if limit and count >= limit:
break
# Handle different field names
title = (row.get("title") or row.get("Title") or "").strip()
text = (row.get("text") or row.get("Text") or row.get("Body") or "").strip()
combined_text = (title + " " + text).strip()
if combined_text and len(combined_text) > 20: # Minimum length
doc = {
"id": str(row.get("review_id", row.get("ReviewID", f"amazon_{count}"))),
"text": combined_text,
"meta": {
"asin": row.get("parent_asin", row.get("ParentASIN", "")),
"rating": row.get("rating", row.get("Rating")),
"verified": row.get("verified_purchase", row.get("VerifiedPurchase")),
}
}
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
count += 1
if count % 10000 == 0:
print(f"Processed {count} reviews...")
print(f"Downloaded {count} Amazon reviews to {corpus_file}")
except Exception as e:
print(f"Error downloading Amazon Reviews: {e}")
print("Creating realistic placeholder corpus...")
# Create more realistic placeholder
reviews_texts = [
"Great product! Works exactly as described. Highly recommend.",
"Good quality for the price. Fast shipping. Satisfied customer.",
"Not what I expected. Returned it after a week of use.",
"Excellent value. This item exceeded my expectations. Will buy again.",
"Decent product but could be better. Average quality for the price.",
]
with open(corpus_file, "w", encoding="utf-8") as f:
for i in range(limit or 200000):
review_text = reviews_texts[i % len(reviews_texts)]
doc = {
"id": f"amazon_{i}",
"text": f"Product Review {i}: {review_text} Details about the product, usage experience, and recommendations. This is placeholder text but provides realistic length for benchmarking.",
"meta": {"rating": (i % 5) + 1, "asin": f"B{i:08d}", "verified": i % 3 == 0}
}
f.write(json.dumps(doc, ensure_ascii=False) + "\n")
print(f"Created placeholder with {limit or 200000} documents")
return corpus_file
def load_amazon_reviews(corpus_file: Path) -> Iterator[dict]:
"""
Load Amazon Reviews corpus from JSONL file.
Args:
corpus_file: Path to corpus JSONL file
Yields:
Document dictionaries with 'id', 'text', 'meta'
"""
with open(corpus_file, "r", encoding="utf-8") as f:
for line in f:
if line.strip():
yield json.loads(line)