# Local Student Fees Extractor - Standalone Version

This notebook is a standalone version of the local fees extraction strategy from the AI Data Extractor project.
It includes all the necessary components: vector search, reranking, LLM extraction, and result processing.

## Requirements
- Conda environment: `rag_env` (activate before running: `conda activate rag_env`)
- Required API keys: OPENAI_API_KEY, GROQ_API_KEY, COHERE_API_KEY
- Vector database with indexed university data


## 1- import libraries

In [15]:
#!/usr/bin/env python3
import os
import sys
import json
import time
import datetime
import json_repair
from typing import List, Dict
from pathlib import Path
from pydantic import BaseModel, Field
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# LangChain imports
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_chroma import Chroma
from langchain_groq import ChatGroq

# Optional imports
try:
    from sentence_transformers import CrossEncoder
    CROSSENCODER_AVAILABLE = True
except ImportError:
    CROSSENCODER_AVAILABLE = False
    print("‚ö†Ô∏è CrossEncoder not available. Install sentence-transformers for local reranking.")

try:
    import cohere
    COHERE_AVAILABLE = True
    print("‚úÖ Cohere library imported successfully")
except ImportError:
    COHERE_AVAILABLE = False
    print("‚ö†Ô∏è Cohere not available. Install cohere for API-based reranking.")
    print("üí° Run: pip install cohere")
    print("üí° Or in conda: conda install -c conda-forge cohere")

print("‚úÖ All basic dependencies imported successfully!")


‚úÖ Cohere library imported successfully
‚úÖ All basic dependencies imported successfully!


## 2- configuration

In [16]:
class Config:
    """Standalone configuration based on the original config.py"""
    
    # ============================================================================
    # UNIVERSITY SETTINGS
    # ============================================================================
    # Set this to your target university for testing
    UNIVERSITY_NAME = "cyberjaya.edu.my_urls_cleaned.txt"  # Change this for different university
    
    # Project paths (adjust if needed)
    _PROJECT_ROOT = Path.cwd()  # Current working directory
    CLEANED_DATA_DIR = str(_PROJECT_ROOT / "Cleaned Data")
    OUTPUT_DIRECTORY = str(_PROJECT_ROOT / "programs_names_output_test")
    
    # ============================================================================
    # LLM SETTINGS
    # ============================================================================
    LLM_PROVIDER = os.getenv("LLM_PROVIDER", "groq")  # "groq", "openai", or "deepseek"
    
    # GROQ SETTINGS (default)
    LLM_MODEL = os.getenv("GROQ_LLM_MODEL", "openai/gpt-oss-20b")  #openai/gpt-oss-20b   #Qwen/Qwen3-32B
    LLM_API_KEY = os.getenv("GROQ_API_KEY")
    LLM_API_BASE = os.getenv("GROQ_API_BASE", "https://api.groq.com/openai/v1")
    LLM_TEMPERATURE = float(os.getenv("LLM_TEMPERATURE", "0.01"))
    
    # OPENAI SETTINGS
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    OPENAI_LLM_MODEL = os.getenv("OPENAI_LLM_MODEL", "gpt-4o")
    
    # ============================================================================
    # VECTOR STORE SETTINGS
    # ============================================================================
    EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-large")
    VECTOR_DB_PATH = str(_PROJECT_ROOT / "agents" / "chroma_langchain_db")
    
    # ============================================================================
    # RERANKER SETTINGS
    # ============================================================================
    CROSSENCODER_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    COHERE_MODEL = "rerank-english-v3.0"
    COHERE_API_KEY = os.getenv("CO_API_KEY") 
    USE_COHERE_RERANKER = True  # Set to False to use CrossEncoder
    
    # Debug Cohere settings
    print(f"üîç Debug - COHERE_API_KEY exists: {bool(COHERE_API_KEY)}")
    if COHERE_API_KEY:
        print(f"üîç Debug - COHERE_API_KEY length: {len(COHERE_API_KEY)}")
    print(f"üîç Debug - USE_COHERE_RERANKER: {USE_COHERE_RERANKER}")
    
    # ============================================================================
    # PROCESSING SETTINGS
    # ============================================================================
    DEFAULT_BATCH_SIZE = 10
    DEFAULT_VECTOR_SEARCH_K = 30
    DEFAULT_RERANK_TOP_N = 4
    BATCH_DELAY_SECONDS = 2
    
    @classmethod
    def get_collection_name(cls, university_name=None):
        """Get collection name for specified university"""
        university = university_name or cls.UNIVERSITY_NAME
        clean_name = university.replace('_urls_cleaned.txt', '')
        return f"{clean_name}_collection"
    
    @classmethod
    def get_programs_file(cls, university_name=None):
        """Get the programs file path for specified university"""
        university = university_name or cls.UNIVERSITY_NAME
        clean_domain = university.replace('_urls_cleaned.txt', '')
        filename = f"{university}_programs.json"
        return Path(cls.OUTPUT_DIRECTORY) / clean_domain / filename
    
    @classmethod
    def setup_environment(cls):
        """Setup environment variables"""
        if cls.OPENAI_API_KEY:
            os.environ['OPENAI_API_KEY'] = cls.OPENAI_API_KEY
        if cls.LLM_API_KEY:
            os.environ['GROQ_API_KEY'] = cls.LLM_API_KEY
        print(f"ü§ñ LLM Provider: {cls.LLM_PROVIDER}")
        print(f"üè´ University: {cls.UNIVERSITY_NAME}")

# Initialize configuration
Config.setup_environment()
print("‚úÖ Configuration initialized successfully!")


üîç Debug - COHERE_API_KEY exists: True
üîç Debug - COHERE_API_KEY length: 40
üîç Debug - USE_COHERE_RERANKER: True
ü§ñ LLM Provider: groq
üè´ University: cyberjaya.edu.my_urls_cleaned.txt
‚úÖ Configuration initialized successfully!


