from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from pydantic import BaseModel
from typing import List, Optional, Dict
import spacy
from rapidfuzz import fuzz
import logging
import re
import tarfile
import json
from datetime import datetime, timedelta
import asyncio
import os

# Import from core
from core import (
    SessionLocal,
    CachedPackage,
    RegistryCacheInfo,
    refresh_status,
    app_config,
    sync_packages,
    should_sync_packages,
    download_package,
    logger,
    FHIR_REGISTRY_BASE_URL,
)

# Configure logging to capture more details
logging.getLogger().setLevel(logging.DEBUG)  # Set root logger to DEBUG
logging.getLogger("uvicorn").setLevel(logging.DEBUG)  # Ensure uvicorn logs are captured
logging.getLogger("uvicorn.access").setLevel(logging.DEBUG)  # Capture access logs

# Load SpaCy model
try:
    nlp = spacy.load("en_core_web_md")
    logger.info("SpaCy model 'en_core_web_md' loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load SpaCy model: {str(e)}")
    raise RuntimeError("SpaCy model 'en_core_web_md' is required for search functionality. Please install it.")


# Pydantic models for responses
class VersionEntry(BaseModel):
    version: str
    pubDate: str


class IGSearchResult(BaseModel):
    id: str
    name: str
    description: Optional[str]
    url: Optional[str]
    Author: Optional[str]
    fhir_version: Optional[str]
    Latest_Version: Optional[str]
    version_count: int
    all_versions: List[VersionEntry]
    relevance: float


class SearchResponse(BaseModel):
    packages: List[IGSearchResult]
    total: int
    last_cached_timestamp: Optional[str]
    fetch_failed: bool
    is_fetching: bool


class ProfileMetadata(BaseModel):
    name: str
    description: Optional[str]
    version: Optional[str]
    url: str


class StructureDefinition(BaseModel):
    resource: dict


class RefreshStatus(BaseModel):
    last_refresh: Optional[str]
    package_count: int
    errors: List[str]


# Global variable to track the last refresh time
last_refresh_time = datetime.utcnow()


async def background_cache_refresh(db):
    """Run a cache refresh and update the in-memory cache and database upon completion."""
    global last_refresh_time
    logger.info("Starting background cache refresh")
    try:
        sync_packages()  # This updates app_config["MANUAL_PACKAGE_CACHE"] and the database
        last_refresh_time = datetime.utcnow()  # Update the last refresh time
        logger.info(f"Background cache refresh completed successfully at {last_refresh_time.isoformat()}")
    except Exception as e:
        logger.error(f"Background cache refresh failed: {str(e)}")
        refresh_status["errors"].append(f"Background cache refresh failed: {str(e)}")
    finally:
        db.close()
        logger.info("Closed database session after background cache refresh")


async def scheduled_cache_refresh():
    """Scheduler that runs a cache refresh every 8 hours after the last refresh."""
    global last_refresh_time
    while True:
        # Calculate the time since the last refresh
        time_since_last_refresh = datetime.utcnow() - last_refresh_time
        # Calculate how long to wait until the next 8-hour mark
        wait_seconds = max(0, 8 * 3600 - time_since_last_refresh.total_seconds())
        logger.info(f"Next scheduled cache refresh in {wait_seconds / 3600:.2f} hours")
        await asyncio.sleep(wait_seconds)
        # Create a new database session for the refresh task
        db = SessionLocal()
        await background_cache_refresh(db)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan handler for FastAPI startup and shutdown."""
    logger.debug("Lifespan handler starting.")
    os.makedirs("instance", exist_ok=True)
    db = SessionLocal()
    try:
        db_path = "instance/fhir_igs.db"
        # Always load existing data into memory on startup, regardless of age
        if os.path.exists(db_path) and os.path.getsize(db_path) > 0:
            logger.info("Database file exists and has data. Loading into memory...")
            cached_packages = db.query(CachedPackage).all()
            normalized_packages = []
            for pkg in cached_packages:
                pkg_data = {
                    "package_name": pkg.package_name,
                    "version": pkg.version,
                    "latest_official_version": pkg.latest_official_version,
                    "author": pkg.author,
                    "description": pkg.description,
                    "fhir_version": pkg.fhir_version,
                    "url": pkg.url,
                    "canonical": pkg.canonical,
                    "all_versions": pkg.all_versions,
                    "dependencies": pkg.dependencies,
                    "version_count": pkg.version_count,
                    "last_updated": pkg.last_updated,
                    "latest_version": pkg.latest_version,
                }
                normalized_packages.append(pkg_data)
            app_config["MANUAL_PACKAGE_CACHE"] = normalized_packages
            db_timestamp_info = db.query(RegistryCacheInfo).first()
            db_timestamp = db_timestamp_info.last_fetch_timestamp if db_timestamp_info else None
            app_config["MANUAL_CACHE_TIMESTAMP"] = db_timestamp.isoformat() if db_timestamp else datetime.utcnow().isoformat()
            logger.info(f"Loaded {len(normalized_packages)} packages into in-memory cache from database.")
        else:
            logger.info("Database file does not exist or is empty, initializing empty cache")
            app_config["MANUAL_PACKAGE_CACHE"] = []
            app_config["MANUAL_CACHE_TIMESTAMP"] = datetime.utcnow().isoformat()

        # Check whether the data is older than 8 hours or missing, and trigger a background refresh if needed
        should_refresh = False
        if app_config["MANUAL_PACKAGE_CACHE"]:
            latest_package = db.query(CachedPackage).order_by(CachedPackage.last_updated.desc()).first()
            if not latest_package or not latest_package.last_updated:
                logger.info("No valid last_updated timestamp, triggering background refresh")
                should_refresh = True
            else:
                try:
                    last_updated = datetime.fromisoformat(latest_package.last_updated.replace('Z', '+00:00'))
                    time_diff = datetime.utcnow() - last_updated
                    if time_diff.total_seconds() > 8 * 3600:  # 8 hours
                        logger.info(f"Data is {time_diff.total_seconds() / 3600:.2f} hours old, triggering background refresh")
                        should_refresh = True
                    else:
                        logger.info(f"Data is {time_diff.total_seconds() / 3600:.2f} hours old, no background refresh needed")
                except ValueError:
                    logger.warning("Invalid last_updated format, triggering background refresh")
                    should_refresh = True
        else:
            logger.info("No packages in cache, triggering background refresh")
            should_refresh = True

        # Start a background refresh if needed
        if should_refresh:
            # Create a new database session for the background task
            background_db = SessionLocal()
            asyncio.create_task(background_cache_refresh(background_db))

        # Start the scheduler to run every 8 hours after the last refresh
        asyncio.create_task(scheduled_cache_refresh())
        logger.info("Lifespan startup completed, yielding control to FastAPI.")
        yield
    finally:
        db.close()
        logger.info("Closed database session after lifespan shutdown")


# FastAPI app. The lifespan handler must be passed to the constructor so that
# FastAPI registers and runs it on startup/shutdown; assigning it to an app
# attribute after construction is never picked up.
app = FastAPI(
    title="IggyAPI",
    description="API for searching and retrieving FHIR Implementation Guides and StructureDefinitions",
    lifespan=lifespan,
)
logger.debug("FastAPI app initialized.")
@app.get("/igs/search", response_model=SearchResponse)
async def search_igs(query: str = '', search_type: str = 'semantic'):
    """
    Search for Implementation Guides (IGs) using the specified search type.

    Args:
        query (str, optional): The search term to filter IGs by name or author (e.g., 'au core').
        search_type (str, optional): The type of search to perform. Options are:
            - 'semantic': Uses SpaCy for semantic similarity (default).
            - 'string': Uses SpaCy for token-based string similarity, with a fallback to rapidfuzz for exact/near-exact matches.

    Returns:
        SearchResponse: A response containing a list of matching IGs, their metadata, and cache status.

    Raises:
        HTTPException: If the search_type is invalid or an error occurs during the search.
    """
    logger.info(f"Searching IGs with query: {query}, search_type: {search_type}")
    db = SessionLocal()
    try:
        # Validate search_type
        valid_search_types = ['semantic', 'string']
        if search_type not in valid_search_types:
            logger.error(f"Invalid search_type: {search_type}. Must be one of {valid_search_types}.")
            raise HTTPException(status_code=400, detail=f"Invalid search_type: {search_type}. Must be one of {valid_search_types}.")

        in_memory_packages = app_config["MANUAL_PACKAGE_CACHE"]
        in_memory_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"]
        db_timestamp_info = db.query(RegistryCacheInfo).first()
        db_timestamp = db_timestamp_info.last_fetch_timestamp if db_timestamp_info else None
        logger.debug(f"DB Timestamp: {db_timestamp}, In-Memory Timestamp: {in_memory_timestamp}")

        normalized_packages = None
        fetch_failed_flag = False
        display_timestamp = None
        is_fetching = False
        fetch_in_progress = app_config["FETCH_IN_PROGRESS"]

        if fetch_in_progress and in_memory_packages is not None:
            normalized_packages = in_memory_packages
            display_timestamp = in_memory_timestamp
            fetch_failed_flag = len(refresh_status["errors"]) > 0
            app_config["FETCH_IN_PROGRESS"] = False
        elif in_memory_packages is not None:
            logger.info(f"Using in-memory cached package list from {in_memory_timestamp}.")
            normalized_packages = in_memory_packages
            display_timestamp = in_memory_timestamp
            fetch_failed_flag = len(refresh_status["errors"]) > 0
        else:
            cached_packages = db.query(CachedPackage).all()
            if cached_packages:
                logger.info(f"Loading {len(cached_packages)} packages from CachedPackage table.")
                normalized_packages = []
                for pkg in cached_packages:
                    pkg_data = {
                        "package_name": pkg.package_name,
                        "version": pkg.version,
                        "latest_official_version": pkg.latest_official_version,
                        "author": pkg.author,
                        "description": pkg.description,
                        "fhir_version": pkg.fhir_version,
                        "url": pkg.url,
                        "canonical": pkg.canonical,
                        "all_versions": pkg.all_versions,
                        "dependencies": pkg.dependencies,
                        "version_count": pkg.version_count,
                        "last_updated": pkg.last_updated,
                        "latest_version": pkg.latest_version,
                    }
                    normalized_packages.append(pkg_data)
                app_config["MANUAL_PACKAGE_CACHE"] = normalized_packages
                app_config["MANUAL_CACHE_TIMESTAMP"] = db_timestamp.isoformat() if db_timestamp else datetime.utcnow().isoformat()
                display_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"]
                fetch_failed_flag = len(refresh_status["errors"]) > 0
                logger.info(f"Loaded {len(normalized_packages)} packages into in-memory cache from database.")
            else:
                logger.info("No packages found in CachedPackage table. Fetching from registries...")
                is_fetching = True
                app_config["FETCH_IN_PROGRESS"] = True
                sync_packages()
                normalized_packages = app_config["MANUAL_PACKAGE_CACHE"]
                display_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"]
                fetch_failed_flag = len(refresh_status["errors"]) > 0

        if not isinstance(normalized_packages, list):
            logger.error(f"normalized_packages is not a list (type: {type(normalized_packages)}). Using empty list.")
            normalized_packages = []
            fetch_failed_flag = True

        logger.info("Filtering packages based on query")
        if query:
            # Split the query into individual words
            query_words = query.lower().split()
            filtered_packages = [
                pkg for pkg in normalized_packages
                if isinstance(pkg, dict) and (
                    all(word in pkg.get('package_name', '').lower() for word in query_words)
                    or all(word in pkg.get('author', '').lower() for word in query_words)
                )
            ]
            logger.debug(f"Filtered {len(normalized_packages)} cached packages down to {len(filtered_packages)} for terms '{query_words}'")
        else:
            filtered_packages = normalized_packages
            logger.debug(f"No search term provided, using all {len(filtered_packages)} cached packages.")

        logger.info(f"Starting search with search_type: {search_type}")
        results = []
        query_doc = nlp(query.lower())  # Process the query with SpaCy

        if search_type == 'semantic':
            # Semantic similarity search using SpaCy's word embeddings
            for pkg in filtered_packages:
                name = pkg['package_name']
                description = pkg['description'] if pkg['description'] else ''
                author = pkg['author'] if pkg['author'] else ''
                # Combine fields for a comprehensive semantic search
                combined_text = f"{name} {description} {author}".lower()
                doc = nlp(combined_text)
                similarity = query_doc.similarity(doc)  # Compute semantic similarity
                if similarity > 0.3:  # Lowered threshold for semantic similarity
                    logger.info(f"Semantic match: {name}, similarity: {similarity}")
                    results.append((name, pkg, 'combined', similarity))
                else:
                    # Fall back to rapidfuzz for exact/near-exact string matching
                    name_score = fuzz.partial_ratio(query.lower(), name.lower())
                    desc_score = fuzz.partial_ratio(query.lower(), description.lower()) if description else 0
                    author_score = fuzz.partial_ratio(query.lower(), author.lower()) if author else 0
                    max_score = max(name_score, desc_score, author_score)
                    if max_score > 70:  # Threshold for rapidfuzz
                        source = 'name' if max_score == name_score else ('description' if max_score == desc_score else 'author')
                        logger.info(f"Rapidfuzz fallback in semantic mode: {name}, source: {source}, score: {max_score}")
                        results.append((name, pkg, source, max_score / 100.0))
        else:
            # String similarity search: first try SpaCy's token-based similarity
            for pkg in filtered_packages:
                name = pkg['package_name']
                description = pkg['description'] if pkg['description'] else ''
                author = pkg['author'] if pkg['author'] else ''
                combined_text = f"{name} {description} {author}".lower()
                doc = nlp(combined_text)
                # Use token-based similarity for string matching
                token_similarity = query_doc.similarity(doc)  # Still using similarity, but focusing on token overlap
                if token_similarity > 0.7:  # Higher threshold for token similarity
                    logger.info(f"SpaCy token match: {name}, similarity: {token_similarity}")
                    results.append((name, pkg, 'combined', token_similarity))
                else:
                    # Fall back to rapidfuzz for exact/near-exact string matching
                    name_score = fuzz.partial_ratio(query.lower(), name.lower())
                    desc_score = fuzz.partial_ratio(query.lower(), description.lower()) if description else 0
                    author_score = fuzz.partial_ratio(query.lower(), author.lower()) if author else 0
                    max_score = max(name_score, desc_score, author_score)
                    if max_score > 70:  # Threshold for rapidfuzz
                        source = 'name' if max_score == name_score else ('description' if max_score == desc_score else 'author')
                        logger.info(f"Rapidfuzz match: {name}, source: {source}, score: {max_score}")
                        results.append((name, pkg, source, max_score / 100.0))

        logger.info(f"Search completed with {len(results)} results")

        logger.info("Building response packages")
        packages_to_display = []
        seen_names = set()
        for matched_text, pkg, source, score in sorted(results, key=lambda x: x[3], reverse=True):
            if pkg['package_name'] not in seen_names:
                seen_names.add(pkg['package_name'])
                adjusted_score = score * 1.5 if source in ['name', 'combined'] else score * 0.8
                logger.info(f"Matched IG: {pkg['package_name']} (source: {source}, score: {score}, adjusted: {adjusted_score})")
                packages_to_display.append({
                    "id": pkg['package_name'],
                    "name": pkg['package_name'],
                    "description": pkg['description'],
                    "url": pkg['url'],
                    "Author": pkg['author'],
                    "fhir_version": pkg['fhir_version'],
                    "Latest_Version": pkg['latest_version'],
                    "version_count": pkg['version_count'],
                    "all_versions": pkg['all_versions'] or [],
                    "relevance": adjusted_score,
                })
        packages_to_display.sort(key=lambda x: x['relevance'], reverse=True)
        total = len(packages_to_display)
        logger.info(f"Total packages to display: {total}")

        logger.info("Returning search response")
        return SearchResponse(
            packages=packages_to_display,
            total=total,
            last_cached_timestamp=display_timestamp,
            fetch_failed=fetch_failed_flag,
            is_fetching=is_fetching,
        )
    finally:
        db.close()
        logger.info("Closed database session after search")
Please refresh the cache.") # # Find the package # package = None # for pkg in packages: # if pkg['package_name'].lower() == ig_name.lower(): # package = pkg # break # if not package: # logger.error(f"IG {ig_name} not found in cached packages.") # raise HTTPException(status_code=404, detail=f"IG '{ig_name}' not found.") # # Determine the version to fetch # if version: # target_version = None # for ver_entry in package['all_versions']: # if ver_entry['version'] == version: # target_version = ver_entry['version'] # break # if not target_version: # logger.error(f"Version {version} not found for IG {ig_name}.") # raise HTTPException(status_code=404, detail=f"Version '{version}' not found for IG '{ig_name}'.") # else: # target_version = package['latest_version'] # version = target_version # logger.info(f"No version specified, using latest version: {target_version}") # # Download the package # tgz_path, error = download_package(ig_name, version, package) # if not tgz_path: # logger.error(f"Failed to download package for IG {ig_name} (version: {version}): {error}") # if "404" in error: # raise HTTPException(status_code=404, detail=f"Package for IG '{ig_name}' (version: {version}) not found.") # raise HTTPException(status_code=500, detail=f"Failed to fetch package: {error}") # # Extract profiles from the .tgz file # profiles = [] # try: # with tarfile.open(tgz_path, mode="r:gz") as tar: # for member in tar.getmembers(): # if member.name.endswith('.json') and 'StructureDefinition' in member.name: # f = tar.extractfile(member) # if f: # resource = json.load(f) # if resource.get("resourceType") == "StructureDefinition": # profiles.append(ProfileMetadata( # name=resource.get("name", ""), # description=resource.get("description"), # version=resource.get("version"), # url=resource.get("url", "") # )) # except Exception as e: # logger.error(f"Failed to extract profiles from package for IG {ig_name} (version: {version}): {str(e)}") # raise HTTPException(status_code=500, detail=f"Failed to extract profiles: {str(e)}") # # Cache the profiles # app_config["PROFILE_CACHE"][cache_key] = profiles # logger.info(f"Cached {len(profiles)} profiles for IG {ig_name} (version: {version})") # logger.info(f"Found {len(profiles)} profiles in IG {ig_name} (version: {version})") # return profiles #----------------------------------------------------------------------------end @app.get("/igs/{ig_id}/profiles", response_model=List[ProfileMetadata]) async def list_profiles(ig_id: str, version: Optional[str] = None): """List StructureDefinition profiles in the specified IG, optionally for a specific version.""" logger.info(f"Listing profiles for IG: {ig_id}, version: {version}") # Parse ig_id for version if it includes a '#' ig_name = ig_id if '#' in ig_id: parts = ig_id.split('#', 1) ig_name = parts[0] if version and parts[1] != version: logger.warning(f"Version specified in ig_id ({parts[1]}) conflicts with version parameter ({version}). Using version parameter.") else: version = parts[1] logger.info(f"Parsed ig_id: name={ig_name}, version={version}") # Validate ig_name if not ig_name or not re.match(r'^[a-zA-Z0-9\.\-_]+$', ig_name): logger.error(f"Invalid IG name: {ig_name}") raise HTTPException(status_code=400, detail="Invalid IG name. Use format like 'hl7.fhir.au.core'.") # Validate version if provided if version and not re.match(r'^[a-zA-Z0-9\.\-_]+$', version): logger.error(f"Invalid version: {version}") raise HTTPException(status_code=400, detail="Invalid version format. 
Use format like '1.1.0-preview'.") # Check if profiles are cached cache_key = f"{ig_name}#{version if version else 'latest'}" if cache_key in app_config["PROFILE_CACHE"]: logger.info(f"Returning cached profiles for IG {ig_name} (version: {version if version else 'latest'})") return app_config["PROFILE_CACHE"][cache_key] # Fetch package metadata from cache packages = app_config["MANUAL_PACKAGE_CACHE"] if not packages: logger.error("Package cache is empty. Please refresh the cache using /refresh-cache.") raise HTTPException(status_code=500, detail="Package cache is empty. Please refresh the cache.") # Find the package package = None for pkg in packages: if pkg['package_name'].lower() == ig_name.lower(): package = pkg break if not package: logger.error(f"IG {ig_name} not found in cached packages.") raise HTTPException(status_code=404, detail=f"IG '{ig_name}' not found.") # Determine the version to fetch if version: target_version = None for ver_entry in package['all_versions']: if ver_entry['version'] == version: target_version = ver_entry['version'] break if not target_version: logger.error(f"Version {version} not found for IG {ig_name}.") raise HTTPException(status_code=404, detail=f"Version '{version}' not found for IG '{ig_name}'.") else: target_version = package['latest_version'] version = target_version logger.info(f"No version specified, using latest version: {target_version}") # Download the package tgz_path, error = download_package(ig_name, version, package) if not tgz_path: logger.error(f"Failed to download package for IG {ig_name} (version: {version}): {error}") if "404" in error: raise HTTPException(status_code=404, detail=f"Package for IG '{ig_name}' (version: {version}) not found.") raise HTTPException(status_code=500, detail=f"Failed to fetch package: {error}") # Extract profiles from the .tgz file profiles = [] try: with tarfile.open(tgz_path, mode="r:gz") as tar: for member in tar.getmembers(): if member.name.endswith('.json'): # Check all JSON files logger.debug(f"Processing file: {member.name}") f = tar.extractfile(member) if f: try: resource = json.load(f) # Check if the resource is a StructureDefinition if resource.get("resourceType") == "StructureDefinition": logger.debug(f"Found StructureDefinition in file: {member.name}") profiles.append(ProfileMetadata( name=resource.get("name", ""), description=resource.get("description"), version=resource.get("version"), url=resource.get("url", "") )) else: logger.debug(f"File {member.name} is not a StructureDefinition, resourceType: {resource.get('resourceType', 'unknown')}") except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON in file {member.name}: {str(e)}") except Exception as e: logger.warning(f"Error processing file {member.name}: {str(e)}") except Exception as e: logger.error(f"Failed to extract profiles from package for IG {ig_name} (version: {version}): {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to extract profiles: {str(e)}") # Cache the profiles app_config["PROFILE_CACHE"][cache_key] = profiles logger.info(f"Cached {len(profiles)} profiles for IG {ig_name} (version: {version})") logger.info(f"Found {len(profiles)} profiles in IG {ig_name} (version: {version})") return profiles #------------------------------------------------------------------OLD # @app.get("/igs/{ig_id}/profiles/{profile_id}", response_model=StructureDefinition) # async def get_profile(ig_id: str, profile_id: str, version: Optional[str] = None, include_narrative: bool = True): # """ # Retrieve a specific 
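# Example (illustrative): GET /igs/hl7.fhir.au.core/profiles?version=1.1.0-preview
# scans every JSON file in the downloaded package and returns ProfileMetadata
# for each resource whose resourceType is StructureDefinition.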
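# The endpoint below matches profile IDs loosely: names and URL tails are
# lower-cased and stripped of '-' and '_' before comparison, so (for example)
# 'AUCorePatient', 'au-core-patient', and 'AU_Core_Patient' all resolve to the
# same StructureDefinition.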
profile: {str(e)}") # # Strip narrative if requested # if not include_narrative: # logger.info(f"Stripping narrative from profile {profile_id}") # if "text" in profile_resource: # profile_resource["text"] = None # logger.info(f"Successfully retrieved profile {profile_id} for IG {ig_name} (version: {version})") # return StructureDefinition(resource=profile_resource) #------------------------------------------------------------------------------end @app.get("/igs/{ig_id}/profiles/{profile_id}", response_model=StructureDefinition) async def get_profile(ig_id: str, profile_id: str, version: Optional[str] = None, include_narrative: bool = True): """ Retrieve a specific StructureDefinition from an Implementation Guide (IG). This endpoint fetches a specific FHIR StructureDefinition (profile) from the given IG. It supports optional version specification and an option to strip the narrative content. Args: ig_id (str): The ID of the Implementation Guide (e.g., 'hl7.fhir.au.core' or 'hl7.fhir.au.core#1.1.0-preview'). If the version is included in the ig_id (after '#'), it takes precedence unless overridden by the version parameter. profile_id (str): The ID or name of the profile to retrieve (e.g., 'AUCorePatient' or 'DAV_PR_ERP_Abrechnungszeilen'). version (str, optional): The version of the IG (e.g., '1.1.0-preview'). If not provided and ig_id contains a version, the version from ig_id is used; otherwise, the latest version is used. include_narrative (bool, optional): Whether to include the narrative (`text` element) in the StructureDefinition. Defaults to True. Set to False to strip the narrative, removing human-readable content. Returns: StructureDefinition: A dictionary containing the requested StructureDefinition resource. The response includes the `resource` field with the StructureDefinition JSON. If `include_narrative=False`, the `text` element will be set to null. Raises: HTTPException: - 400: If the IG name, version, or profile ID is invalid. - 404: If the IG, version, or profile is not found. - 500: If an error occurs during package retrieval or profile extraction. Example: - GET /igs/hl7.fhir.au.core/profiles/AUCorePatient?version=1.1.0-preview Returns the AUCorePatient profile with narrative included. - GET /igs/hl7.fhir.au.core/profiles/AUCorePatient?version=1.1.0-preview&include_narrative=false Returns the AUCorePatient profile with the narrative (`text` element) stripped. """ logger.info(f"Retrieving profile {profile_id} for IG: {ig_id}, version: {version}, include_narrative: {include_narrative}") # Parse ig_id for version if it includes a '#' ig_name = ig_id if '#' in ig_id: parts = ig_id.split('#', 1) ig_name = parts[0] if version and parts[1] != version: logger.warning(f"Version specified in ig_id ({parts[1]}) conflicts with version parameter ({version}). Using version parameter.") else: version = parts[1] logger.info(f"Parsed ig_id: name={ig_name}, version={version}") # Validate ig_name if not ig_name or not re.match(r'^[a-zA-Z0-9\.\-_]+$', ig_name): logger.error(f"Invalid IG name: {ig_name}") raise HTTPException(status_code=400, detail="Invalid IG name. Use format like 'hl7.fhir.au.core'.") # Validate version if provided if version and not re.match(r'^[a-zA-Z0-9\.\-_]+$', version): logger.error(f"Invalid version: {version}") raise HTTPException(status_code=400, detail="Invalid version format. 
Use format like '1.1.0-preview'.") # Validate profile_id if not profile_id or not re.match(r'^[a-zA-Z0-9\.\-_]+$', profile_id): logger.error(f"Invalid profile ID: {profile_id}") raise HTTPException(status_code=400, detail="Invalid profile ID format.") # Check if profiles are cached cache_key = f"{ig_name}#{version if version else 'latest'}" if cache_key in app_config["PROFILE_CACHE"]: logger.info(f"Cache hit for IG {ig_name} (version: {version if version else 'latest'})") profiles = app_config["PROFILE_CACHE"][cache_key] logger.debug(f"Cached profiles: {[profile.name for profile in profiles]}") profile_found = False # Normalize profile_id for matching (remove hyphens, underscores, and convert to lowercase) normalized_profile_id = profile_id.lower().replace('-', '').replace('_', '') for profile in profiles: normalized_name = profile.name.lower().replace('-', '').replace('_', '') if profile.name else '' normalized_url_end = profile.url.lower().split('/')[-1].replace('-', '').replace('_', '') if profile.url else '' if normalized_name == normalized_profile_id or normalized_url_end == normalized_profile_id: logger.info(f"Found profile {profile_id} in cached profiles: name={profile.name}, url={profile.url}") profile_found = True break if not profile_found: logger.error(f"Profile {profile_id} not found in cached profiles for IG {ig_name} (version: {version if version else 'latest'})") raise HTTPException(status_code=404, detail=f"Profile '{profile_id}' not found in IG '{ig_name}' (version: {version if version else 'latest'}).") else: logger.info(f"Cache miss for IG {ig_name} (version: {version if version else 'latest'}), calling list_profiles") profiles = await list_profiles(ig_id, version) # Fetch package metadata packages = app_config["MANUAL_PACKAGE_CACHE"] if not packages: logger.error("Package cache is empty. Please refresh the cache using /refresh-cache.") raise HTTPException(status_code=500, detail="Package cache is empty. 
Please refresh the cache.") package = None for pkg in packages: if pkg['package_name'].lower() == ig_name.lower(): package = pkg break if not package: logger.error(f"IG {ig_name} not found in cached packages.") raise HTTPException(status_code=404, detail=f"IG '{ig_name}' not found.") # Determine the version to fetch if version: target_version = None for ver_entry in package['all_versions']: if ver_entry['version'] == version: target_version = ver_entry['version'] break if not target_version: logger.error(f"Version {version} not found for IG {ig_name}.") raise HTTPException(status_code=404, detail=f"Version '{version}' not found for IG '{ig_name}'.") else: target_version = package['latest_version'] version = target_version logger.info(f"No version specified, using latest version: {target_version}") # Check directory state before calling download_package instance_dir = "instance" if os.path.exists(instance_dir): logger.info(f"Directory {instance_dir} exists before calling download_package in get_profile") else: logger.warning(f"Directory {instance_dir} does NOT exist before calling download_package in get_profile") # Download the package logger.info(f"Calling download_package for IG {ig_name} (version: {version}) in get_profile") tgz_path, error = download_package(ig_name, version, package) if not tgz_path: logger.error(f"Failed to download package for IG {ig_name} (version: {version}): {error}") if "404" in error: raise HTTPException(status_code=404, detail=f"Package for IG '{ig_name}' (version: {version}) not found.") raise HTTPException(status_code=500, detail=f"Failed to fetch package: {error}") # Extract the specific profile from the .tgz file profile_resource = None try: with tarfile.open(tgz_path, mode="r:gz") as tar: # Normalize profile_id for matching (remove hyphens, underscores, and convert to lowercase) normalized_profile_id = profile_id.lower().replace('-', '').replace('_', '') for member in tar.getmembers(): if member.name.endswith('.json'): # Check all JSON files logger.debug(f"Processing file: {member.name}") f = tar.extractfile(member) if f: try: resource = json.load(f) # Check if the resource is a StructureDefinition if resource.get("resourceType") == "StructureDefinition": resource_name = resource.get("name", "") resource_url = resource.get("url", "") # Normalize name and URL for matching normalized_name = resource_name.lower().replace('-', '').replace('_', '') if resource_name else '' normalized_url_end = resource_url.lower().split('/')[-1].replace('-', '').replace('_', '') if resource_url else '' logger.debug(f"Found StructureDefinition in file: {member.name}, name={resource_name}, url={resource_url}, normalized_name={normalized_name}, normalized_url_end={normalized_url_end}") # Match profile_id against name or the last segment of the URL if normalized_name == normalized_profile_id or normalized_url_end == normalized_profile_id: logger.info(f"Matched profile {profile_id} in file {member.name}: name={resource_name}, url={resource_url}") profile_resource = resource break else: logger.debug(f"File {member.name} is not a StructureDefinition, resourceType: {resource.get('resourceType', 'unknown')}") except json.JSONDecodeError as e: logger.warning(f"Failed to parse JSON in file {member.name}: {str(e)}") except Exception as e: logger.warning(f"Error processing file {member.name}: {str(e)}") if not profile_resource: logger.error(f"Profile {profile_id} not found in package for IG {ig_name} (version: {version})") raise HTTPException(status_code=404, detail=f"Profile 
'{profile_id}' not found in IG '{ig_name}' (version: {version}).") except Exception as e: logger.error(f"Failed to extract profile {profile_id} from package for IG {ig_name} (version: {version}): {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to extract profile: {str(e)}") # Strip narrative if requested if not include_narrative: logger.info(f"Stripping narrative from profile {profile_id}") if "text" in profile_resource: profile_resource["text"] = None logger.info(f"Successfully retrieved profile {profile_id} for IG {ig_name} (version: {version})") return StructureDefinition(resource=profile_resource) @app.get("/status", response_model=RefreshStatus) async def get_refresh_status(): """Get the status of the last cache refresh.""" logger.info("Fetching refresh status") db = SessionLocal() try: package_count = db.query(CachedPackage).count() return RefreshStatus( last_refresh=refresh_status["last_refresh"], package_count=package_count, errors=refresh_status["errors"] ) finally: db.close() logger.info("Closed database session after status check") @app.post("/refresh-cache", response_model=RefreshStatus) async def force_refresh_cache(): """Force a refresh of the IG metadata cache.""" global last_refresh_time logger.info("Forcing cache refresh") sync_packages() last_refresh_time = datetime.utcnow() # Update the last refresh time logger.info(f"Manual cache refresh completed at {last_refresh_time.isoformat()}") db = SessionLocal() try: package_count = db.query(CachedPackage).count() return RefreshStatus( last_refresh=refresh_status["last_refresh"], package_count=package_count, errors=refresh_status["errors"] ) finally: db.close() logger.info("Closed database session after cache refresh") # Log that the application is starting logger.info("IggyAPI application starting up.")
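# Optional local entry point: a minimal sketch for running IggyAPI directly.
# It assumes this module is saved as 'main.py' and that uvicorn is installed;
# adjust the import string, host, and port for your deployment.
if __name__ == "__main__":
    import uvicorn

    # The auto-reloader is left off because the lifespan handler spawns
    # background refresh tasks that a reloader would duplicate.
    uvicorn.run("main:app", host="0.0.0.0", port=8000)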