import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any, Optional
from dataclasses import dataclass


@dataclass
class ComparisonItem:
    """Represents a single (title, episode) item for comparison.

    Identity (hash/equality) is intentionally based only on ``title`` and
    ``episode`` so the same logical item found on different sheets/rows
    compares equal; ``source_sheet``/``row_index`` are provenance metadata.
    """

    title: str
    episode: str
    source_sheet: str
    row_index: int

    def __hash__(self):
        return hash((self.title, self.episode))

    def __eq__(self, other):
        if not isinstance(other, ComparisonItem):
            return False
        return self.title == other.title and self.episode == other.episode


class KSTCoordiComparator:
    """
    Compare KST and Coordi data to identify mismatches and ensure count
    reconciliation.

    Workflow: ``load_data()`` -> ``extract_kst_coordi_items()`` ->
    ``categorize_mismatches()`` / ``get_comparison_summary()``.
    """

    def __init__(self, excel_file_path: str):
        self.excel_file_path = excel_file_path
        self.data: Dict[str, pd.DataFrame] = {}
        self.kst_items: set = set()
        self.coordi_items: set = set()
        self.comparison_results: Dict[str, Any] = {}

    def load_data(self) -> bool:
        """Load every sheet of the Excel file into ``self.data``.

        Returns True on success, False on any failure (best-effort: the
        error is printed, not raised, so callers can degrade gracefully).
        """
        try:
            excel_file = pd.ExcelFile(self.excel_file_path)
            for sheet_name in excel_file.sheet_names:
                self.data[sheet_name] = pd.read_excel(
                    self.excel_file_path, sheet_name=sheet_name
                )
            return True
        except Exception as e:
            print(f"Error loading data: {e}")
            return False

    def extract_kst_coordi_items(self) -> Dict[str, Any]:
        """Extract KST and Coordi items from all sheets using column header names.

        KST columns are 'Title KR' / 'Epi.'; Coordi columns are
        'KR title' / 'Chap'. Both a de-duplicated set and a full list
        (for duplicate detection) are kept per dataset, and cached on
        ``self`` for reuse by the categorization methods.
        """
        kst_items = set()
        coordi_items = set()
        kst_details = []
        coordi_details = []
        kst_all_items = []      # Keep all items including duplicates
        coordi_all_items = []   # Keep all items including duplicates

        for sheet_name, df in self.data.items():
            columns = df.columns.tolist()

            # Find columns by header names.
            # KST columns: 'Title KR' and 'Epi.'
            # Coordi columns: 'KR title' and 'Chap'
            kst_title_col = 'Title KR' if 'Title KR' in columns else None
            kst_episode_col = 'Epi.' if 'Epi.' in columns else None
            coordi_title_col = 'KR title' if 'KR title' in columns else None
            coordi_episode_col = 'Chap' if 'Chap' in columns else None

            print(f"Sheet: {sheet_name}")
            print(f"  KST columns - Title: {kst_title_col}, Episode: {kst_episode_col}")
            print(f"  Coordi columns - Title: {coordi_title_col}, Episode: {coordi_episode_col}")

            # Extract items from each row
            for idx, row in df.iterrows():
                # Extract KST data
                if kst_title_col and kst_episode_col:
                    kst_title = str(row.get(kst_title_col, '')).strip()
                    kst_episode = str(row.get(kst_episode_col, '')).strip()

                    # A row is valid only when both cells are non-empty and
                    # non-NaN ('nan' guards the str() rendering of NaN).
                    has_kst_data = (
                        kst_title and kst_title != 'nan' and
                        kst_episode and kst_episode != 'nan' and
                        pd.notna(row[kst_title_col]) and
                        pd.notna(row[kst_episode_col])
                    )

                    if has_kst_data:
                        item = ComparisonItem(kst_title, kst_episode, sheet_name, idx)
                        kst_items.add(item)
                        kst_all_items.append(item)  # Keep all items for duplicate detection
                        kst_details.append({
                            'title': kst_title,
                            'episode': kst_episode,
                            'sheet': sheet_name,
                            'row_index': idx,
                            'kst_data': {
                                kst_title_col: row[kst_title_col],
                                kst_episode_col: row[kst_episode_col]
                            }
                        })

                # Extract Coordi data
                if coordi_title_col and coordi_episode_col:
                    coordi_title = str(row.get(coordi_title_col, '')).strip()
                    coordi_episode = str(row.get(coordi_episode_col, '')).strip()

                    # Check if this row has valid Coordi data
                    has_coordi_data = (
                        coordi_title and coordi_title != 'nan' and
                        coordi_episode and coordi_episode != 'nan' and
                        pd.notna(row[coordi_title_col]) and
                        pd.notna(row[coordi_episode_col])
                    )

                    if has_coordi_data:
                        item = ComparisonItem(coordi_title, coordi_episode, sheet_name, idx)
                        coordi_items.add(item)
                        coordi_all_items.append(item)  # Keep all items for duplicate detection
                        coordi_details.append({
                            'title': coordi_title,
                            'episode': coordi_episode,
                            'sheet': sheet_name,
                            'row_index': idx,
                            'coordi_data': {
                                coordi_title_col: row[coordi_title_col],
                                coordi_episode_col: row[coordi_episode_col]
                            }
                        })

        self.kst_items = kst_items
        self.coordi_items = coordi_items
        self.kst_all_items = kst_all_items        # Store for duplicate detection
        self.coordi_all_items = coordi_all_items  # Store for duplicate detection

        return {
            'kst_items': kst_items,
            'coordi_items': coordi_items,
            'kst_details': kst_details,
            'coordi_details': coordi_details,
            'kst_all_items': kst_all_items,
            'coordi_all_items': coordi_all_items
        }

    def categorize_mismatches(self) -> Dict[str, Any]:
        """Categorize data into KST-only, Coordi-only, and matched items.

        Returns a dict with item lists, per-category counts, and a
        'reconciliation' section showing counts after excluding mismatches.
        """
        # Extract only if it has not run yet; checking the cached all-items
        # attribute (instead of set emptiness) avoids re-extracting on every
        # call when one dataset is legitimately empty.
        if not hasattr(self, 'kst_all_items'):
            self.extract_kst_coordi_items()

        # Find overlaps and differences
        matched_items = self.kst_items.intersection(self.coordi_items)
        kst_only_items = self.kst_items - self.coordi_items
        coordi_only_items = self.coordi_items - self.kst_items

        # Find duplicates within each dataset
        kst_duplicates = self._find_duplicates_in_list(self.kst_all_items)
        coordi_duplicates = self._find_duplicates_in_list(self.coordi_all_items)

        categorization = {
            'matched_items': list(matched_items),
            'kst_only_items': list(kst_only_items),
            'coordi_only_items': list(coordi_only_items),
            'kst_duplicates': kst_duplicates,
            'coordi_duplicates': coordi_duplicates,
            'counts': {
                'total_kst': len(self.kst_items),
                'total_coordi': len(self.coordi_items),
                'matched': len(matched_items),
                'kst_only': len(kst_only_items),
                'coordi_only': len(coordi_only_items),
                'kst_duplicates_count': len(kst_duplicates),
                'coordi_duplicates_count': len(coordi_duplicates)
            }
        }

        # After removing all mismatches, both sides reduce to the matched
        # set, so the reconciled counts are equal by construction.
        reconciled_kst_count = len(matched_items)
        reconciled_coordi_count = len(matched_items)

        categorization['reconciliation'] = {
            'original_kst_count': len(self.kst_items),
            'original_coordi_count': len(self.coordi_items),
            'reconciled_kst_count': reconciled_kst_count,
            'reconciled_coordi_count': reconciled_coordi_count,
            'counts_match_after_reconciliation': reconciled_kst_count == reconciled_coordi_count,
            'items_to_exclude_from_kst': len(kst_only_items) + len(kst_duplicates),
            'items_to_exclude_from_coordi': len(coordi_only_items) + len(coordi_duplicates)
        }

        return categorization

    def _find_duplicates_in_list(self, items_list: List[ComparisonItem]) -> List[ComparisonItem]:
        """Return every occurrence after the first of each (title, episode) key."""
        seen = set()
        duplicates = []
        for item in items_list:
            key = (item.title, item.episode)
            if key in seen:
                duplicates.append(item)
            else:
                seen.add(key)
        return duplicates

    def _find_sheet_specific_mixed_duplicates(self, sheet_filter: str) -> List[Dict]:
        """Find 'mixed' duplicates within a specific sheet only.

        A mixed duplicate is an item that is matched between KST and Coordi
        within the sheet but is also duplicated on one side of that sheet.
        """
        if not sheet_filter:
            return []

        mixed_duplicates = []

        # Reuse cached extraction results when available; extracting is
        # expensive (it re-reads every row and prints per-sheet diagnostics).
        if not hasattr(self, 'kst_all_items'):
            self.extract_kst_coordi_items()

        kst_sheet_items = [
            item for item in self.kst_all_items if item.source_sheet == sheet_filter
        ]
        coordi_sheet_items = [
            item for item in self.coordi_all_items if item.source_sheet == sheet_filter
        ]

        # Find duplicates within this sheet
        kst_sheet_duplicates = self._find_duplicates_in_list(kst_sheet_items)
        coordi_sheet_duplicates = self._find_duplicates_in_list(coordi_sheet_items)

        # Items that exist in both KST and Coordi within this sheet
        kst_sheet_set = {(item.title, item.episode) for item in kst_sheet_items}
        coordi_sheet_set = {(item.title, item.episode) for item in coordi_sheet_items}
        matched_in_sheet = kst_sheet_set.intersection(coordi_sheet_set)

        # Keys of the duplicated entries within this sheet
        kst_duplicate_keys = {(item.title, item.episode) for item in kst_sheet_duplicates}
        coordi_duplicate_keys = {(item.title, item.episode) for item in coordi_sheet_duplicates}

        # Find matched items that also have duplicates within the same sheet
        for title, episode in matched_in_sheet:
            # Check if this matched item has duplicates in KST within this sheet
            if (title, episode) in kst_duplicate_keys:
                mixed_duplicates.append({
                    'title': title,
                    'episode': episode,
                    'sheet': sheet_filter,
                    'row_index': None,  # Could get from items if needed
                    'reason': f'Item exists in both datasets but has duplicates in KST within {sheet_filter}',
                    'mismatch_type': 'MIXED_DUPLICATE_KST',
                    'duplicate_side': 'KST'
                })

            # Check if this matched item has duplicates in Coordi within this sheet
            if (title, episode) in coordi_duplicate_keys:
                mixed_duplicates.append({
                    'title': title,
                    'episode': episode,
                    'sheet': sheet_filter,
                    'row_index': None,  # Could get from items if needed
                    'reason': f'Item exists in both datasets but has duplicates in Coordi within {sheet_filter}',
                    'mismatch_type': 'MIXED_DUPLICATE_COORDI',
                    'duplicate_side': 'COORDI'
                })

        return mixed_duplicates

    def generate_mismatch_details(self) -> Dict[str, List[Dict]]:
        """Generate detailed information about each type of mismatch with reasons."""
        categorization = self.categorize_mismatches()

        mismatch_details = {
            'kst_only': [],
            'coordi_only': [],
            'kst_duplicates': [],
            'coordi_duplicates': [],
            'mixed_duplicates': []
        }

        # Each (items, bucket, reason, type) tuple maps one categorization
        # list onto its detail bucket with a human-readable reason.
        detail_specs = [
            (categorization['kst_only_items'], 'kst_only',
             'Item exists in KST data but not in Coordi data', 'KST_ONLY'),
            (categorization['coordi_only_items'], 'coordi_only',
             'Item exists in Coordi data but not in KST data', 'COORDI_ONLY'),
            (categorization['kst_duplicates'], 'kst_duplicates',
             'Duplicate entry in KST data', 'KST_DUPLICATE'),
            (categorization['coordi_duplicates'], 'coordi_duplicates',
             'Duplicate entry in Coordi data', 'COORDI_DUPLICATE'),
        ]
        for items, bucket, reason, mismatch_type in detail_specs:
            for item in items:
                mismatch_details[bucket].append({
                    'title': item.title,
                    'episode': item.episode,
                    'sheet': item.source_sheet,
                    'row_index': item.row_index,
                    'reason': reason,
                    'mismatch_type': mismatch_type
                })

        # Mixed duplicates are calculated per sheet in get_comparison_summary
        mismatch_details['mixed_duplicates'] = []

        return mismatch_details

    def get_comparison_summary(self, sheet_filter: Optional[str] = None) -> Dict[str, Any]:
        """Get a comprehensive summary of the comparison for a specific sheet.

        Defaults to the first sheet when no filter is given; raises
        ValueError when no sheets are loaded at all.
        """
        sheet_names = list(self.data.keys()) if self.data else []

        # If no sheet filter provided, default to first sheet
        if not sheet_filter:
            sheet_filter = sheet_names[0] if sheet_names else None
        if not sheet_filter:
            raise ValueError("No sheets available or sheet filter not specified")

        categorization = self.categorize_mismatches()
        mismatch_details = self.generate_mismatch_details()
        grouped_data = self.group_by_title()

        # Always apply sheet filtering (no "All Sheets" option)
        mismatch_details = self.filter_by_sheet(mismatch_details, sheet_filter)
        grouped_data = self.filter_grouped_data_by_sheet(grouped_data, sheet_filter)

        # Calculate mixed duplicates specific to this sheet
        mismatch_details['mixed_duplicates'] = \
            self._find_sheet_specific_mixed_duplicates(sheet_filter)

        # Recalculate counts for filtered data
        filtered_counts = self.calculate_filtered_counts(mismatch_details)

        # Matched items are not part of mismatch_details, so count them from
        # the sheet-filtered grouped data (previously this was always 0).
        matched_count = sum(
            len(items) for items in grouped_data['matched_by_title'].values()
        )

        summary = {
            'sheet_names': sheet_names,
            'current_sheet_filter': sheet_filter,
            'original_counts': {
                # Totals for this sheet include matched rows plus each
                # side's own mismatches/duplicates.
                'kst_total': filtered_counts['kst_total'] + matched_count,
                'coordi_total': filtered_counts['coordi_total'] + matched_count
            },
            'matched_items_count': matched_count,
            'mismatches': {
                'kst_only_count': filtered_counts['kst_only_count'],
                'coordi_only_count': filtered_counts['coordi_only_count'],
                'kst_duplicates_count': filtered_counts['kst_duplicates_count'],
                'coordi_duplicates_count': filtered_counts['coordi_duplicates_count']
            },
            'reconciliation': categorization['reconciliation'],
            'mismatch_details': mismatch_details,
            'grouped_by_title': grouped_data
        }

        return summary

    def filter_by_sheet(self, mismatch_details: Dict[str, List], sheet_filter: str) -> Dict[str, List]:
        """Filter mismatch details by specific sheet."""
        filtered = {}
        for category, items in mismatch_details.items():
            filtered[category] = [
                item for item in items if item.get('sheet') == sheet_filter
            ]
        return filtered

    def filter_grouped_data_by_sheet(self, grouped_data: Dict, sheet_filter: str) -> Dict:
        """Filter grouped data by specific sheet and recompute title summaries."""
        filtered = {
            'kst_only_by_title': {},
            'coordi_only_by_title': {},
            'matched_by_title': {},
            'title_summaries': {}
        }

        # Filter each category; titles whose items all belong to other
        # sheets are dropped entirely.
        for category in ['kst_only_by_title', 'coordi_only_by_title', 'matched_by_title']:
            for title, items in grouped_data[category].items():
                filtered_items = [
                    item for item in items if item.get('sheet') == sheet_filter
                ]
                if filtered_items:
                    filtered[category][title] = filtered_items

        # Recalculate title summaries for filtered data
        all_titles = set()
        all_titles.update(filtered['kst_only_by_title'].keys())
        all_titles.update(filtered['coordi_only_by_title'].keys())
        all_titles.update(filtered['matched_by_title'].keys())

        for title in all_titles:
            kst_only_count = len(filtered['kst_only_by_title'].get(title, []))
            coordi_only_count = len(filtered['coordi_only_by_title'].get(title, []))
            matched_count = len(filtered['matched_by_title'].get(title, []))
            total_episodes = kst_only_count + coordi_only_count + matched_count

            filtered['title_summaries'][title] = {
                'total_episodes': total_episodes,
                'matched_count': matched_count,
                'kst_only_count': kst_only_count,
                'coordi_only_count': coordi_only_count,
                'match_percentage': round(
                    (matched_count / total_episodes * 100) if total_episodes > 0 else 0, 1
                ),
                'has_mismatches': kst_only_count > 0 or coordi_only_count > 0
            }

        return filtered

    def calculate_filtered_counts(self, filtered_mismatch_details: Dict[str, List]) -> Dict[str, int]:
        """Calculate per-category counts for sheet-filtered mismatch data.

        Note: 'kst_total'/'coordi_total' cover only mismatched rows here;
        matched rows are added in get_comparison_summary, which is also
        where the real matched count is computed.
        """
        return {
            'kst_total': (
                len(filtered_mismatch_details['kst_only'])
                + len(filtered_mismatch_details['kst_duplicates'])
            ),
            'coordi_total': (
                len(filtered_mismatch_details['coordi_only'])
                + len(filtered_mismatch_details['coordi_duplicates'])
            ),
            'matched': 0,  # Calculated from matched grouped data in get_comparison_summary
            'kst_only_count': len(filtered_mismatch_details['kst_only']),
            'coordi_only_count': len(filtered_mismatch_details['coordi_only']),
            'kst_duplicates_count': len(filtered_mismatch_details['kst_duplicates']),
            'coordi_duplicates_count': len(filtered_mismatch_details['coordi_duplicates']),
            'mixed_duplicates_count': len(filtered_mismatch_details.get('mixed_duplicates', []))
        }

    def group_by_title(self) -> Dict[str, Any]:
        """Group mismatches and matches by KR title, with per-title summaries."""
        from collections import defaultdict

        grouped = {
            'kst_only_by_title': defaultdict(list),
            'coordi_only_by_title': defaultdict(list),
            'matched_by_title': defaultdict(list),
            'title_summaries': {}
        }

        # Get mismatch details
        mismatch_details = self.generate_mismatch_details()

        # Group KST only items by title
        for item in mismatch_details['kst_only']:
            grouped['kst_only_by_title'][item['title']].append(item)

        # Group Coordi only items by title
        for item in mismatch_details['coordi_only']:
            grouped['coordi_only_by_title'][item['title']].append(item)

        # Group matched items by title
        if hasattr(self, 'kst_items') and hasattr(self, 'coordi_items'):
            categorization = self.categorize_mismatches()
            for item in categorization['matched_items']:
                grouped['matched_by_title'][item.title].append({
                    'title': item.title,
                    'episode': item.episode,
                    'sheet': item.source_sheet,
                    'row_index': item.row_index,
                    'reason': 'Perfect match'
                })

        # Create summary for each title
        all_titles = set()
        all_titles.update(grouped['kst_only_by_title'].keys())
        all_titles.update(grouped['coordi_only_by_title'].keys())
        all_titles.update(grouped['matched_by_title'].keys())

        for title in all_titles:
            # Use .get() so summarizing does not insert spurious empty-list
            # entries into the defaultdicts for titles absent from a category.
            kst_only_count = len(grouped['kst_only_by_title'].get(title, []))
            coordi_only_count = len(grouped['coordi_only_by_title'].get(title, []))
            matched_count = len(grouped['matched_by_title'].get(title, []))
            total_episodes = kst_only_count + coordi_only_count + matched_count

            grouped['title_summaries'][title] = {
                'total_episodes': total_episodes,
                'matched_count': matched_count,
                'kst_only_count': kst_only_count,
                'coordi_only_count': coordi_only_count,
                'match_percentage': round(
                    (matched_count / total_episodes * 100) if total_episodes > 0 else 0, 1
                ),
                'has_mismatches': kst_only_count > 0 or coordi_only_count > 0
            }

        # Convert defaultdicts to regular dicts for JSON serialization
        grouped['kst_only_by_title'] = dict(grouped['kst_only_by_title'])
        grouped['coordi_only_by_title'] = dict(grouped['coordi_only_by_title'])
        grouped['matched_by_title'] = dict(grouped['matched_by_title'])

        return grouped

    def print_comparison_summary(self):
        """Print a formatted summary of the comparison (default sheet)."""
        summary = self.get_comparison_summary()

        print("=" * 80)
        print("KST vs COORDI COMPARISON SUMMARY")
        print("=" * 80)

        print(f"Original Counts:")
        print(f"  KST Total: {summary['original_counts']['kst_total']}")
        print(f"  Coordi Total: {summary['original_counts']['coordi_total']}")
        print()

        print(f"Matched Items: {summary['matched_items_count']}")
        print()

        print(f"Mismatches:")
        print(f"  KST Only: {summary['mismatches']['kst_only_count']}")
        print(f"  Coordi Only: {summary['mismatches']['coordi_only_count']}")
        print(f"  KST Duplicates: {summary['mismatches']['kst_duplicates_count']}")
        print(f"  Coordi Duplicates: {summary['mismatches']['coordi_duplicates_count']}")
        print()

        print(f"Reconciliation:")
        reconciliation = summary['reconciliation']
        print(f"  After excluding mismatches:")
        print(f"    KST Count: {reconciliation['reconciled_kst_count']}")
        print(f"    Coordi Count: {reconciliation['reconciled_coordi_count']}")
        print(f"    Counts Match: {reconciliation['counts_match_after_reconciliation']}")
        print()

        # Show sample mismatches
        for mismatch_type, details in summary['mismatch_details'].items():
            if details:
                print(f"{mismatch_type.upper()} (showing first 3):")
                for i, item in enumerate(details[:3]):
                    print(f"  {i+1}. {item['title']} - Episode {item['episode']} ({item['reason']})")
                if len(details) > 3:
                    print(f"  ... and {len(details) - 3} more")
                print()


if __name__ == "__main__":
    # Test the comparator
    comparator = KSTCoordiComparator("data/sample-data.xlsx")

    if comparator.load_data():
        print("Data loaded successfully!")
        comparator.print_comparison_summary()
    else:
        print("Failed to load data!")