## 3. Pydantic Models for local Student Fees

In [17]:
# Pydantic models for local student fees (from local_fees_strategy.py)
class LocalStudentFees(BaseModel):
    program_name: str = Field(description="The name of the academic program")
    local_tuition_fees: Dict = Field(
        description="Structured local tuition fees with keys: _unit, _currency, _official_amount, _discounted_amount"
    )
    local_deposit_fees: Dict = Field(
        description="Structured local deposit fees with keys: _currency, _amount"
    )
    local_application_fees: Dict = Field(
        description="Structured local application fees with keys: _currency, _amount"
    )
    confidence: float = Field(description="Confidence score (0.0 to 1.0)", ge=0.0, le=1.0)

class BatchLocalStudentFees(BaseModel):
    programs: List[LocalStudentFees] = Field(description="List of local student fees information")

print("‚úÖ Pydantic models defined successfully!")

‚úÖ Pydantic models defined successfully!


## 4- UniversityServices class

In [18]:
class UniversityServices:
    """Services for vector search, reranking, and LLM calls"""
    
    def __init__(self, collection_name=None):
        print("üîÑ Initializing services...")
        self.collection_name = collection_name or Config.get_collection_name()
        self.embeddings = self._init_embeddings()
        self.vector_store = self._init_vector_store()
        self.cross_encoder = self._init_cross_encoder()
        self.cohere_client = self._init_cohere()
        self.llm = self._init_llm()
        print("‚úÖ All services initialized successfully!")
    
    def _init_embeddings(self):
        """Initialize OpenAI embeddings"""
        return OpenAIEmbeddings(model=Config.EMBEDDING_MODEL)
    
    def _init_vector_store(self):
        """Initialize Chroma vector store"""
        print(f"üè´ Using collection: {self.collection_name}")
        return Chroma(
            collection_name=self.collection_name,
            embedding_function=self.embeddings,
            persist_directory=Config.VECTOR_DB_PATH
        )
    
    def _init_cross_encoder(self):
        """Initialize CrossEncoder for reranking"""
        if not CROSSENCODER_AVAILABLE:
            return None
        try:
            cross_encoder = CrossEncoder(Config.CROSSENCODER_MODEL)
            print(f"‚úÖ CrossEncoder ({Config.CROSSENCODER_MODEL}) initialized successfully.")
            return cross_encoder
        except Exception as e:
            print(f"‚ö†Ô∏è CrossEncoder initialization error: {e}")
            return None
    
    def _init_cohere(self):
        """Initialize Cohere client for reranking"""
        if not COHERE_AVAILABLE or not Config.COHERE_API_KEY:
            return None
            
        if not Config.COHERE_API_KEY:
            print("‚ö†Ô∏è COHERE_API_KEY not found in environment variables")
            return None
            
        try:
            cohere_client = cohere.Client(Config.COHERE_API_KEY)
            print(f"‚úÖ Cohere client ({Config.COHERE_MODEL}) initialized successfully.")
            return cohere_client
        except Exception as e:
            print(f"‚ö†Ô∏è Cohere initialization error: {e}")
            return None
    
    def _init_llm(self):
        """Initialize LLM based on provider"""
        provider = Config.LLM_PROVIDER.lower()
        
        if provider == "openai":
            llm = ChatOpenAI(
                model=Config.OPENAI_LLM_MODEL,
                api_key=Config.OPENAI_API_KEY,
                temperature=Config.LLM_TEMPERATURE
            )
            print("‚úÖ OpenAI LLM initialized successfully.")
            return llm
        
        # Default to Groq
        llm = ChatGroq(
            model=Config.LLM_MODEL,
            api_key=Config.LLM_API_KEY,
            temperature=Config.LLM_TEMPERATURE,
            max_tokens=40960,  # Maximum allowed output tokens
            #reasoning_format="hidden",  # Suppress <think> tags for Qwen models
            timeout=None
        )
        print("‚úÖ Groq LLM initialized successfully.")
        return llm
    
    def vector_search(self, query: str, k: int = None) -> List[Dict]:
        """Perform vector search and return contexts"""
        if k is None:
            k = Config.DEFAULT_VECTOR_SEARCH_K
        
        print("üìö Performing vector search...")
        docs = self.vector_store.similarity_search(query, k=k)
        
        # Convert to contexts format
        contexts = []
        for doc in docs:
            contexts.append({
                'text': doc.page_content,
                'metadata': doc.metadata
            })
        
        print(f"‚úÖ Found {len(contexts)} documents")
        return contexts
    
    def rerank_with_cross_encoder(self, query: str, contexts: List[Dict], top_n: int = None) -> List[Dict]:
        """Rerank contexts using CrossEncoder"""
        if top_n is None:
            top_n = Config.DEFAULT_RERANK_TOP_N
        
        if not contexts or not self.cross_encoder:
            return contexts[:top_n]
        
        print(f"üîÑ Reranking {len(contexts)} documents with CrossEncoder...")
        
        try:
            query_context_pairs = [[query, context['text']] for context in contexts]
            scores = self.cross_encoder.predict(query_context_pairs)
            
            for i, score in enumerate(scores):
                contexts[i]['rerank_score'] = float(score)
            
            reranked_contexts = sorted(contexts, key=lambda x: x['rerank_score'], reverse=True)[:top_n]
            print(f"‚úÖ Reranked to top {len(reranked_contexts)} documents")
            return reranked_contexts
            
        except Exception as e:
            print(f"‚ùå CrossEncoder Error: {e}")
            return contexts[:top_n]
    
    def rerank_with_cohere(self, query: str, contexts: List[Dict], top_n: int = None) -> List[Dict]:
        """Rerank contexts using Cohere Rerank API"""
        if top_n is None:
            top_n = Config.DEFAULT_RERANK_TOP_N
        
        if not contexts or not self.cohere_client:
            return contexts[:top_n]
        
        documents_to_rerank = [context['text'] for context in contexts]
        print(f"üîÑ Sending {len(documents_to_rerank)} documents to Cohere Rerank API...")
        
        try:
            rerank_results = self.cohere_client.rerank(
                query=query,
                documents=documents_to_rerank,
                top_n=top_n,
                model=Config.COHERE_MODEL
            )
            
            reranked_contexts = []
            for result in rerank_results.results:
                original_context = contexts[result.index]
                original_context['rerank_score'] = result.relevance_score
                reranked_contexts.append(original_context)
            
            print(f"‚úÖ Reranked to top {len(reranked_contexts)} documents")
            return reranked_contexts
            
        except Exception as e:
            print(f"‚ùå Cohere API Error: {e}")
            return contexts[:top_n]

