from __future__ import annotations

import argparse
import re
import unicodedata
from pathlib import Path
from typing import Iterable

import pandas as pd


# Exact-match set of boilerplate positive reviews (e.g. "好评" = "good review",
# "不错" = "not bad"); such reviews carry no analyzable content and are dropped.
DEFAULT_GENERIC_POSITIVE_REVIEWS = {
    "好评", "很好", "系统默认好评", "挺好", "不错", "还不错", "可以", "推荐",
    "值得推荐", "满意", "很满意", "总体满意", "整体不错", "体验不错", "体验很好",
    "环境不错", "服务不错", "值得一去", "下次再来", "下次还来", "总体来说不错",
    "整体体验不错", "玩得很开心",
}

# Whole-string patterns: a single generic praise phrase optionally followed by
# trailing exclamation marks, periods or tildes (full- and half-width forms).
DEFAULT_GENERIC_PATTERNS = [
    r"^好评[!！。\.~～]*$",
    r"^非常好[!！。\.~～]*$",
    r"^很好[!！。\.~～]*$",
    r"^不错[!！。\.~～]*$",
    r"^可以[!！。\.~～]*$",
    r"^满意[!！。\.~～]*$",
    r"^推荐[!！。\.~～]*$",
    r"^值得推荐[!！。\.~～]*$",
    r"^下次再来[!！。\.~～]*$",
    r"^下次还来[!！。\.~～]*$",
]

# Sentence boundaries: CJK/ASCII sentence-final punctuation (plus any trailing
# whitespace), or whitespace that directly follows an ASCII period.
SENTENCE_SPLIT_PATTERN = re.compile(r"[。！？!?；;]\s*|(?<=\.)\s+")
# Runs of whitespace, collapsed to a single space during normalization.
MULTI_SPACE_PATTERN = re.compile(r"\s+")
# Anything shaped like an HTML/XML tag, e.g. "<br>".
HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
# http(s):// links and bare "www." links.
URL_PATTERN = re.compile(r"https?://\S+|www\.\S+")
# Repeated punctuation such as "！！！", reduced to a single character.
REPEATED_PUNCT_PATTERN = re.compile(r"([。！？!?,，；;~～])\1+")
# C0 and C1 control characters (this range includes \t, \n and \r).
CONTROL_CHAR_PATTERN = re.compile(r"[\u0000-\u001F\u007F-\u009F]")
# Content enclosed in [], () or full-width （） brackets.
BRACKET_CONTENT_PATTERN = re.compile(r"\[[^\]]*\]|\([^)]*\)|（[^）]*）")


def load_input_data(file_path: Path) -> pd.DataFrame:
    """
    Read the raw review table from an .xlsx or .csv file.

    Raises FileNotFoundError when the path does not exist and ValueError
    for any other file extension.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"Input file not found: {file_path}")

    # Dispatch on extension instead of an if-chain.
    readers = {".xlsx": pd.read_excel, ".csv": pd.read_csv}
    reader = readers.get(file_path.suffix.lower())
    if reader is None:
        raise ValueError("Unsupported input file type. Please use .xlsx or .csv.")
    return reader(file_path)


def save_output_data(df: pd.DataFrame, file_path: Path) -> None:
    """
    Write *df* to an .xlsx or .csv file, creating parent directories first.

    Raises ValueError for any other extension. CSV output uses utf-8-sig so
    Excel opens it with the correct encoding.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)

    suffix = file_path.suffix.lower()
    if suffix not in (".xlsx", ".csv"):
        raise ValueError("Unsupported output file type. Please use .xlsx or .csv.")

    if suffix == ".csv":
        df.to_csv(file_path, index=False, encoding="utf-8-sig")
    else:
        df.to_excel(file_path, index=False)


def normalize_text(text: str) -> str:
    """
    Basic text normalization for review data.

    Steps, in order: NaN/None -> "", NFKC unicode normalization, HTML-tag
    and URL removal, control-character removal, collapsing of repeated
    punctuation, and whitespace collapsing/stripping.
    """
    if pd.isna(text):
        return ""

    text = str(text)
    text = unicodedata.normalize("NFKC", text)
    text = HTML_TAG_PATTERN.sub(" ", text)
    text = URL_PATTERN.sub(" ", text)
    # BUG FIX: the previous explicit replace of "\r"/"\n"/"\t" after this
    # substitution was dead code — CONTROL_CHAR_PATTERN's range
    # [\u0000-\u001F\u007F-\u009F] already covers \t (\x09), \n (\x0A) and
    # \r (\x0D), so they are replaced with spaces here.
    text = CONTROL_CHAR_PATTERN.sub(" ", text)
    text = REPEATED_PUNCT_PATTERN.sub(r"\1", text)
    text = MULTI_SPACE_PATTERN.sub(" ", text).strip()
    return text


