# data-comparison/data_comparator.py
# 2025-08-27 08:46:13 +07:00
#
# 798 lines
# 36 KiB
# Python

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
def normalize_episode(episode: str) -> str:
    """Normalize episode numbers to handle cases like '54' vs '54.0'.

    Args:
        episode: Episode label. Normally a string; numeric values (as produced
            by spreadsheet cells) are accepted and converted with ``str()``.

    Returns:
        ``None`` when ``episode`` is ``None``; the input string unchanged when
        it is empty or whitespace-only; the integer form (``'54'``) when the
        value parses as a whole number; ``str(float(value))`` when it parses
        as a non-whole number; otherwise the stripped original text.
    """
    if episode is None:
        return episode

    # Numeric cells (int/float) have no .strip(); convert them first so that
    # 54, 54.0 and '54.0' all normalize to the same key.
    if not isinstance(episode, str):
        episode = str(episode)

    text = episode.strip()
    if not text:
        # Blank input is passed through untouched.
        return episode

    try:
        # Convert to float first to handle both int and float formats
        number = float(text)
    except (ValueError, TypeError):
        # Not numeric (e.g. 'Special'): keep the label, minus outer whitespace
        return text

    # Whole numbers (like 54.0) use the int format; others keep the decimals
    if number.is_integer():
        return str(int(number))
    return str(number)
class ComparisonItem:
    """One (language, title, episode) entry taken from a sheet for comparison.

    Equality and hashing consider only language, title and episode; the
    originating sheet and row index are carried along for reporting.
    """

    language: str
    title: str
    episode: str
    source_sheet: str
    row_index: int

    def __init__(self, language: str, title: str, episode: str, source_sheet: str, row_index: int):
        # The episode is normalized at construction time so that '54' and
        # '54.0' produce equal items.
        normalized_episode = normalize_episode(episode)
        self.language = language
        self.title = title
        self.episode = normalized_episode
        self.source_sheet = source_sheet
        self.row_index = row_index

    def _identity(self):
        """Return the tuple that defines this item's identity."""
        return (self.language, self.title, self.episode)

    def __hash__(self):
        return hash(self._identity())

    def __eq__(self, other):
        if isinstance(other, ComparisonItem):
            return self._identity() == other._identity()
        return False