print("‚úÖ UniversityServices class defined successfully!")


‚úÖ UniversityServices class defined successfully!


## 5. local Fees Strategy Implementation


In [19]:
# This cell should contain the LocalFeesStrategy class.
# Please replace the existing InternationalFeesStrategy class with this one.

class LocalFeesStrategy:
    """Local Student Fees Extraction Strategy - Standalone Implementation"""
    
    def get_description(self) -> str:
        return "Local Student Fees Information Extractor (BATCHED)"
    
    def get_search_description(self) -> str:
        return "local student fees"
    
    def get_search_query(self, program_batch: List[str]) -> str:
        """Create comprehensive search query for local student fees"""
        all_programs_query = " OR ".join([f'"{program}"' for program in program_batch])
        return f"local student tuition fees domestic national citizen deposit application ({all_programs_query})"
    
    def get_extraction_messages(self, context: str, program_batch: List[str]) -> List[Dict]:
        """Create messages for local student fees extraction"""
        schema_str = json.dumps(BatchLocalStudentFees.model_json_schema(), ensure_ascii=False)
        programs_list = "\\n".join([f"- {program}" for program in program_batch])
        
        system_instructions = [
            "Your task is to find LOCAL STUDENT tuition and fees for ALL the programs mentioned.",
            "IMPORTANT: Focus ONLY on LOCAL/DOMESTIC student fees, NOT international student fees.",
            "DEFAULT RULE: If tuition fees don't specify student type (local/international), treat them as LOCAL fees.",
            "OUTPUT FORMAT: For each program, return OBJECTS with STRICT keys:",
            "local_tuition_fees: { '_unit': 'Years|Semesters|Annual|Per credit', '_currency': 'USD|EUR|TRY|GBP|...', '_official_amount': 5000, '_discounted_amount': 4500|null }",
            "local_deposit_fees: { '_currency': 'USD|EUR|TRY|GBP|...', '_amount': 1000|null }",
            "local_application_fees: { '_currency': 'USD|EUR|TRY|GBP|...', '_amount': 100|null }",
            "Amounts must be NUMBERS (no symbols). Use currency CODES. If not found, set the field to null and keep the object.",
            "DEFINITION: '_discounted_amount' means the price AFTER discount and MUST be less than or equal to '_official_amount'.",
            "CONSTRAINTS: '_discounted_amount' MUST use the SAME '_unit' and '_currency' as '_official_amount'. If you are not certain or values are inconsistent, set '_discounted_amount' to null. Do NOT guess or swap meanings.",
            "KEY DEFINITIONS - Understand the difference:",
            "",
            "‚Ä¢ APPLICATION FEES:",
            "  - Timing: Paid BEFORE acceptance/enrollment (during application submission)",
            "  - Purpose: To process and review your application",
            "  - Common terms: 'application fee', 'processing fee', 'registration fee' (when applying)",
            "  - Example: '$100 application fee to submit your application'",
            "",
            "‚Ä¢ DEPOSIT FEES:",
            "  - Timing: Paid AFTER acceptance (to confirm enrollment)",
            "  - Purpose: To reserve/secure your spot, partial payment toward tuition",
            "  - Common terms: 'enrollment deposit', 'seat deposit', 'confirmation fee', 'reservation fee'",
            "  - Example: '$1000 deposit to secure your place, deducted from first semester tuition'",
            "",
            "‚ö†Ô∏è DISAMBIGUATION RULES:",
            "  - If fee is paid 'to apply' or 'with application' ‚Üí APPLICATION FEE",
            "  - If fee is paid 'after acceptance' or 'to confirm enrollment' ‚Üí DEPOSIT FEE",
            "  - If fee is 'deducted from tuition' or 'refundable' ‚Üí DEPOSIT FEE",
            "  - If unclear or ambiguous ‚Üí set to null",
            ""
        ]
        
        search_terms = [
            "IMPORTANT DISTINCTIONS:",
            "- Look for terms like 'local', 'domestic', 'national', 'in-state', 'citizens'",
            "- Exclude 'international', 'foreign', 'overseas' student fees",
            "- Identify units (annual, per year, per semester) and convert to a normalized label",
            "- Extract currency CODE and numeric AMOUNT"
        ]
        
        guidelines = [
            "- Extract numeric amounts only (e.g., 5,000 USD ‚Üí amount: 5000, currency: USD)",
            "- Normalize units to one of: 'Years', 'Semesters', 'Annual', 'Per credit'",
            "- If only one tuition amount is present, set it as _official_amount and leave _discounted_amount null",
            "- Ensure '_discounted_amount' ‚â§ '_official_amount'. If not, set '_discounted_amount' to null",
            "- Ensure both amounts (if present) share the SAME unit and currency; otherwise set '_discounted_amount' to null",
            "- If fees are 'free' or 'waived', set amount to 0",
            "- If information spans multiple sections, combine logically; prefer the most common values"
        ]
        
        fields_to_extract = [
            "Local Tuition Fees (unit, currency, official amount, discounted amount)",
            "Local Deposit Fees (currency, amount)",
            "Local Application Fees (currency, amount)"
        ]
        
        # This assumes _create_base_messages is defined in the notebook.
        # If not, you may need to copy it from the InternationalFeesStrategy as well.
        return self._create_base_messages(
            schema_str, programs_list, context, 
            system_instructions, search_terms, guidelines, fields_to_extract
        )

    def _create_base_messages(self, schema_str: str, programs_list: str, context: str, 
                             system_instructions: List[str], search_terms: List[str], 
                             guidelines: List[str], fields_to_extract: List[str]) -> List[Dict]:
        """Helper method to create standardized messages"""
        
        system_content = [
            "You are an expert at extracting academic program information.",
            f"Your task is to find {', '.join(fields_to_extract).upper()} for ALL the programs mentioned.",
            "IMPORTANT: Process ALL programs in the list and return information for each one.",
            "LANGUAGE POLICY: Always respond in English. If the provided context is not in English, translate all extracted information and labels into clear English. Preserve numeric values and currency codes; transliterate proper nouns if needed.",
            ""
        ] + system_instructions + [
            "",
            "SEARCH TERMS TO LOOK FOR:"
        ] + search_terms + [
            "",
            "IMPORTANT GUIDELINES:"
        ] + guidelines + [
            "",
            "If specific information is not found for a program, return empty strings for those fields.",
            "Assess your confidence based on how complete and relevant the found information is for each program.",
            "",
            "Please respond in exactly the following JSON format:",
            "",
            "```json",
            schema_str,
            "```"
        ]
        
        human_content = [
            "## Programs to Process:",
            programs_list,
            "",
            "## Context:",
            context,
            "",
            "## Output Schema:",
            schema_str,
            "",
            f"## Find {', '.join(fields_to_extract).upper()} for ALL programs listed above:",
            "Extract the following information for each program:"
        ] + [f"{i+1}. {field}" for i, field in enumerate(fields_to_extract)] + [
            "",
            "Return the information for ALL programs in the specified JSON format with a 'programs' array."
        ]
        
        return [
            {
                "role": "system",
                "content": "\\n".join(system_content)
            },
            {
                "role": "user", 
                "content": "\\n".join(human_content)
            }
        ]

    def create_result_from_data(self, found_data: Dict, program_name: str) -> Dict:
        """Create result dictionary from found data"""
        result = {
            "program_name": found_data.get("program_name", program_name),
            "local_tuition_fees": found_data.get("local_tuition_fees", {
                "_unit": "",
                "_currency": "",
                "_official_amount": None,
                "_discounted_amount": None
            }),
            "local_deposit_fees": found_data.get("local_deposit_fees", {
                "_currency": "",
                "_amount": None
            }),
            "local_application_fees": found_data.get("local_application_fees", {
                "_currency": "",
                "_amount": None
            }),
            "confidence": float(found_data.get("confidence", 0.0))
        }

        # Post-validation: discounted must be <= official and same unit/currency
        try:
            lt = result.get("local_tuition_fees", {})
            off = lt.get("_official_amount")
            disc = lt.get("_discounted_amount")
            unit = lt.get("_unit")
            cur = lt.get("_currency")

            if isinstance(off, (int, float)) and isinstance(disc, (int, float)):
                same_unit = isinstance(unit, str) and unit.strip() != ""
                same_currency = isinstance(cur, str) and cur.strip() != ""
                if disc > off or not (same_unit and same_currency):
                    lt["_discounted_amount"] = None
        except Exception:
            pass

        return result

    def create_empty_result(self, program_name: str) -> Dict:
        """Create empty result when no data found"""
        return {
            "program_name": program_name,
            "local_tuition_fees": {"_unit": "", "_currency": "", "_official_amount": None, "_discounted_amount": None},
            "local_deposit_fees": {"_currency": "", "_amount": None},
            "local_application_fees": {"_currency": "", "_amount": None},
            "confidence": 0.0
        }

    def create_empty_results(self, program_batch: List[str]) -> List[Dict]:
        """Create empty results for entire batch when no data found"""
        return [self.create_empty_result(program) for program in program_batch]

    def display_result(self, result: Dict) -> None:
        """Display result in a formatted way"""
        print(f"\\nüè† LOCAL STUDENT FEES for {result['program_name']}:")
        
        tuition = result.get('local_tuition_fees', {})
        if tuition.get('_official_amount'):
            official = f"{tuition.get('_official_amount', 'N/A')} {tuition.get('_currency', '')} per {tuition.get('_unit', '')}"
            print(f"   Official Tuition: {official}")
            if tuition.get('_discounted_amount'):
                discounted = f"{tuition.get('_discounted_amount', 'N/A')} {tuition.get('_currency', '')} per {tuition.get('_unit', '')}"
                print(f"   Discounted Tuition: {discounted}")
        else:
            print(f"   Tuition: Not found")
        
        deposit = result.get('local_deposit_fees', {})
        if deposit.get('_amount'):
            deposit_str = f"{deposit.get('_amount', 'N/A')} {deposit.get('_currency', '')}"
            print(f"   Deposit Fees: {deposit_str}")
        else:
            print(f"   Deposit Fees: Not found")
            
        application = result.get('local_application_fees', {})
        if application.get('_amount'):
            app_str = f"{application.get('_amount', 'N/A')} {application.get('_currency', '')}"
            print(f"   Application Fees: {app_str}")
        else:
            print(f"   Application Fees: Not found")
            
        print(f"   Confidence: {result.get('confidence', 0.0):.2f}")

