import pandas as pd import numpy as np from typing import Dict, List, Tuple, Any from dataclasses import dataclass def normalize_episode(episode: str) -> str: """Normalize episode numbers to handle cases like '54' vs '54.0'""" if not episode or episode.strip() == '': return episode try: # Convert to float first to handle both int and float formats episode_float = float(episode.strip()) # If it's a whole number (like 54.0), convert to int format if episode_float.is_integer(): return str(int(episode_float)) else: # Keep decimal format for non-whole numbers return str(episode_float) except (ValueError, TypeError): # If conversion fails, return original episode string return episode.strip() class ComparisonItem: """Represents a single item for comparison""" title: str episode: str source_sheet: str row_index: int def __init__(self, title: str, episode: str, source_sheet: str, row_index: int): self.title = title self.episode = normalize_episode(episode) # Normalize episode on creation self.source_sheet = source_sheet self.row_index = row_index def __hash__(self): return hash((self.title, self.episode)) def __eq__(self, other): if not isinstance(other, ComparisonItem): return False return self.title == other.title and self.episode == other.episode class KSTCoordiComparator: """ Compare KST and Coordi data to identify mismatches and ensure count reconciliation """ def __init__(self, excel_file_path: str): self.excel_file_path = excel_file_path self.data = {} self.kst_items = set() self.coordi_items = set() self.comparison_results = {} def load_data(self) -> bool: """Load data from Excel file""" try: excel_file = pd.ExcelFile(self.excel_file_path) for sheet_name in excel_file.sheet_names: self.data[sheet_name] = pd.read_excel(self.excel_file_path, sheet_name=sheet_name) return True except Exception as e: print(f"Error loading data: {e}") return False def extract_kst_coordi_items_for_sheet(self, sheet_name: str) -> Dict[str, Any]: """Extract KST and Coordi items from a specific sheet using fixed column positions""" if sheet_name not in self.data: raise ValueError(f"Sheet '{sheet_name}' not found in data") df = self.data[sheet_name] columns = df.columns.tolist() kst_items = set() coordi_items = set() kst_details = [] coordi_details = [] kst_all_items = [] # Keep all items including duplicates coordi_all_items = [] # Keep all items including duplicates # Try fixed column positions first, then fall back to header names # KST columns: I (index 8) for title, J (index 9) for chapter # Coordi columns: C (index 2) for title, D (index 3) for chapter kst_title_col_idx = 8 # Column I kst_episode_col_idx = 9 # Column J coordi_title_col_idx = 2 # Column C coordi_episode_col_idx = 3 # Column D # Get column names by index (if they exist) kst_title_col = columns[kst_title_col_idx] if len(columns) > kst_title_col_idx else None kst_episode_col = columns[kst_episode_col_idx] if len(columns) > kst_episode_col_idx else None coordi_title_col = columns[coordi_title_col_idx] if len(columns) > coordi_title_col_idx else None coordi_episode_col = columns[coordi_episode_col_idx] if len(columns) > coordi_episode_col_idx else None # Fallback: search by header names if fixed positions don't work if not kst_title_col or not kst_episode_col: for i, col in enumerate(columns): if col == 'Title KR': kst_title_col = col kst_title_col_idx = i elif col == 'Epi.': kst_episode_col = col kst_episode_col_idx = i if not coordi_title_col or not coordi_episode_col: for i, col in enumerate(columns): if col == 'KR title': coordi_title_col = col coordi_title_col_idx = i elif col == 'Chap': coordi_episode_col = col coordi_episode_col_idx = i print(f"Sheet: {sheet_name}") print(f" KST columns - Title: Column {chr(65 + kst_title_col_idx) if kst_title_col else 'None'} ({kst_title_col}), Episode: Column {chr(65 + kst_episode_col_idx) if kst_episode_col else 'None'} ({kst_episode_col})") print(f" Coordi columns - Title: Column {chr(65 + coordi_title_col_idx) if coordi_title_col else 'None'} ({coordi_title_col}), Episode: Column {chr(65 + coordi_episode_col_idx) if coordi_episode_col else 'None'} ({coordi_episode_col})") # Extract items from each row for idx, row in df.iterrows(): # Extract KST data if kst_title_col and kst_episode_col: kst_title = str(row.get(kst_title_col, '')).strip() kst_episode = str(row.get(kst_episode_col, '')).strip() # Check if this row has valid KST data has_kst_data = ( kst_title and kst_title != 'nan' and kst_episode and kst_episode != 'nan' and pd.notna(row[kst_title_col]) and pd.notna(row[kst_episode_col]) ) if has_kst_data: item = ComparisonItem(kst_title, kst_episode, sheet_name, idx) kst_items.add(item) kst_all_items.append(item) # Keep all items for duplicate detection kst_details.append({ 'title': kst_title, 'episode': kst_episode, 'sheet': sheet_name, 'row_index': idx, 'kst_data': { kst_title_col: row[kst_title_col], kst_episode_col: row[kst_episode_col] } }) # Extract Coordi data if coordi_title_col and coordi_episode_col: coordi_title = str(row.get(coordi_title_col, '')).strip() coordi_episode = str(row.get(coordi_episode_col, '')).strip() # Check if this row has valid Coordi data has_coordi_data = ( coordi_title and coordi_title != 'nan' and coordi_episode and coordi_episode != 'nan' and pd.notna(row[coordi_title_col]) and pd.notna(row[coordi_episode_col]) ) if has_coordi_data: item = ComparisonItem(coordi_title, coordi_episode, sheet_name, idx) coordi_items.add(item) coordi_all_items.append(item) # Keep all items for duplicate detection coordi_details.append({ 'title': coordi_title, 'episode': coordi_episode, 'sheet': sheet_name, 'row_index': idx, 'coordi_data': { coordi_title_col: row[coordi_title_col], coordi_episode_col: row[coordi_episode_col] } }) return { 'kst_items': kst_items, 'coordi_items': coordi_items, 'kst_details': kst_details, 'coordi_details': coordi_details, 'kst_all_items': kst_all_items, 'coordi_all_items': coordi_all_items } def categorize_mismatches_for_sheet(self, sheet_data: Dict[str, Any]) -> Dict[str, Any]: """Categorize data into KST-only, Coordi-only, and matched items for a specific sheet""" kst_items = sheet_data['kst_items'] coordi_items = sheet_data['coordi_items'] kst_all_items = sheet_data['kst_all_items'] coordi_all_items = sheet_data['coordi_all_items'] # Find overlaps and differences matched_items = kst_items.intersection(coordi_items) kst_only_items = kst_items - coordi_items coordi_only_items = coordi_items - kst_items # Find duplicates within each dataset - FIXED LOGIC kst_duplicates = self._find_duplicates_in_list(kst_all_items) coordi_duplicates = self._find_duplicates_in_list(coordi_all_items) categorization = { 'matched_items': list(matched_items), 'kst_only_items': list(kst_only_items), 'coordi_only_items': list(coordi_only_items), 'kst_duplicates': kst_duplicates, 'coordi_duplicates': coordi_duplicates, 'counts': { 'total_kst': len(kst_items), 'total_coordi': len(coordi_items), 'matched': len(matched_items), 'kst_only': len(kst_only_items), 'coordi_only': len(coordi_only_items), 'kst_duplicates_count': len(kst_duplicates), 'coordi_duplicates_count': len(coordi_duplicates) } } # Calculate reconciled counts (after removing mismatches) reconciled_kst_count = len(matched_items) reconciled_coordi_count = len(matched_items) categorization['reconciliation'] = { 'original_kst_count': len(kst_items), 'original_coordi_count': len(coordi_items), 'reconciled_kst_count': reconciled_kst_count, 'reconciled_coordi_count': reconciled_coordi_count, 'counts_match_after_reconciliation': reconciled_kst_count == reconciled_coordi_count, 'items_to_exclude_from_kst': len(kst_only_items) + len(kst_duplicates), 'items_to_exclude_from_coordi': len(coordi_only_items) + len(coordi_duplicates) } return categorization def _find_duplicates_in_list(self, items_list: List[ComparisonItem]) -> List[ComparisonItem]: """Find duplicate items within a dataset - FIXED to only return actual duplicates""" from collections import Counter # Count occurrences of each (title, episode) pair key_counts = Counter((item.title, item.episode) for item in items_list) # Only return items that appear more than once duplicates = [] for item in items_list: key = (item.title, item.episode) if key_counts[key] > 1: duplicates.append(item) return duplicates def _find_sheet_specific_mixed_duplicates(self, sheet_data: Dict[str, Any], sheet_filter: str) -> List[Dict]: """Find mixed duplicates within a specific sheet only""" mixed_duplicates = [] kst_sheet_items = sheet_data['kst_all_items'] coordi_sheet_items = sheet_data['coordi_all_items'] # Find duplicates within this sheet kst_sheet_duplicates = self._find_duplicates_in_list(kst_sheet_items) coordi_sheet_duplicates = self._find_duplicates_in_list(coordi_sheet_items) # Create sets for items that exist in both KST and Coordi within this sheet kst_sheet_set = {(item.title, item.episode) for item in kst_sheet_items} coordi_sheet_set = {(item.title, item.episode) for item in coordi_sheet_items} matched_in_sheet = kst_sheet_set.intersection(coordi_sheet_set) # Create sets of duplicate keys within this sheet kst_duplicate_keys = {(item.title, item.episode) for item in kst_sheet_duplicates} coordi_duplicate_keys = {(item.title, item.episode) for item in coordi_sheet_duplicates} # Find matched items that also have duplicates within the same sheet for title, episode in matched_in_sheet: # Check if this matched item has duplicates in KST within this sheet if (title, episode) in kst_duplicate_keys: mixed_duplicates.append({ 'title': title, 'episode': episode, 'sheet': sheet_filter, 'row_index': None, # Could get from items if needed 'reason': f'Item exists in both datasets but has duplicates in KST within {sheet_filter}', 'mismatch_type': 'MIXED_DUPLICATE_KST', 'duplicate_side': 'KST' }) # Check if this matched item has duplicates in Coordi within this sheet if (title, episode) in coordi_duplicate_keys: mixed_duplicates.append({ 'title': title, 'episode': episode, 'sheet': sheet_filter, 'row_index': None, # Could get from items if needed 'reason': f'Item exists in both datasets but has duplicates in Coordi within {sheet_filter}', 'mismatch_type': 'MIXED_DUPLICATE_COORDI', 'duplicate_side': 'COORDI' }) return mixed_duplicates def generate_mismatch_details_for_sheet(self, categorization: Dict[str, Any], sheet_data: Dict[str, Any], sheet_filter: str) -> Dict[str, List[Dict]]: """Generate detailed information about each type of mismatch with reasons for a specific sheet""" mismatch_details = { 'kst_only': [], 'coordi_only': [], 'kst_duplicates': [], 'coordi_duplicates': [], 'mixed_duplicates': [] } # KST-only items for item in categorization['kst_only_items']: mismatch_details['kst_only'].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Item exists in KST data but not in Coordi data', 'mismatch_type': 'KST_ONLY' }) # Coordi-only items for item in categorization['coordi_only_items']: mismatch_details['coordi_only'].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Item exists in Coordi data but not in KST data', 'mismatch_type': 'COORDI_ONLY' }) # Find mixed duplicates first (they take priority) mixed_duplicates = self._find_sheet_specific_mixed_duplicates(sheet_data, sheet_filter) mismatch_details['mixed_duplicates'] = mixed_duplicates # Create set of items that are already covered by mixed duplicates mixed_duplicate_keys = {(item['title'], item['episode']) for item in mixed_duplicates} # KST duplicates - exclude those already covered by mixed duplicates for item in categorization['kst_duplicates']: key = (item.title, item.episode) if key not in mixed_duplicate_keys: mismatch_details['kst_duplicates'].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Duplicate entry in KST data', 'mismatch_type': 'KST_DUPLICATE' }) # Coordi duplicates - exclude those already covered by mixed duplicates for item in categorization['coordi_duplicates']: key = (item.title, item.episode) if key not in mixed_duplicate_keys: mismatch_details['coordi_duplicates'].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Duplicate entry in Coordi data', 'mismatch_type': 'COORDI_DUPLICATE' }) return mismatch_details def get_comparison_summary(self, sheet_filter: str | None = None) -> Dict[str, Any]: """Get a comprehensive summary of the comparison for a specific sheet only""" # Get sheet names for filtering options sheet_names = list(self.data.keys()) if self.data else [] # If no sheet filter provided, default to first sheet if not sheet_filter: sheet_filter = sheet_names[0] if sheet_names else None if not sheet_filter: raise ValueError("No sheets available or sheet filter not specified") # Validate that the requested sheet exists if sheet_filter not in sheet_names: raise ValueError(f"Sheet '{sheet_filter}' not found in data. Available sheets: {sheet_names}") # Extract data for the specific sheet only sheet_data = self.extract_kst_coordi_items_for_sheet(sheet_filter) # Categorize mismatches for this sheet categorization = self.categorize_mismatches_for_sheet(sheet_data) # Generate mismatch details for this sheet mismatch_details = self.generate_mismatch_details_for_sheet(categorization, sheet_data, sheet_filter) # Group data by title for this sheet grouped_data = self.group_by_title_for_sheet(categorization, sheet_filter) # Calculate counts matched_count = len(categorization['matched_items']) kst_total = len(sheet_data['kst_items']) coordi_total = len(sheet_data['coordi_items']) summary = { 'sheet_names': sheet_names, 'current_sheet_filter': sheet_filter, 'original_counts': { 'kst_total': kst_total, 'coordi_total': coordi_total }, 'matched_items_count': matched_count, 'mismatches': { 'kst_only_count': len(mismatch_details['kst_only']), 'coordi_only_count': len(mismatch_details['coordi_only']), 'kst_duplicates_count': len(mismatch_details['kst_duplicates']), 'coordi_duplicates_count': len(mismatch_details['coordi_duplicates']), 'mixed_duplicates_count': len(mismatch_details['mixed_duplicates']) }, 'reconciliation': categorization['reconciliation'], 'mismatch_details': mismatch_details, 'grouped_by_title': grouped_data } return summary def group_by_title_for_sheet(self, categorization: Dict[str, Any], sheet_filter: str) -> Dict[str, Any]: """Group mismatches and matches by KR title for a specific sheet""" from collections import defaultdict grouped = { 'kst_only_by_title': defaultdict(list), 'coordi_only_by_title': defaultdict(list), 'matched_by_title': defaultdict(list), 'title_summaries': {} } # Group KST only items by title for item in categorization['kst_only_items']: title = item.title grouped['kst_only_by_title'][title].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Item exists in KST data but not in Coordi data' }) # Group Coordi only items by title for item in categorization['coordi_only_items']: title = item.title grouped['coordi_only_by_title'][title].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Item exists in Coordi data but not in KST data' }) # Group matched items by title for item in categorization['matched_items']: title = item.title grouped['matched_by_title'][title].append({ 'title': item.title, 'episode': item.episode, 'sheet': item.source_sheet, 'row_index': item.row_index, 'reason': 'Perfect match' }) # Create summary for each title all_titles = set() all_titles.update(grouped['kst_only_by_title'].keys()) all_titles.update(grouped['coordi_only_by_title'].keys()) all_titles.update(grouped['matched_by_title'].keys()) for title in all_titles: kst_only_count = len(grouped['kst_only_by_title'][title]) coordi_only_count = len(grouped['coordi_only_by_title'][title]) matched_count = len(grouped['matched_by_title'][title]) total_episodes = kst_only_count + coordi_only_count + matched_count grouped['title_summaries'][title] = { 'total_episodes': total_episodes, 'matched_count': matched_count, 'kst_only_count': kst_only_count, 'coordi_only_count': coordi_only_count, 'match_percentage': round((matched_count / total_episodes * 100) if total_episodes > 0 else 0, 1), 'has_mismatches': kst_only_count > 0 or coordi_only_count > 0 } # Convert defaultdicts to regular dicts for JSON serialization grouped['kst_only_by_title'] = dict(grouped['kst_only_by_title']) grouped['coordi_only_by_title'] = dict(grouped['coordi_only_by_title']) grouped['matched_by_title'] = dict(grouped['matched_by_title']) return grouped def generate_visualize_data(self, sheet_filter: str | None = None) -> List[Dict[str, Any]]: """Generate data structure for Excel-like visualization""" # Get comparison data for the specified sheet summary = self.get_comparison_summary(sheet_filter) mismatch_details = summary['mismatch_details'] visualize_rows = [] # Helper function to create a row def create_row(coordi_title="", coordi_chapter="", kst_title="", kst_chapter="", row_type="matched", reason="", title_for_sort=""): return { 'coordi_title': coordi_title, 'coordi_chapter': coordi_chapter, 'kst_title': kst_title, 'kst_chapter': kst_chapter, 'row_type': row_type, 'reason': reason, 'title_for_sort': title_for_sort or coordi_title or kst_title, 'priority': 1 if row_type != 'matched' else 2 # Mismatches first } # 1. Handle Coordi-only items for item in mismatch_details['coordi_only']: visualize_rows.append(create_row( coordi_title=item['title'], coordi_chapter=item['episode'], row_type='coordi_only', reason='Only in Coordi' )) # 2. Handle KST-only items for item in mismatch_details['kst_only']: visualize_rows.append(create_row( kst_title=item['title'], kst_chapter=item['episode'], row_type='kst_only', reason='Only in KST' )) # 3. Handle Mixed duplicates (exists in both but duplicated on one side) mixed_items = {} # Group by title+episode for item in mismatch_details['mixed_duplicates']: key = f"{item['title']}_{item['episode']}" if key not in mixed_items: mixed_items[key] = { 'title': item['title'], 'episode': item['episode'], 'has_kst_duplicate': False, 'has_coordi_duplicate': False } if item['duplicate_side'] == 'KST': mixed_items[key]['has_kst_duplicate'] = True elif item['duplicate_side'] == 'COORDI': mixed_items[key]['has_coordi_duplicate'] = True for key, item in mixed_items.items(): # First row: show it exists in both visualize_rows.append(create_row( coordi_title=item['title'], coordi_chapter=item['episode'], kst_title=item['title'], kst_chapter=item['episode'], row_type='mixed_duplicate', reason='Mixed duplicate' )) # Additional rows for duplicates if item['has_kst_duplicate']: visualize_rows.append(create_row( kst_title=item['title'], kst_chapter=item['episode'], row_type='mixed_duplicate', reason='Duplicate in KST', title_for_sort=item['title'] )) if item['has_coordi_duplicate']: visualize_rows.append(create_row( coordi_title=item['title'], coordi_chapter=item['episode'], row_type='mixed_duplicate', reason='Duplicate in Coordi', title_for_sort=item['title'] )) # 4. Handle Pure duplicates for item in mismatch_details['kst_duplicates']: visualize_rows.append(create_row( kst_title=item['title'], kst_chapter=item['episode'], row_type='pure_duplicate', reason='Duplicate in KST' )) for item in mismatch_details['coordi_duplicates']: visualize_rows.append(create_row( coordi_title=item['title'], coordi_chapter=item['episode'], row_type='pure_duplicate', reason='Duplicate in Coordi' )) # 5. Handle Matched items (perfect matches) matched_by_title = summary['grouped_by_title']['matched_by_title'] for title, items in matched_by_title.items(): for item in items: visualize_rows.append(create_row( coordi_title=item['title'], coordi_chapter=item['episode'], kst_title=item['title'], kst_chapter=item['episode'], row_type='matched', reason='Perfect match' )) # Sort: Mismatches first (priority 1), then matches (priority 2), then by Korean title + chapter def sort_key(x): # Extract episode number for proper numeric sorting coordi_episode = x.get('coordi_chapter', '') or '' kst_episode = x.get('kst_chapter', '') or '' episode = coordi_episode or kst_episode # Try to convert episode to number for proper sorting, fallback to string try: episode_num = float(episode) if episode else 0 except (ValueError, TypeError): episode_num = 0 return (x['priority'], x['title_for_sort'], episode_num) visualize_rows.sort(key=sort_key) return visualize_rows def print_comparison_summary(self, sheet_filter: str | None = None): """Print a formatted summary of the comparison for a specific sheet""" summary = self.get_comparison_summary(sheet_filter) print("=" * 80) print(f"KST vs COORDI COMPARISON SUMMARY - Sheet: {summary['current_sheet_filter']}") print("=" * 80) print(f"Original Counts:") print(f" KST Total: {summary['original_counts']['kst_total']}") print(f" Coordi Total: {summary['original_counts']['coordi_total']}") print() print(f"Matched Items: {summary['matched_items_count']}") print() print(f"Mismatches:") print(f" KST Only: {summary['mismatches']['kst_only_count']}") print(f" Coordi Only: {summary['mismatches']['coordi_only_count']}") print(f" KST Duplicates: {summary['mismatches']['kst_duplicates_count']}") print(f" Coordi Duplicates: {summary['mismatches']['coordi_duplicates_count']}") print(f" Mixed Duplicates: {summary['mismatches']['mixed_duplicates_count']}") print() print(f"Reconciliation:") reconciliation = summary['reconciliation'] print(f" After excluding mismatches:") print(f" KST Count: {reconciliation['reconciled_kst_count']}") print(f" Coordi Count: {reconciliation['reconciled_coordi_count']}") print(f" Counts Match: {reconciliation['counts_match_after_reconciliation']}") print() # Show sample mismatches for mismatch_type, details in summary['mismatch_details'].items(): if details: print(f"{mismatch_type.upper()} (showing first 3):") for i, item in enumerate(details[:3]): print(f" {i+1}. {item['title']} - Episode {item['episode']} ({item['reason']})") if len(details) > 3: print(f" ... and {len(details) - 3} more") print() if __name__ == "__main__": # Test the comparator comparator = KSTCoordiComparator("data/sample-data.xlsx") if comparator.load_data(): print("Data loaded successfully!") comparator.print_comparison_summary() else: print("Failed to load data!")