def strip_for_length_check(text: str) -> str:
    """Remove whitespace and punctuation so only content characters remain."""
    # One combined character class replaces the original two-pass removal;
    # per-character deletion is order-independent, so output is identical.
    removable = r"[\s，。！？!?,；;:：、\"'“”‘’《》〈〉【】\[\]\(\)（）\-—~～…·]"
    return re.sub(removable, "", text)


def is_generic_positive_review(text: str) -> bool:
    """
    Return True when *text* is empty or is boilerplate praise with no
    analyzable content (exact-set match or one of the generic patterns).
    """
    if not text:
        return True

    candidate = text.strip()
    if candidate in DEFAULT_GENERIC_POSITIVE_REVIEWS:
        return True

    return any(re.match(pattern, candidate) for pattern in DEFAULT_GENERIC_PATTERNS)


def split_into_sentences_rule_based(text: str) -> list[str]:
    """
    Transparent rule-based sentence segmentation.

    Splits on sentence-final punctuation (via SENTENCE_SPLIT_PATTERN), drops
    empty fragments, and further splits fragments longer than 120 characters
    on the full-width comma.

    Note:
    This script uses rule-based punctuation segmentation rather than dependency parsing.
    If the manuscript mentions dependency parsing, the wording in the manuscript should be updated
    to match the actual implementation used for reproducibility.
    """
    if not text:
        return []

    cleaned_parts: list[str] = []
    for part in SENTENCE_SPLIT_PATTERN.split(text.strip()):
        part = part.strip()
        if not part:
            continue

        # BUG FIX: the previous code ran
        # BRACKET_CONTENT_PATTERN.sub(lambda m: m.group(0), part).strip(),
        # which replaces every bracketed match with itself — a pure no-op
        # (and `part` was already stripped above). The line is removed.
        # NOTE(review): if the original intent was to *delete* bracketed
        # content (e.g. "[微笑]" emoticon codes), that would be
        # BRACKET_CONTENT_PATTERN.sub("", part) — confirm with the pipeline
        # owner before changing output.

        # Very long, comma-heavy fragments are split further on "，".
        if len(part) > 120 and "，" in part:
            cleaned_parts.extend(p.strip() for p in part.split("，") if p.strip())
        else:
            cleaned_parts.append(part)

    return cleaned_parts


def iter_optional_columns(df: pd.DataFrame, preferred_columns: Iterable[str]) -> list[str]:
    """Return the preferred column names that exist in *df*, in preferred order."""
    available = frozenset(df.columns)
    return [name for name in preferred_columns if name in available]