print("‚úÖ LocalFeesStrategy class defined successfully!")

‚úÖ LocalFeesStrategy class defined successfully!


## 6- load_programs_from_json function

In [20]:
def _parse_json_safely(raw: str) -> dict:
    """Safely parse JSON from potentially corrupted LLM responses"""
    try:
        from typing import Optional, Dict, Any
        # Strip whitespace and common patterns
        text = raw.strip()
        
        # Handle multiple code fence variations
        if text.startswith("```json"):
            # Remove ```json at start
            text = text[7:].strip()
            if text.endswith("```"):
                text = text[:-3].strip()
        elif text.startswith("```"):
            # Remove ``` at start (without json)
            lines = text.split("\\n")
            if len(lines) > 1:
                text = "\\n".join(lines[1:])
            else:
                text = text[3:]
            if text.endswith("```"):
                text = text[:-3].strip()
        
        # Remove any remaining thinking tags or extra text
        if "<think>" in text and "</think>" in text:
            # Remove everything from <think> to </think>
            import re
            text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
        
        # Extract JSON array/object region heuristically
        text = text.strip()
        if "[" in text and "]" in text:
            # For arrays, find the outermost brackets
            start = text.find("[")
            end = text.rfind("]") + 1
            text = text[start:end]
        elif "{" in text and "}" in text:
            # For objects, find the outermost braces
            start = text.find("{")
            end = text.rfind("}") + 1
            text = text[start:end]
        
        # Try to parse with json_repair for better error recovery
        return json_repair.loads(text)
    except Exception as e:
        print(f"[JSON Parser] JSON parsing failed: {e}")
        print(f"[JSON Parser] Raw text (first 200 chars): {raw[:200]}")
        return None