class KSTCoordiComparator:
"""
Compare KST and Coordi data to identify mismatches and ensure count reconciliation
"""
def __init__(self, excel_file_path: str):
self.excel_file_path = excel_file_path
self.data = {}
self.kst_items = set()
self.coordi_items = set()
self.comparison_results = {}
def load_data(self) -> bool:
"""Load data from Excel file"""
try:
excel_file = pd.ExcelFile(self.excel_file_path)
for sheet_name in excel_file.sheet_names:
self.data[sheet_name] = pd.read_excel(self.excel_file_path, sheet_name=sheet_name)
return True
except Exception as e:
print(f"Error loading data: {e}")
return False
def extract_kst_coordi_items_for_sheet(self, sheet_name: str) -> Dict[str, Any]:
"""Extract KST and Coordi items from a specific sheet using fixed column positions"""
if sheet_name not in self.data:
raise ValueError(f"Sheet '{sheet_name}' not found in data")
df = self.data[sheet_name]
columns = df.columns.tolist()
kst_items = set()
coordi_items = set()
kst_details = []
coordi_details = []
kst_all_items = [] # Keep all items including duplicates
coordi_all_items = [] # Keep all items including duplicates
# Fixed column positions - NO header name search
# Coordi: B(1), C(2), D(3) = Language, Title, Chapter
# KST: H(7), I(8), J(9) = Language, Title, Chapter
coordi_language_col_idx = 1 # Column B
coordi_title_col_idx = 2 # Column C
coordi_episode_col_idx = 3 # Column D
kst_language_col_idx = 7 # Column H
kst_title_col_idx = 8 # Column I
kst_episode_col_idx = 9 # Column J
# Validate that all required columns exist
required_columns = [
(coordi_language_col_idx, 'Coordi Language (Column B)'),
(coordi_title_col_idx, 'Coordi Title (Column C)'),
(coordi_episode_col_idx, 'Coordi Episode (Column D)'),
(kst_language_col_idx, 'KST Language (Column H)'),
(kst_title_col_idx, 'KST Title (Column I)'),
(kst_episode_col_idx, 'KST Episode (Column J)')
]
missing_columns = []
for col_idx, col_name in required_columns:
if len(columns) <= col_idx:
missing_columns.append(col_name)
if missing_columns:
error_msg = f"Missing required columns in sheet '{sheet_name}':\n" + "\n".join(f" - {col}" for col in missing_columns)
raise ValueError(error_msg)
# Get column names by fixed positions
coordi_language_col = columns[coordi_language_col_idx]
coordi_title_col = columns[coordi_title_col_idx]
coordi_episode_col = columns[coordi_episode_col_idx]
kst_language_col = columns[kst_language_col_idx]
kst_title_col = columns[kst_title_col_idx]
kst_episode_col = columns[kst_episode_col_idx]
# Extract dynamic label from column A (index 0) for flexible naming
coordi_label = "Default" # Default fallback
if len(columns) > 0 and columns[0] and str(columns[0]).strip():
coordi_label = str(columns[0]).strip()
elif len(df) > 0:
# Try to get from first data row if header is empty
first_row_col_a = str(df.iloc[0, 0]).strip() if not pd.isna(df.iloc[0, 0]) else ""
if first_row_col_a and first_row_col_a.lower() not in ['nan', 'none', '']:
coordi_label = first_row_col_a
print(f"Sheet: {sheet_name}")
print(f" KST columns - Language: Column {chr(65 + kst_language_col_idx)} ({kst_language_col}), Title: Column {chr(65 + kst_title_col_idx)} ({kst_title_col}), Episode: Column {chr(65 + kst_episode_col_idx)} ({kst_episode_col})")
print(f" Coordi columns - Language: Column {chr(65 + coordi_language_col_idx)} ({coordi_language_col}), Title: Column {chr(65 + coordi_title_col_idx)} ({coordi_title_col}), Episode: Column {chr(65 + coordi_episode_col_idx)} ({coordi_episode_col})")
# Extract items from each row
for idx, row in df.iterrows():
# Extract KST data from fixed positions H, I, J
kst_language = str(row.get(kst_language_col, '')).strip()
kst_title = str(row.get(kst_title_col, '')).strip()
kst_episode = str(row.get(kst_episode_col, '')).strip()
# Check if this row has valid KST data
has_kst_data = (
kst_language and kst_language != 'nan' and
kst_title and kst_title != 'nan' and
kst_episode and kst_episode != 'nan' and
pd.notna(row[kst_language_col]) and pd.notna(row[kst_title_col]) and pd.notna(row[kst_episode_col])
)
# Validate language is not empty - raise error if found
if pd.notna(row[kst_title_col]) and pd.notna(row[kst_episode_col]): # Only check if this is a real data row
if not kst_language or kst_language == 'nan' or pd.isna(row[kst_language_col]):
raise ValueError(f"Empty language value found in KST data at row {idx + 1} ('{kst_title}' - Episode {kst_episode}): All language fields must be populated")
if has_kst_data:
item = ComparisonItem(kst_language, kst_title, kst_episode, sheet_name, idx)
kst_items.add(item)
kst_all_items.append(item) # Keep all items for duplicate detection
kst_details.append({
'language': kst_language,
'title': kst_title,
'episode': kst_episode,
'sheet': sheet_name,
'row_index': idx,
'kst_data': {
kst_language_col: row[kst_language_col],
kst_title_col: row[kst_title_col],
kst_episode_col: row[kst_episode_col]
}
})
# Extract Coordi data from fixed positions B, C, D
coordi_language = str(row.get(coordi_language_col, '')).strip()
coordi_title = str(row.get(coordi_title_col, '')).strip()
coordi_episode = str(row.get(coordi_episode_col, '')).strip()
# Check if this row has valid Coordi data
has_coordi_data = (
coordi_language and coordi_language != 'nan' and
coordi_title and coordi_title != 'nan' and
coordi_episode and coordi_episode != 'nan' and
pd.notna(row[coordi_language_col]) and pd.notna(row[coordi_title_col]) and pd.notna(row[coordi_episode_col])
)
# Validate language is not empty - raise error if found
if pd.notna(row[coordi_title_col]) and pd.notna(row[coordi_episode_col]): # Only check if this is a real data row
if not coordi_language or coordi_language == 'nan' or pd.isna(row[coordi_language_col]):
raise ValueError(f"Empty language value found in Coordi data at row {idx + 1} ('{coordi_title}' - Episode {coordi_episode}): All language fields must be populated")
if has_coordi_data:
item = ComparisonItem(coordi_language, coordi_title, coordi_episode, sheet_name, idx)
coordi_items.add(item)
coordi_all_items.append(item) # Keep all items for duplicate detection
coordi_details.append({
'language': coordi_language,
'title': coordi_title,
'episode': coordi_episode,
'sheet': sheet_name,
'row_index': idx,
'coordi_data': {
coordi_language_col: row[coordi_language_col],
coordi_title_col: row[coordi_title_col],
coordi_episode_col: row[coordi_episode_col]
}
})
return {
'kst_items': kst_items,
'coordi_items': coordi_items,
'kst_details': kst_details,
'coordi_details': coordi_details,
'kst_all_items': kst_all_items,
'coordi_all_items': coordi_all_items,
'coordi_label': coordi_label # Dynamic label from column A
}
def categorize_mismatches_for_sheet(self, sheet_data: Dict[str, Any]) -> Dict[str, Any]:
"""Categorize data into KST-only, Coordi-only, and matched items for a specific sheet"""
kst_items = sheet_data['kst_items']
coordi_items = sheet_data['coordi_items']
kst_all_items = sheet_data['kst_all_items']
coordi_all_items = sheet_data['coordi_all_items']
# Find duplicates within each dataset first
kst_duplicates = self._find_duplicates_in_list(kst_all_items)
coordi_duplicates = self._find_duplicates_in_list(coordi_all_items)
# Create sets of items that have duplicates (to exclude from "only" lists)
kst_duplicate_keys = {(item.language, item.title, item.episode) for item in kst_duplicates}
coordi_duplicate_keys = {(item.language, item.title, item.episode) for item in coordi_duplicates}
# Find overlaps and differences - exclude items that have duplicates
matched_items = kst_items.intersection(coordi_items)
# For "only" items: exclude those that have duplicates within their own dataset
kst_only_items = {item for item in kst_items - coordi_items
if (item.language, item.title, item.episode) not in kst_duplicate_keys}
coordi_only_items = {item for item in coordi_items - kst_items
if (item.language, item.title, item.episode) not in coordi_duplicate_keys}
categorization = {
'matched_items': list(matched_items),
'kst_only_items': list(kst_only_items),
'coordi_only_items': list(coordi_only_items),
'kst_duplicates': kst_duplicates,
'coordi_duplicates': coordi_duplicates,
'counts': {
'total_kst': len(kst_items),
'total_coordi': len(coordi_items),
'matched': len(matched_items),
'kst_only': len(kst_only_items),
'coordi_only': len(coordi_only_items),
'kst_duplicates_count': len(kst_duplicates),
'coordi_duplicates_count': len(coordi_duplicates)
}
}
# Calculate reconciled counts (after removing mismatches)
reconciled_kst_count = len(matched_items)
reconciled_coordi_count = len(matched_items)
categorization['reconciliation'] = {
'original_kst_count': len(kst_items),
'original_coordi_count': len(coordi_items),
'reconciled_kst_count': reconciled_kst_count,
'reconciled_coordi_count': reconciled_coordi_count,
'counts_match_after_reconciliation': reconciled_kst_count == reconciled_coordi_count,
'items_to_exclude_from_kst': len(kst_only_items) + len(kst_duplicates),
'items_to_exclude_from_coordi': len(coordi_only_items) + len(coordi_duplicates)
}
return categorization
def _find_duplicates_in_list(self, items_list: List[ComparisonItem]) -> List[ComparisonItem]:
"""Find duplicate items within a dataset - FIXED to only return actual duplicates"""
from collections import Counter
# Count occurrences of each (language, title, episode) tuple
key_counts = Counter((item.language, item.title, item.episode) for item in items_list)
# Only return items that appear more than once
duplicates = []
for item in items_list:
key = (item.language, item.title, item.episode)
if key_counts[key] > 1:
duplicates.append(item)
return duplicates
def _find_sheet_specific_mixed_duplicates(self, sheet_data: Dict[str, Any], sheet_filter: str) -> List[Dict]:
"""Find mixed duplicates within a specific sheet only"""
mixed_duplicates = []
kst_sheet_items = sheet_data['kst_all_items']
coordi_sheet_items = sheet_data['coordi_all_items']
# Find duplicates within this sheet
kst_sheet_duplicates = self._find_duplicates_in_list(kst_sheet_items)
coordi_sheet_duplicates = self._find_duplicates_in_list(coordi_sheet_items)
# Create sets for items that exist in both KST and Coordi within this sheet
kst_sheet_set = {(item.title, item.episode) for item in kst_sheet_items}
coordi_sheet_set = {(item.title, item.episode) for item in coordi_sheet_items}
matched_in_sheet = kst_sheet_set.intersection(coordi_sheet_set)
# Create sets of duplicate keys within this sheet
kst_duplicate_keys = {(item.title, item.episode) for item in kst_sheet_duplicates}
coordi_duplicate_keys = {(item.title, item.episode) for item in coordi_sheet_duplicates}
# Count actual instances for each item
from collections import Counter
kst_counts = Counter((item.title, item.episode) for item in kst_sheet_items)
coordi_counts = Counter((item.title, item.episode) for item in coordi_sheet_items)
# Find matched items that also have duplicates within the same sheet
for title, episode in matched_in_sheet:
# Check if this matched item has duplicates in KST within this sheet
if (title, episode) in kst_duplicate_keys:
kst_count = kst_counts[(title, episode)]
mixed_duplicates.append({
'title': title,
'episode': episode,
'sheet': sheet_filter,
'row_index': None, # Could get from items if needed
'reason': f'Item exists in both datasets but has duplicates in KST within {sheet_filter}',
'mismatch_type': 'MIXED_DUPLICATE_KST',
'duplicate_side': 'KST',
'duplicate_count': kst_count
})
# Check if this matched item has duplicates in Coordi within this sheet
if (title, episode) in coordi_duplicate_keys:
coordi_count = coordi_counts[(title, episode)]
mixed_duplicates.append({
'title': title,
'episode': episode,
'sheet': sheet_filter,
'row_index': None, # Could get from items if needed
'reason': f'Item exists in both datasets but has duplicates in Coordi within {sheet_filter}',
'mismatch_type': 'MIXED_DUPLICATE_COORDI',
'duplicate_side': 'COORDI',
'duplicate_count': coordi_count
})
return mixed_duplicates
def generate_mismatch_details_for_sheet(self, categorization: Dict[str, Any], sheet_data: Dict[str, Any], sheet_filter: str) -> Dict[str, List[Dict]]:
"""Generate detailed information about each type of mismatch with reasons for a specific sheet"""
mismatch_details = {
'kst_only': [],
'coordi_only': [],
'kst_duplicates': [],
'coordi_duplicates': [],
'mixed_duplicates': []
}
# KST-only items
for item in categorization['kst_only_items']:
mismatch_details['kst_only'].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Item exists in KST data but not in Coordi data',
'mismatch_type': 'KST_ONLY'
})
# Coordi-only items
for item in categorization['coordi_only_items']:
mismatch_details['coordi_only'].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Item exists in Coordi data but not in KST data',
'mismatch_type': 'COORDI_ONLY'
})
# Find mixed duplicates first (they take priority)
mixed_duplicates = self._find_sheet_specific_mixed_duplicates(sheet_data, sheet_filter)
mismatch_details['mixed_duplicates'] = mixed_duplicates
# Create set of items that are already covered by mixed duplicates
mixed_duplicate_keys = {(item['title'], item['episode']) for item in mixed_duplicates}
# KST duplicates - exclude those already covered by mixed duplicates
for item in categorization['kst_duplicates']:
key = (item.title, item.episode)
if key not in mixed_duplicate_keys:
mismatch_details['kst_duplicates'].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Duplicate entry in KST data',
'mismatch_type': 'KST_DUPLICATE'
})
# Coordi duplicates - exclude those already covered by mixed duplicates
for item in categorization['coordi_duplicates']:
key = (item.title, item.episode)
if key not in mixed_duplicate_keys:
mismatch_details['coordi_duplicates'].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Duplicate entry in Coordi data',
'mismatch_type': 'COORDI_DUPLICATE'
})
return mismatch_details
def get_comparison_summary(self, sheet_filter: str | None = None) -> Dict[str, Any]:
"""Get a comprehensive summary of the comparison for a specific sheet only"""
# Get sheet names for filtering options
sheet_names = list(self.data.keys()) if self.data else []
# If no sheet filter provided, default to first sheet
if not sheet_filter:
sheet_filter = sheet_names[0] if sheet_names else None
if not sheet_filter:
raise ValueError("No sheets available or sheet filter not specified")
# Validate that the requested sheet exists
if sheet_filter not in sheet_names:
raise ValueError(f"Sheet '{sheet_filter}' not found in data. Available sheets: {sheet_names}")
# Extract data for the specific sheet only
sheet_data = self.extract_kst_coordi_items_for_sheet(sheet_filter)
# Categorize mismatches for this sheet
categorization = self.categorize_mismatches_for_sheet(sheet_data)
# Generate mismatch details for this sheet
mismatch_details = self.generate_mismatch_details_for_sheet(categorization, sheet_data, sheet_filter)
# Group data by title for this sheet
grouped_data = self.group_by_title_for_sheet(categorization, sheet_filter)
# Calculate counts
matched_count = len(categorization['matched_items'])
kst_total = len(sheet_data['kst_items'])
coordi_total = len(sheet_data['coordi_items'])
summary = {
'sheet_names': sheet_names,
'current_sheet_filter': sheet_filter,
'original_counts': {
'kst_total': kst_total,
'coordi_total': coordi_total
},
'matched_items_count': matched_count,
'mismatches': {
'kst_only_count': len(mismatch_details['kst_only']),
'coordi_only_count': len(mismatch_details['coordi_only']),
'kst_duplicates_count': len(mismatch_details['kst_duplicates']),
'coordi_duplicates_count': len(mismatch_details['coordi_duplicates']),
'mixed_duplicates_count': len(mismatch_details['mixed_duplicates'])
},
'reconciliation': categorization['reconciliation'],
'mismatch_details': mismatch_details,
'grouped_by_title': grouped_data
}
return summary
def group_by_title_for_sheet(self, categorization: Dict[str, Any], sheet_filter: str) -> Dict[str, Any]:
"""Group mismatches and matches by KR title for a specific sheet"""
from collections import defaultdict
grouped = {
'kst_only_by_title': defaultdict(list),
'coordi_only_by_title': defaultdict(list),
'matched_by_title': defaultdict(list),
'title_summaries': {}
}
# Group KST only items by title
for item in categorization['kst_only_items']:
title = item.title
grouped['kst_only_by_title'][title].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Item exists in KST data but not in Coordi data'
})
# Group Coordi only items by title
for item in categorization['coordi_only_items']:
title = item.title
grouped['coordi_only_by_title'][title].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Item exists in Coordi data but not in KST data'
})
# Group matched items by title
for item in categorization['matched_items']:
title = item.title
grouped['matched_by_title'][title].append({
'language': item.language,
'title': item.title,
'episode': item.episode,
'sheet': item.source_sheet,
'row_index': item.row_index,
'reason': 'Perfect match'
})
# Create summary for each title
all_titles = set()
all_titles.update(grouped['kst_only_by_title'].keys())
all_titles.update(grouped['coordi_only_by_title'].keys())
all_titles.update(grouped['matched_by_title'].keys())
for title in all_titles:
kst_only_count = len(grouped['kst_only_by_title'][title])
coordi_only_count = len(grouped['coordi_only_by_title'][title])
matched_count = len(grouped['matched_by_title'][title])
total_episodes = kst_only_count + coordi_only_count + matched_count
grouped['title_summaries'][title] = {
'total_episodes': total_episodes,
'matched_count': matched_count,
'kst_only_count': kst_only_count,
'coordi_only_count': coordi_only_count,
'match_percentage': round((matched_count / total_episodes * 100) if total_episodes > 0 else 0, 1),
'has_mismatches': kst_only_count > 0 or coordi_only_count > 0
}
# Convert defaultdicts to regular dicts for JSON serialization
grouped['kst_only_by_title'] = dict(grouped['kst_only_by_title'])
grouped['coordi_only_by_title'] = dict(grouped['coordi_only_by_title'])
grouped['matched_by_title'] = dict(grouped['matched_by_title'])
return grouped
def generate_visualize_data(self, sheet_filter: str | None = None) -> List[Dict[str, Any]]:
"""Generate data structure for Excel-like visualization"""
# Get comparison data for the specified sheet
summary = self.get_comparison_summary(sheet_filter)
mismatch_details = summary['mismatch_details']
visualize_rows = []
# Helper function to create a row
def create_row(coordi_language="", coordi_title="", coordi_chapter="", kst_language="", kst_title="", kst_chapter="",
row_type="matched", reason="", title_for_sort=""):
return {
'coordi_language': coordi_language,
'coordi_title': coordi_title,
'coordi_chapter': coordi_chapter,
'kst_language': kst_language,
'kst_title': kst_title,
'kst_chapter': kst_chapter,
'row_type': row_type,
'reason': reason,
'title_for_sort': title_for_sort or coordi_title or kst_title,
'priority': 1 if row_type != 'matched' else 2 # Mismatches first
}
# 1. Handle Coordi-only items
for item in mismatch_details['coordi_only']:
visualize_rows.append(create_row(
coordi_language=item.get('language', ''),
coordi_title=item['title'],
coordi_chapter=item['episode'],
row_type='coordi_only',
reason='Only in Coordi'
))
# 2. Handle KST-only items
for item in mismatch_details['kst_only']:
visualize_rows.append(create_row(
kst_language=item.get('language', ''),
kst_title=item['title'],
kst_chapter=item['episode'],
row_type='kst_only',
reason='Only in KST'
))
# 3. Handle Mixed duplicates (exists in both but duplicated on one side)
mixed_items = {} # Group by language+title+episode
for item in mismatch_details['mixed_duplicates']:
key = f"{item.get('language', '')}_{item['title']}_{item['episode']}"
if key not in mixed_items:
mixed_items[key] = {
'language': item.get('language', ''),
'title': item['title'],
'episode': item['episode'],
'kst_duplicate_count': 0,
'coordi_duplicate_count': 0
}
# Count the actual duplicates for each side
if item['duplicate_side'] == 'KST':
mixed_items[key]['kst_duplicate_count'] = item.get('duplicate_count', 1)
elif item['duplicate_side'] == 'COORDI':
mixed_items[key]['coordi_duplicate_count'] = item.get('duplicate_count', 1)
for key, item in mixed_items.items():
# First row: show it exists in both
visualize_rows.append(create_row(
coordi_language=item['language'],
coordi_title=item['title'],
coordi_chapter=item['episode'],
kst_language=item['language'],
kst_title=item['title'],
kst_chapter=item['episode'],
row_type='mixed_duplicate',
reason='Mixed duplicate'
))
# Additional rows for KST duplicates (count - 1 since first is already shown)
for _ in range(max(0, item['kst_duplicate_count'] - 1)):
visualize_rows.append(create_row(
kst_language=item['language'],
kst_title=item['title'],
kst_chapter=item['episode'],
row_type='mixed_duplicate',
reason='Duplicate in KST',
title_for_sort=item['title']
))
# Additional rows for Coordi duplicates (count - 1 since first is already shown)
for _ in range(max(0, item['coordi_duplicate_count'] - 1)):
visualize_rows.append(create_row(
coordi_language=item['language'],
coordi_title=item['title'],
coordi_chapter=item['episode'],
row_type='mixed_duplicate',
reason='Duplicate in Coordi',
title_for_sort=item['title']
))
# 4. Handle Pure duplicates
for item in mismatch_details['kst_duplicates']:
visualize_rows.append(create_row(
kst_language=item.get('language', ''),
kst_title=item['title'],
kst_chapter=item['episode'],
row_type='pure_duplicate',
reason='Duplicate in KST'
))
for item in mismatch_details['coordi_duplicates']:
visualize_rows.append(create_row(
coordi_language=item.get('language', ''),
coordi_title=item['title'],
coordi_chapter=item['episode'],
row_type='pure_duplicate',
reason='Duplicate in Coordi'
))
# 5. Handle Matched items (perfect matches)
matched_by_title = summary['grouped_by_title']['matched_by_title']
for _, items in matched_by_title.items():
for item in items:
visualize_rows.append(create_row(
coordi_language=item.get('language', ''),
coordi_title=item['title'],
coordi_chapter=item['episode'],
kst_language=item.get('language', ''),
kst_title=item['title'],
kst_chapter=item['episode'],
row_type='matched',
reason='Perfect match'
))
# Sort: Mismatches first (priority 1), then matches (priority 2), then by Korean title + chapter
def sort_key(x):
# Extract episode number for proper numeric sorting
coordi_episode = x.get('coordi_chapter', '') or ''
kst_episode = x.get('kst_chapter', '') or ''
episode = coordi_episode or kst_episode
# Try to convert episode to number for proper sorting, fallback to string
try:
episode_num = float(episode) if episode else 0
except (ValueError, TypeError):
episode_num = 0
return (x['priority'], x['title_for_sort'], episode_num)
visualize_rows.sort(key=sort_key)
return visualize_rows
def get_coordi_label_for_sheet(self, sheet_filter: str | None = None) -> str:
"""Get the dynamic coordi label from column A for a specific sheet"""
if not self.data:
return "Default"
# Use first sheet if no filter specified
sheet_name = sheet_filter if sheet_filter else list(self.data.keys())[0]
if sheet_name not in self.data:
return "Default"
# Extract the sheet data to get the dynamic label
sheet_data = self.extract_kst_coordi_items_for_sheet(sheet_name)
return sheet_data.get('coordi_label', 'Default')
def generate_excel_export_data(self) -> Dict[str, List[Dict[str, Any]]]:
"""Generate data for Excel export with all sheets in visualize format"""
export_data = {}
# Get all sheet names
sheet_names = list(self.data.keys()) if self.data else []
for sheet_name in sheet_names:
# Generate visualize data for each sheet
sheet_visualize_data = self.generate_visualize_data(sheet_name)
# Convert to Excel-friendly format
excel_rows = []
for row in sheet_visualize_data:
excel_rows.append({
'Coordi Language': row.get('coordi_language', ''),
'Coordi Title': row.get('coordi_title', ''),
'Coordi Chapter': row.get('coordi_chapter', ''),
'KST Language': row.get('kst_language', ''),
'KST Title': row.get('kst_title', ''),
'KST Chapter': row.get('kst_chapter', ''),
'Status': row.get('reason', ''),
'Type': row.get('row_type', '').replace('_', ' ').title()
})
export_data[sheet_name] = excel_rows
return export_data
def print_comparison_summary(self, sheet_filter: str | None = None):
"""Print a formatted summary of the comparison for a specific sheet"""
summary = self.get_comparison_summary(sheet_filter)
print("=" * 80)
print(f"KST vs COORDI COMPARISON SUMMARY - Sheet: {summary['current_sheet_filter']}")
print("=" * 80)
print(f"Original Counts:")
print(f" KST Total: {summary['original_counts']['kst_total']}")
print(f" Coordi Total: {summary['original_counts']['coordi_total']}")
print()
print(f"Matched Items: {summary['matched_items_count']}")
print()
print(f"Mismatches:")
print(f" KST Only: {summary['mismatches']['kst_only_count']}")
print(f" Coordi Only: {summary['mismatches']['coordi_only_count']}")
print(f" KST Duplicates: {summary['mismatches']['kst_duplicates_count']}")
print(f" Coordi Duplicates: {summary['mismatches']['coordi_duplicates_count']}")
print(f" Mixed Duplicates: {summary['mismatches']['mixed_duplicates_count']}")
print()
print(f"Reconciliation:")
reconciliation = summary['reconciliation']
print(f" After excluding mismatches:")
print(f" KST Count: {reconciliation['reconciled_kst_count']}")
print(f" Coordi Count: {reconciliation['reconciled_coordi_count']}")
print(f" Counts Match: {reconciliation['counts_match_after_reconciliation']}")
print()
# Show sample mismatches
for mismatch_type, details in summary['mismatch_details'].items():
if details:
print(f"{mismatch_type.upper()} (showing first 3):")
for i, item in enumerate(details[:3]):
language = item.get('language', 'N/A')
print(f" {i+1}. [{language}] {item['title']} - Episode {item['episode']} ({item['reason']})")
if len(details) > 3:
print(f" ... and {len(details) - 3} more")
print()
if __name__ == "__main__":
    # Manual smoke test: load the sample workbook and print the summary for
    # its first sheet.
    comparator = KSTCoordiComparator("data/sample-data.xlsx")
    if not comparator.load_data():
        print("Failed to load data!")
    else:
        print("Data loaded successfully!")
        comparator.print_comparison_summary()