def preprocess_reviews(
    df: pd.DataFrame,
    text_column: str,
    review_id_column: str | None = None,
    min_length: int = 10,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Clean, deduplicate, filter and segment a table of raw reviews.

    Parameters
    ----------
    df : input table; must contain ``text_column``.
    text_column : name of the column holding the raw review text.
    review_id_column : optional column to use as the stable review id;
        when absent, ids ``review_1`` ... ``review_N`` are generated.
    min_length : minimum number of content characters (after stripping
        whitespace/punctuation) a review must keep to be retained.

    Returns
    -------
    (cleaned_reviews_df, review_segments_df, removed_reviews_df) — the last
    frame logs every removed review with a ``removal_reason`` column.

    Raises
    ------
    ValueError : if ``text_column`` is missing from ``df``.
    """
    if text_column not in df.columns:
        raise ValueError(f"Column '{text_column}' not found in input data.")

    df = df.copy()

    if review_id_column and review_id_column in df.columns:
        df["review_id"] = df[review_id_column].astype(str)
    else:
        df["review_id"] = [f"review_{i+1}" for i in range(len(df))]

    df["text_original"] = df[text_column].astype(str)
    df["text_normalized"] = df["text_original"].apply(normalize_text)

    optional_columns = iter_optional_columns(
        df,
        preferred_columns=[
            "destination_id", "destination_name", "platform", "review_date",
            "rating", "city", "province",
        ],
    )

    # Shared column lists for the removal logs. FIX: these were previously
    # repeated inline four times, which invited drift between the copies.
    core_columns = ["review_id", "text_original", "text_normalized"]
    log_columns = core_columns + optional_columns
    short_log_columns = core_columns + ["length_for_check"] + optional_columns

    def _log_removed(
        frame: pd.DataFrame, mask: pd.Series, columns: list[str], reason: str
    ) -> pd.DataFrame:
        # Snapshot the rows being dropped, tagged with why they were dropped.
        removed = frame.loc[mask, columns].copy()
        removed["removal_reason"] = reason
        return removed

    # 1) Drop reviews that are empty after normalization.
    empty_mask = df["text_normalized"].eq("")
    removed_empty = _log_removed(df, empty_mask, log_columns, "empty_after_normalization")
    working = df.loc[~empty_mask].copy()

    # 2) Drop exact duplicates of the normalized text (first occurrence kept).
    duplicated_mask = working.duplicated(subset=["text_normalized"], keep="first")
    removed_duplicates = _log_removed(working, duplicated_mask, log_columns, "duplicate_review")
    working = working.loc[~duplicated_mask].copy()

    # 3) Drop boilerplate positive reviews with no analyzable content.
    generic_mask = working["text_normalized"].apply(is_generic_positive_review)
    removed_generic = _log_removed(working, generic_mask, log_columns, "generic_positive_review")
    working = working.loc[~generic_mask].copy()

    # 4) Drop reviews too short once punctuation/whitespace is stripped.
    working["length_for_check"] = working["text_normalized"].apply(strip_for_length_check).str.len()
    short_mask = working["length_for_check"] < min_length
    removed_short = _log_removed(
        working, short_mask, short_log_columns, f"short_review_under_{min_length}_characters"
    )
    working = working.loc[~short_mask].copy()

    # 5) Segment each retained review into sentences.
    working["segments"] = working["text_normalized"].apply(split_into_sentences_rule_based)

    segment_records = []
    for _, row in working.iterrows():
        for idx, seg in enumerate(row["segments"], start=1):
            seg_norm = normalize_text(seg)
            seg_len = len(strip_for_length_check(seg_norm))
            if seg_len == 0:
                continue

            record = {
                "review_id": row["review_id"],
                "segment_id": f"{row['review_id']}_seg_{idx}",
                "segment_order": idx,
                "text_segment": seg_norm,
                "segment_length": seg_len,
                "text_review_cleaned": row["text_normalized"],
            }
            for col in optional_columns:
                record[col] = row[col]
            segment_records.append(record)

    review_segments_df = pd.DataFrame(segment_records)
    if not review_segments_df.empty:
        # Keep only segments with at least 2 content characters.
        review_segments_df = review_segments_df.loc[review_segments_df["segment_length"] >= 2].copy()

    cleaned_reviews_df = working[short_log_columns].copy()
    cleaned_reviews_df = cleaned_reviews_df.rename(columns={"text_normalized": "text_review_cleaned"})

    removed_reviews_df = pd.concat(
        [removed_empty, removed_duplicates, removed_generic, removed_short],
        axis=0,
        ignore_index=True,
    )

    return cleaned_reviews_df, review_segments_df, removed_reviews_df


def main() -> None:
    """CLI entry point: parse arguments, run preprocessing, write the three outputs."""
    parser = argparse.ArgumentParser(
        description="Review preprocessing and rule-based sentence segmentation pipeline."
    )
    parser.add_argument("--input_file", type=str, default="data/Data_3_raw_reviews.xlsx")
    parser.add_argument("--text_column", type=str, default="review_text")
    parser.add_argument("--review_id_column", type=str, default=None)
    parser.add_argument("--min_length", type=int, default=10)
    parser.add_argument("--output_cleaned_reviews", type=str, default="outputs/cleaned_reviews.xlsx")
    parser.add_argument("--output_review_segments", type=str, default="outputs/review_segments.xlsx")
    parser.add_argument("--output_removed_reviews", type=str, default="outputs/removed_reviews_log.xlsx")
    args = parser.parse_args()

    raw_df = load_input_data(Path(args.input_file))

    cleaned_df, segments_df, removed_df = preprocess_reviews(
        df=raw_df,
        text_column=args.text_column,
        review_id_column=args.review_id_column,
        min_length=args.min_length,
    )

    # Write the three result tables to their configured destinations.
    for frame, destination in (
        (cleaned_df, args.output_cleaned_reviews),
        (segments_df, args.output_review_segments),
        (removed_df, args.output_removed_reviews),
    ):
        save_output_data(frame, Path(destination))

    print("Review preprocessing and sentence segmentation completed successfully.")
    print(f"Number of cleaned reviews retained: {len(cleaned_df)}")
    print(f"Number of review segments retained: {len(segments_df)}")
    print(f"Number of reviews removed: {len(removed_df)}")


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