def load_programs_from_json(filepath: str = None) -> List[str]:
    """Load programs from university-specific programs file"""
    if filepath is None:
        filepath = Config.get_programs_file()
    
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        programs = data.get('programs', [])
        print(f"‚úÖ Loaded {len(programs)} programs from {filepath}")
        return programs
        
    except FileNotFoundError:
        print(f"‚ùå File {filepath} not found!")
        return []
    except json.JSONDecodeError as e:
        print(f"‚ùå Error parsing JSON file: {e}")
        return []
    except Exception as e:
        print(f"‚ùå Error loading programs: {e}")
        return []

def extract_url_from_text(text: str) -> str:
    """Extract the actual URL from text content"""
    import re
    url_pattern = r'https?://[^\\s\\n]+'
    urls = re.findall(url_pattern, text)
    return urls[0] if urls else 'unknown'

def prepare_shared_sources(reranked_contexts: List[Dict], top_n: int = 3) -> List[Dict]:
    """Prepare shared source information from reranked contexts"""
    shared_sources = []
    for i, chunk in enumerate(reranked_contexts[:top_n], 1):
        metadata = chunk.get('metadata', {})
        text = chunk.get('text', '')
        actual_url = extract_url_from_text(text)
        
        shared_sources.append({
            "rank": i,
            "score": chunk.get('rerank_score', 0.0),
            "source": metadata.get('source', 'unknown'),
            "url": actual_url,
            "chunk_id": metadata.get('chunk_id', 'unknown'),
            "text_preview": text[:200] + "..." if len(text) > 200 else text
        })
    
    return shared_sources

def match_program_in_results(program: str, parsed_results: Dict) -> Dict:
    """Find matching program in parsed results using fuzzy matching"""
    program_clean = program.strip()
    
    # Try to find exact match or partial match
    for parsed_name, parsed_data in parsed_results.items():
        if (parsed_name.lower() == program_clean.lower() or 
            program_clean.lower() in parsed_name.lower() or
            parsed_name.lower() in program_clean.lower()):
            return parsed_data
    
    return None

def save_results_to_json(results: List[Dict], university_name: str, output_filename: str = None) -> Dict:
    """Save results to JSON file"""
    if output_filename is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Changed from "international" to "local"
        output_filename = f"local_fees_results_{timestamp}.json"
    
    # Prepare output data  
    output_data = {
        "timestamp": datetime.datetime.now().isoformat(),
        "university": university_name,
        # Changed from "international" to "local"
        "agent_type": "local_student_fees",
        "total_programs": len(results),
        "average_confidence": sum(r.get('confidence', 0) for r in results) / len(results) if results else 0,
        "programs_with_tuition": len([r for r in results if r.get('local_tuition_fees', {}).get('_official_amount')]),
        "programs_with_deposit": len([r for r in results if r.get('local_deposit_fees', {}).get('_amount')]),
        "programs_with_application": len([r for r in results if r.get('local_application_fees', {}).get('_amount')]),
        "results": results
    }
    
    # Save to file
    with open(output_filename, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2, ensure_ascii=False)
    
    print(f"\\nüíæ Results saved to {output_filename}")
    return output_data

print("‚úÖ Utility functions defined successfully!")


‚úÖ Utility functions defined successfully!


## 7- extract_local_fees_batch function

In [21]:
def extract_local_fees_batch(services: UniversityServices, strategy: LocalFeesStrategy, 
                                   program_batch: List[str]) -> List[Dict]:
    """Extract local fees for a batch of programs"""
    
    print(f"\\nüîç Searching {strategy.get_search_description()} for batch of {len(program_batch)} programs")
    print(f"Programs: {', '.join(program_batch[:3])}{'...' if len(program_batch) > 3 else ''}")
    
    # Get search query from strategy
    search_query = strategy.get_search_query(program_batch)
    print(f"üîç Search query: {search_query[:100]}...")
    
    # Single vector search for all programs
    contexts = services.vector_search(search_query)
    
    if not contexts:
        print("‚ùå No documents found in vector search")
        return strategy.create_empty_results(program_batch)
    
    # Reranking - choose between CrossEncoder or Cohere
    if Config.USE_COHERE_RERANKER and services.cohere_client:
        print("üöÄ Applying Cohere reranking for batch...")
        reranked_contexts = services.rerank_with_cohere(search_query, contexts)
    else:
        print("üöÄ Applying CrossEncoder reranking for batch...")
        if not Config.USE_COHERE_RERANKER:
            print("   Reason: USE_COHERE_RERANKER is False")
        elif not services.cohere_client:
            print("   Reason: Cohere client is not available")
        reranked_contexts = services.rerank_with_cross_encoder(search_query, contexts)
    
    if not reranked_contexts:
        print("‚ùå No documents returned from reranking")
        return strategy.create_empty_results(program_batch)
    
    # Combine contexts for batch processing
    combined_context = "\\n".join([ctx['text'] for ctx in reranked_contexts])
    print(f"üìù Combined context length: {len(combined_context):,} characters")
    
    # Single LLM call for ALL programs in the batch
    print(f"ü§ñ Processing ALL {len(program_batch)} programs with SINGLE LLM call...")
    
    # Get LLM response for ALL programs at once using strategy
    messages = strategy.get_extraction_messages(combined_context, program_batch)
    
    try:
        response = services.llm.invoke(messages)
        raw_content = response.content
        print(f"Raw response preview: {raw_content[:200]}...")  # shortened preview
        
        # ‚úÖ Log the raw response to file
        llm_logger.info("=== Raw LLM Response for Batch ===")
        llm_logger.info(f"Programs: {program_batch}")
        llm_logger.info(raw_content)
        llm_logger.info("=================================")
        
        # Parse response for all programs using improved JSON parser
        try:
            print("üîß Using improved JSON parser for LLM response...")
            data = _parse_json_safely(response.content)
            
            if data is None:
                print("‚ùå JSON parsing returned None - creating empty results")
                programs_data = []
            else:
                # Handle different response formats
                if isinstance(data, dict):
                    # Expected format: {"programs": [...]}
                    programs_data = data.get("programs", [])
                    print(f"üìä Found 'programs' key with {len(programs_data)} entries")
                elif isinstance(data, list):
                    # Direct list format: [...]
                    programs_data = data
                    print(f"üìä Found direct list with {len(programs_data)} entries")
                else:
                    # Unexpected format
                    print(f"‚ö†Ô∏è Unexpected response format: {type(data)}")
                    programs_data = []
                
                if not programs_data:
                    print("‚ö†Ô∏è No programs data found in response, creating empty results")
                    programs_data = []
                
                print(f"‚úÖ Successfully parsed data for {len(programs_data)} programs")
            
        except Exception as e:
            print(f"‚ùå Error in improved JSON parsing: {e}")
            programs_data = []
    
    except Exception as e:
        print(f"‚ùå Error calling LLM: {e}")
        programs_data = []
    
    # Process results and ensure we have data for all programs
    return process_batch_results(strategy, program_batch, programs_data, reranked_contexts)

def process_batch_results(strategy: LocalFeesStrategy, program_batch: List[str], 
                         programs_data: List[Dict], reranked_contexts: List[Dict]) -> List[Dict]:
    """Process batch results and ensure we have data for all programs"""
    batch_results = []
    
    # Create a mapping of parsed results
    parsed_results = {}
    for prog_data in programs_data:
        if isinstance(prog_data, dict):
            prog_name = prog_data.get("program_name", "").strip()
            if prog_name:
                parsed_results[prog_name] = prog_data
        else:
            print(f"‚ö†Ô∏è Skipping non-dictionary program data: {type(prog_data)} - {prog_data}")
            continue
    
    # Prepare shared source information
    shared_sources = prepare_shared_sources(reranked_contexts)
    
    # Ensure we have results for all requested programs
    for program in program_batch:
        program_clean = program.strip()
        
        # Try to find matching result
        found_result = match_program_in_results(program_clean, parsed_results)
        
        if found_result:
            result = strategy.create_result_from_data(found_result, program_clean)
        else:
            # No result found for this program
            result = strategy.create_empty_result(program_clean)
        
        # Add shared sources to each program
        result['sources'] = shared_sources.copy()
        batch_results.append(result)
        
        # Display results immediately using strategy
        strategy.display_result(result)
    
    return batch_results

print("‚úÖ Main extraction functions defined successfully!")


‚úÖ Main extraction functions defined successfully!


## ‚úÖ **Improved JSON Parser Benefits**

The new `_parse_json_safely` function provides significant improvements over basic `json_repair.loads`:

### **üõ†Ô∏è Enhanced Error Recovery:**
- **Thinking Tags**: Automatically removes `<think>...</think>` tags that some LLMs add
- **Code Fences**: Handles both ```` ```json` and ```` ``` ` variations
- **Multiple Formats**: Supports both object (`{"programs": [...]}`) and array (`[...]`) responses
- **Heuristic Extraction**: Finds JSON content even with extra text before/after

### **üîç Better Debugging:**
- **Detailed Error Messages**: Shows exactly what went wrong during parsing
- **Response Preview**: Displays first 200 characters of problematic content
- **Step-by-Step Processing**: Logs each parsing step for troubleshooting

### **üéØ Targeted for Batch Failures:**
This improvement should significantly reduce batch failures like the **batch 8 issue** you experienced, where valid retrieval data failed during JSON parsing due to LLM response formatting inconsistencies.

### **üìä Expected Impact:**
- **Fewer "confidence: 0.0" results** due to parsing failures
- **Better handling of complex program names** with special characters
- **More reliable batch processing** across different LLM providers


## 8- Initialize services and strategy

In [22]:
# Initialize services and strategy
print("üöÄ Initializing local Student Fees Extractor...")
print("=" * 80)

# Initialize services
services = UniversityServices()

# Initialize strategy
strategy = LocalFeesStrategy()

print(f"\\n‚úÖ Initialized: {strategy.get_description()}")
print(f"üìä Collection: {services.collection_name}")
print(f"ü§ñ LLM: {Config.LLM_PROVIDER} - {Config.LLM_MODEL}")
print(f"üîÑ Reranker: {'Cohere' if Config.USE_COHERE_RERANKER else 'CrossEncoder'}")


üöÄ Initializing local Student Fees Extractor...
üîÑ Initializing services...


üè´ Using collection: cyberjaya.edu.my_collection
‚úÖ CrossEncoder (cross-encoder/ms-marco-MiniLM-L-6-v2) initialized successfully.
‚úÖ Cohere client (rerank-english-v3.0) initialized successfully.
‚úÖ Groq LLM initialized successfully.
‚úÖ All services initialized successfully!
\n‚úÖ Initialized: Local Student Fees Information Extractor (BATCHED)
üìä Collection: cyberjaya.edu.my_collection
ü§ñ LLM: groq - openai/gpt-oss-20b
üîÑ Reranker: Cohere


## 9- Load programs

In [23]:
# Load programs from file
print("\\nüìö Loading programs...")
programs = load_programs_from_json()

if not programs:
    print("‚ùå No programs found! Please check the programs file.")
else:
    print(f"‚úÖ Loaded {len(programs)} programs for processing")
    print(f"Sample programs: {programs[:3]}")
    
    # For testing, let's process a small batch first
    test_batch_size = 5  # Adjust this number as needed
    test_programs = programs[:test_batch_size]
    
    print(f"\\nüß™ Testing with {len(test_programs)} programs:")
    for i, prog in enumerate(test_programs, 1):
        print(f"  {i}. {prog}")


\nüìö Loading programs...
‚úÖ Loaded 66 programs from d:\AI_Data_Extractor\programs_names_output_test\cyberjaya.edu.my\cyberjaya.edu.my_urls_cleaned.txt_programs.json
‚úÖ Loaded 66 programs for processing
Sample programs: ['Certificate in Visual Design', 'Foundation in Science', 'Bachelor in Mass Communication (Honours)']
\nüß™ Testing with 5 programs:
  1. Certificate in Visual Design
  2. Foundation in Science
  3. Bachelor in Mass Communication (Honours)
  4. Bachelor of Counselling (Honours)
  5. Master in Business Administration (MBA) Dual Award with Federation University Australia


## 9.1. Enhanced Debug Functions for Batch Processing

These functions provide detailed debugging capabilities to analyze LLM responses 


In [24]:
import logging
import datetime

# Configure logger for LLM responses
llm_logger = logging.getLogger("llm_responses")
llm_logger.setLevel(logging.INFO)

# Create file handler with timestamp in filename
log_filename = f"local_fees_llm_responses_{datetime.datetime.now().strftime("%Y%m%d_%H%M%S")}.log"
file_handler = logging.FileHandler(log_filename, mode="w", encoding="utf-8")
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

llm_logger.addHandler(file_handler)

print(f"üìÇ LLM responses will be logged to: {log_filename}")


üìÇ LLM responses will be logged to: local_fees_llm_responses_20251006_151806.log


## 10. Execute Extraction (test)


In [11]:
# Execute the extraction
if programs:
    print("\\n" + "=" * 80)
    print("üöÄ STARTING local STUDENT FEES EXTRACTION")
    print("=" * 80)
    
    # Extract fees for the test batch
    results = extract_local_fees_batch(services, strategy, test_programs)
    
    print("\\n" + "=" * 80)
    print("üìä EXTRACTION COMPLETED")
    print("=" * 80)
    
    # Display summary statistics
    total_programs = len(results)
    avg_confidence = sum(r.get('confidence', 0) for r in results) / total_programs if results else 0
    programs_with_tuition = len([r for r in results if r.get('international_tuition_fees', {}).get('_official_amount')])
    programs_with_deposit = len([r for r in results if r.get('international_deposit_fees', {}).get('_amount')])
    programs_with_application = len([r for r in results if r.get('international_application_fees', {}).get('_amount')])
    
    print(f"\\nüìà SUMMARY STATISTICS:")
    print(f"   Total programs processed: {total_programs}")
    print(f"   Average confidence: {avg_confidence:.2f}")
    print(f"   Programs with tuition fees: {programs_with_tuition}")
    print(f"   Programs with deposit fees: {programs_with_deposit}")
    print(f"   Programs with application fees: {programs_with_application}")
    
    # Save results to JSON file
    output_data = save_results_to_json(results, Config.UNIVERSITY_NAME)
    
    print(f"\\n‚úÖ International student fees extraction completed successfully!")
else:
    print("‚ùå Cannot proceed without programs data.")


üöÄ STARTING local STUDENT FEES EXTRACTION
\nüîç Searching local student fees for batch of 5 programs
Programs: Certificate in Visual Design, Foundation in Science, Bachelor in Mass Communication (Honours)...
üîç Search query: local student tuition fees domestic national citizen deposit application ("Certificate in Visual Des...
üìö Performing vector search...
‚úÖ Found 30 documents
üöÄ Applying Cohere reranking for batch...
üîÑ Sending 30 documents to Cohere Rerank API...
‚úÖ Reranked to top 4 documents
üìù Combined context length: 78,271 characters
ü§ñ Processing ALL 5 programs with SINGLE LLM call...


Failed to multipart ingest runs: langsmith.utils.LangSmithRateLimitError: Rate limit exceeded for https://api.smith.langchain.com/runs/multipart. HTTPError('429 Client Error: Too Many Requests for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Too many requests: tenant exceeded usage limits: Monthly unique traces usage limit exceeded"}\n')trace=c7ece15e-ea80-4f71-a46a-5229e30486aa,id=c7ece15e-ea80-4f71-a46a-5229e30486aa


Raw response preview: ```json
{
  "programs": [
    {
      "program_name": "Certificate in Visual Design",
      "local_tuition_fees": {
        "_unit": "Semesters",
        "_currency": "MYR",
        "_official_amount"...
üîß Using improved JSON parser for LLM response...
üìä Found direct list with 5 entries
‚úÖ Successfully parsed data for 5 programs
\nüè† LOCAL STUDENT FEES for Certificate in Visual Design:
   Official Tuition: 8000 MYR per Semesters
   Deposit Fees: 1500 MYR
   Application Fees: Not found
   Confidence: 1.00
\nüè† LOCAL STUDENT FEES for Foundation in Science:
   Official Tuition: 18000 MYR per Semesters
   Deposit Fees: 1500 MYR
   Application Fees: Not found
   Confidence: 1.00
\nüè† LOCAL STUDENT FEES for Bachelor in Mass Communication (Honours):
   Official Tuition: 50000 MYR per Semesters
   Discounted Tuition: 40000 MYR per Semesters
   Deposit Fees: 1500 MYR
   Application Fees: Not found
   Confidence: 1.00
\nüè† LOCAL STUDENT FEES for Bachelor of C

Failed to send compressed multipart ingest: langsmith.utils.LangSmithRateLimitError: Rate limit exceeded for https://api.smith.langchain.com/runs/multipart. HTTPError('429 Client Error: Too Many Requests for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Too many requests: tenant exceeded usage limits: Monthly unique traces usage limit exceeded"}\n')trace=c7ece15e-ea80-4f71-a46a-5229e30486aa,id=c7ece15e-ea80-4f71-a46a-5229e30486aa
Failed to send compressed multipart ingest: langsmith.utils.LangSmithRateLimitError: Rate limit exceeded for https://api.smith.langchain.com/runs/multipart. HTTPError('429 Client Error: Too Many Requests for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Too many requests: tenant exceeded usage limits: Monthly unique traces usage limit exceeded"}\n')trace=07038809-a3b4-4be3-bba5-567e7bd7128d,id=07038809-a3b4-4be3-bba5-567e7bd7128d
Failed to send compressed multipart ingest: langsmith.utils.LangSmithRateLimitError: Rate limi

## 11- Process all programs

In [25]:
# Uncomment and run this cell to process ALL programs (not just the test batch)

if programs and len(programs) > test_batch_size:
    print(f"\\nüîÑ Processing ALL {len(programs)} programs in batches...")
    
    all_results = []
    batch_size = Config.DEFAULT_BATCH_SIZE
    total_batches = (len(programs) + batch_size - 1) // batch_size
    
    for i in range(0, len(programs), batch_size):
        batch_num = (i // batch_size) + 1
        batch_programs = programs[i:i + batch_size]
        
        print(f"\\nüì¶ BATCH {batch_num}/{total_batches}")
        print(f"Programs in this batch: {len(batch_programs)}")
        print("-" * 60)
        
        batch_results = extract_local_fees_batch(services, strategy, batch_programs)
        all_results.extend(batch_results)
        
        # Add delay between batches to respect rate limits
        if batch_num < total_batches:
            print(f"‚è≥ Waiting {Config.BATCH_DELAY_SECONDS} seconds before next batch...")
            time.sleep(Config.BATCH_DELAY_SECONDS)
    
    # Save final results
    final_output_data = save_results_to_json(all_results, Config.UNIVERSITY_NAME, "local_fees_all_programs.json")
    
    print(f"\\nüéâ ALL PROGRAMS PROCESSED SUCCESSFULLY!")
    print(f"Total results: {len(all_results)}")
else:
    print("\\nüí° To process all programs, uncomment and run this cell.")

print("\\nüí° This enhanced version provides detailed debugging of LLM responses and batch failures.")


\nüîÑ Processing ALL 66 programs in batches...
\nüì¶ BATCH 1/7
Programs in this batch: 10
------------------------------------------------------------
\nüîç Searching local student fees for batch of 10 programs
Programs: Certificate in Visual Design, Foundation in Science, Bachelor in Mass Communication (Honours)...
üîç Search query: local student tuition fees domestic national citizen deposit application ("Certificate in Visual Des...
üìö Performing vector search...


‚úÖ Found 30 documents
üöÄ Applying Cohere reranking for batch...
üîÑ Sending 30 documents to Cohere Rerank API...
‚úÖ Reranked to top 4 documents
üìù Combined context length: 77,852 characters
ü§ñ Processing ALL 10 programs with SINGLE LLM call...
Raw response preview: ```json
{
  "programs": [
    {
      "program_name": "Certificate in Visual Design",
      "local_tuition_fees": {
        "_unit": "Semesters",
        "_currency": "RM",
        "_official_amount":...
üîß Using improved JSON parser for LLM response...
üìä Found direct list with 10 entries
‚úÖ Successfully parsed data for 10 programs
\nüè† LOCAL STUDENT FEES for Certificate in Visual Design:
   Official Tuition: 8000 RM per Semesters
   Deposit Fees: 1500 RM
   Application Fees: Not found
   Confidence: 0.95
\nüè† LOCAL STUDENT FEES for Foundation in Science:
   Official Tuition: 18000 RM per Semesters
   Discounted Tuition: 17700 RM per Semesters
   Deposit Fees: 1650 RM
   Application Fees: Not found
   Conf