diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c10061f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +# Use lightweight Python base image +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY main.py . + +# Create instance directory for SQLite +RUN mkdir -p /app/instance + +# Set permissions for the instance directory +RUN chmod -R 777 /app/instance + +# Expose configurable port +ENV PORT=8000 +EXPOSE $PORT + +# Run the application +CMD ["python", "main.py"] \ No newline at end of file diff --git a/IggyAPI.png b/IggyAPI.png new file mode 100644 index 0000000..b760cbc Binary files /dev/null and b/IggyAPI.png differ diff --git a/README.md b/README.md index ff37e79..ab680de 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,439 @@ # IggyAPI -IggyAPI is a FastAPI-based tool for managing FHIR Implementation Guides (IGs) and StructureDefinitions, with a focus on AU Core profiles. It offers endpoints to search IGs, list profiles, and fetch StructureDefinitions with optional narrative stripping. Featuring caching, error handling, and Swagger UI, it supports healthcare interoperability. +![IggyAPI Logo](IggyAPI.png) + +IggyAPI is a FastAPI-based application designed to search, retrieve, and manage FHIR Implementation Guides (IGs) and their associated StructureDefinitions (profiles). It offers a powerful interface for querying FHIR packages, listing profiles within an IG, and fetching specific StructureDefinitions with the option to strip narrative content. Tailored to support healthcare interoperability in an Australian context, IggyAPI focuses on AU Core profiles but is capable of handling IGs from various FHIR registries worldwide. + +## Table of Contents + +- [Project Overview](#project-overview) +- [Features](#features) +- [Prerequisites](#prerequisites) +- [Installation](#installation) +- [Running IggyAPI](#running-iggyapi) +- [API Endpoints](#api-endpoints) + - [Search Implementation Guides](#search-implementation-guides) + - [List Profiles in an IG](#list-profiles-in-an-ig) + - [Get a Specific Profile](#get-a-specific-profile) + - [Get Refresh Status](#get-refresh-status) + - [Force Cache Refresh](#force-cache-refresh) +- [Response Samples](#response-samples) + - [Search Implementation Guides Response](#search-implementation-guides-response) + - [List Profiles Response](#list-profiles-response) + - [Get Profile Response](#get-profile-response) +- [Testing IggyAPI](#testing-iggyapi) +- [Implementation Details](#implementation-details) +- [Contributing](#contributing) +- [License](#license) + +## Project Overview + +IggyAPI is a backend service that enables developers and healthcare systems to interact with FHIR Implementation Guides programmatically. It retrieves metadata from FHIR registries, caches the data, and provides endpoints to search for IGs, list profiles within an IG, and fetch specific StructureDefinitions. A key feature of IggyAPI is its support for narrative stripping in StructureDefinitions, which helps reduce payload size when human-readable content is not required. + +While IggyAPI is particularly focused on supporting AU Core profiles (e.g., `hl7.fhir.au.core`), it is designed to work with any FHIR IG published in supported registries, such as `packages.fhir.org` and `packages.simplifier.net`. + +## Features + +- **Search IGs**: Query Implementation Guides using a search term with fuzzy matching. 
+- **List Profiles**: Retrieve a list of StructureDefinitions (profiles) within a specified IG, with optional version filtering. +- **Fetch Profiles**: Obtain a specific StructureDefinition by IG and profile ID, with an option to exclude narrative content. +- **Cache Management**: Automatically syncs IG metadata from registries every 4 hours or on demand, with status reporting. +- **Robust Error Handling**: Provides detailed error messages for invalid requests or unavailable resources. +- **Swagger UI**: Offers an interactive API documentation interface at `/docs`. + +## Prerequisites + +Before setting up IggyAPI, ensure you have the following installed: + +- **Python 3.8+**: IggyAPI is built using Python. +- **pip**: Python package manager for installing dependencies. +- **SQLite**: Used for caching (included with Python). +- **Git**: For cloning the repository (optional). + +An internet connection is required to fetch IG metadata from FHIR registries during the initial sync. + +## Installation + +1. **Clone the Repository** (if applicable): + ```bash + git clone + cd iggyapi + ``` + +2. **Create a Virtual Environment** (recommended): + ```bash + python -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +3. **Install Dependencies**: + IggyAPI requires several Python packages listed in `requirements.txt`. Install them using: + ```bash + pip install -r requirements.txt + ``` + Example `requirements.txt`: + ``` + fastapi==0.115.0 + uvicorn==0.30.6 + requests==2.32.3 + feedparser==6.0.11 + sqlalchemy==2.0.35 + tenacity==9.0.0 + rapidfuzz==3.10.0 + ``` + +4. **Verify Setup**: + Ensure all dependencies are installed by running: + ```bash + pip list + ``` + Confirm that `fastapi`, `uvicorn`, `requests`, `sqlalchemy`, and other required packages are listed. + +## Running IggyAPI + +1. **Start the Server**: + Launch IggyAPI using `uvicorn`: + ```bash + uvicorn main:app --host 0.0.0.0 --port 8000 + ``` + - `--host 0.0.0.0`: Allows external access to the API. + - `--port 8000`: The port to listen on (default is 8000; adjust as needed). + +2. **Access IggyAPI**: + - Open your browser and navigate to `http://localhost:8000/docs` to access the Swagger UI. + - The Swagger UI provides an interactive interface to test IggyAPI’s endpoints. + +3. **Initial Sync**: + On first run, IggyAPI will sync IG metadata from FHIR registries. This process may take a few minutes depending on your internet connection. Subsequent syncs occur every 4 hours or can be triggered manually via the `/refresh-cache` endpoint. + +## API Endpoints + +IggyAPI provides the following endpoints, all documented in the Swagger UI at `/docs`. + +### Search Implementation Guides + +- **Endpoint**: `GET /igs/search` +- **Query Parameter**: + - `query` (string, optional): A search term to filter IGs by name or author (e.g., `au core`). +- **Description**: Searches for FHIR Implementation Guides using fuzzy matching on package name and author. Returns a list of matching IGs with metadata such as version, FHIR version, and relevance score. +- **Response**: A JSON object containing a list of IGs, total count, and cache status. + +### List Profiles in an IG + +- **Endpoint**: `GET /igs/{ig_id}/profiles` +- **Path Parameter**: + - `ig_id` (string, required): The ID of the IG (e.g., `hl7.fhir.au.core` or `hl7.fhir.au.core#1.1.0-preview`). +- **Query Parameter**: + - `version` (string, optional): The version of the IG (e.g., `1.1.0-preview`). Overrides the version in `ig_id` if provided. 
+- **Description**: Lists all StructureDefinitions (profiles) within the specified IG. Downloads the IG package if not already cached and extracts profile metadata. +- **Response**: A list of profiles with their name, description, version, and URL. + +### Get a Specific Profile + +- **Endpoint**: `GET /igs/{ig_id}/profiles/{profile_id}` +- **Path Parameters**: + - `ig_id` (string, required): The ID of the IG (e.g., `hl7.fhir.au.core`). + - `profile_id` (string, required): The ID or name of the profile (e.g., `AUCorePatient`). +- **Query Parameters**: + - `version` (string, optional): The version of the IG (e.g., `1.1.0-preview`). + - `include_narrative` (boolean, optional, default: `true`): Whether to include the narrative (`text` element) in the StructureDefinition. Set to `false` to strip the narrative. +- **Description**: Retrieves a specific StructureDefinition from the IG. Supports narrative stripping to reduce payload size. +- **Response**: A JSON object containing the StructureDefinition resource. + +### Get Refresh Status + +- **Endpoint**: `GET /status` +- **Description**: Returns the status of the last cache refresh, including the timestamp, package count, and any errors encountered. +- **Response**: A JSON object with refresh status details. + +### Force Cache Refresh + +- **Endpoint**: `POST /refresh-cache` +- **Description**: Forces an immediate refresh of the IG metadata cache by re-syncing with FHIR registries. +- **Response**: A JSON object with the updated refresh status. + +## Response Samples + +Below are sample responses for IggyAPI’s main endpoints, illustrating the output format. + +### Search Implementation Guides Response + +**Request**: +``` +GET /igs/search?query=core +``` + +**Response** (`response_Search.json`): +```json +{ + "packages": [ + { + "id": "hl7.fhir.core", + "name": "hl7.fhir.core", + "description": "", + "url": "https://packages.simplifier.net/hl7.fhir.core/4.3.0", + "Author": "Martijn Harthoorn", + "fhir_version": "4", + "Latest_Version": "4.3.0", + "version_count": 9, + "all_versions": [ + { + "version": "4.3.0", + "pubDate": "Wed, 29 Mar 2023 11:22:42 GMT" + }, + { + "version": "3.2.0", + "pubDate": "Thu, 23 Apr 2020 11:48:07 GMT" + }, + { + "version": "4.0.1", + "pubDate": "Thu, 23 Apr 2020 11:37:44 GMT" + }, + { + "version": "3.5.0", + "pubDate": "Thu, 23 Apr 2020 11:37:25 GMT" + }, + { + "version": "3.0.2", + "pubDate": "Thu, 23 Apr 2020 11:33:53 GMT" + }, + { + "version": "1.8.0", + "pubDate": "Thu, 23 Apr 2020 11:33:40 GMT" + }, + { + "version": "1.4.0", + "pubDate": "Thu, 23 Apr 2020 11:32:11 GMT" + }, + { + "version": "1.0.2", + "pubDate": "Thu, 23 Apr 2020 11:30:34 GMT" + }, + { + "version": "4.1.0", + "pubDate": "Mon, 03 Mar 2025 18:00:06 GMT" + } + ], + "relevance": 1.5 + }, + { + "id": "hl7.fhir.r2.core", + "name": "hl7.fhir.r2.core", + "description": "", + "url": "https://packages.simplifier.net/hl7.fhir.r2.core/1.0.2", + "Author": "Martijn Harthoorn", + "fhir_version": "1", + "Latest_Version": "1.0.2", + "version_count": 1, + "all_versions": [ + { + "version": "1.0.2", + "pubDate": "Mon, 18 Nov 2019 16:06:05 GMT" + } + ], + "relevance": 1.5 + } + // ... (additional packages truncated for brevity) + ], + "total": 76, + "last_cached_timestamp": "2025-05-12T09:29:55.653596", + "fetch_failed": false, + "is_fetching": false +} +``` + +This response lists IGs matching the query `core`, including their metadata and relevance scores. 
The `total` field indicates the number of matching IGs, and cache status fields provide information about the last sync. + +### List Profiles Response + +**Request**: +``` +GET /igs/hl7.fhir.au.core/profiles?version=1.1.0-preview +``` + +**Response** (`response_Get Profiles.json`): +```json +[ + { + "name": "AUCoreAllergyIntolerance", + "description": "This profile sets minimum expectations for an AllergyIntolerance resource to record, search, and fetch allergies/adverse reactions associated with a patient. It is based on the [AU Base Allergy Intolerance](http://build.fhir.org/ig/hl7au/au-fhir-base/StructureDefinition-au-allergyintolerance.html) profile and identifies the *additional* mandatory core elements, extensions, vocabularies and value sets that **SHALL** be present in the AllergyIntolerance resource when conforming to this profile. It provides the floor for standards development for specific uses cases in an Australian context.", + "version": "1.1.0-preview", + "url": "http://hl7.org.au/fhir/core/StructureDefinition/au-core-allergyintolerance" + }, + { + "name": "AUCoreBloodPressure", + "description": "This profile sets minimum expectations for an Observation resource to record, search, and fetch blood pressure observations with standard coding and units of measure. It is based on the [FHIR Blood Pressure Profile](http://hl7.org/fhir/R4/bp.html) and identifies the *additional* mandatory core elements, extensions, vocabularies and value sets that **SHALL** be present in the Observation resource when conforming to this profile. It provides the floor for standards development for specific uses cases in an Australian context.", + "version": "1.1.0-preview", + "url": "http://hl7.org.au/fhir/core/StructureDefinition/au-core-bloodpressure" + }, + { + "name": "AUCoreBodyHeight", + "description": "This profile sets minimum expectations for an Observation resource to record, search, and fetch body height observations with standard coding and units of measure. It is based on the [FHIR Body Height Profile](http://hl7.org/fhir/R4/bodyheight.html) and identifies the *additional* mandatory core elements, extensions, vocabularies and value sets that **SHALL** be present in the Observation resource when conforming to this profile. It provides the floor for standards development for specific uses cases in an Australian context.", + "version": "1.1.0-preview", + "url": "http://hl7.org.au/fhir/core/StructureDefinition/au-core-bodyheight" + } + // ... (additional profiles truncated for brevity) +] +``` + +This response lists profiles within the `hl7.fhir.au.core` IG (version `1.1.0-preview`), including their names, descriptions, versions, and URLs. + +### Get Profile Response + +**Request**: +``` +GET /igs/hl7.fhir.au.core/profiles/AUCoreAllergyIntolerance?version=1.1.0-preview&include_narrative=true +``` + +**Response** (`response_get structure def.json`): +```json +{ + "resource": { + "resourceType": "StructureDefinition", + "id": "au-core-allergyintolerance", + "text": { + "status": "extensions", + "div": "
<div xmlns=\"http://www.w3.org/1999/xhtml\">Generated Narrative: StructureDefinition au-core-allergyintolerance (escaped HTML differential table covering clinicalStatus, verificationStatus, code, patient, onset[x], and reaction, with AU Core Requester/Responder obligations; truncated for brevity)</div>
" + }, + "extension": [ + { + "url": "http://hl7.org/fhir/StructureDefinition/structuredefinition-fmm", + "valueInteger": 2 + }, + { + "url": "http://hl7.org/fhir/StructureDefinition/structuredefinition-standards-status", + "valueCode": "trial-use", + "_valueCode": { + "extension": [ + { + "url": "http://hl7.org/fhir/StructureDefinition/structuredefinition-conformance-derivedFrom", + "valueCanonical": "http://hl7.org.au/fhir/core/ImplementationGuide/hl7.fhir.au.core" + } + ] + } + } + ], + "url": "http://hl7.org.au/fhir/core/StructureDefinition/au-core-allergyintolerance", + "version": "1.1.0-preview", + "name": "AUCoreAllergyIntolerance", + "title": "AU Core AllergyIntolerance", + "status": "active", + "date": "2025-03-06T07:19:11+00:00", + "publisher": "HL7 Australia", + "contact": [ + { + "name": "HL7 Australia FHIR Work Group", + "telecom": [ + { + "system": "url", + "value": "https://confluence.hl7.org/display/HAFWG", + "use": "work" + } + ] + } + ], + "description": "This profile sets minimum expectations for an AllergyIntolerance resource to record, search, and fetch allergies/adverse reactions associated with a patient. It is based on the [AU Base Allergy Intolerance](http://build.fhir.org/ig/hl7au/au-fhir-base/StructureDefinition-au-allergyintolerance.html) profile and identifies the *additional* mandatory core elements, extensions, vocabularies and value sets that **SHALL** be present in the AllergyIntolerance resource when conforming to this profile. It provides the floor for standards development for specific uses cases in an Australian context.", + "jurisdiction": [ + { + "coding": [ + { + "system": "urn:iso:std:iso:3166", + "code": "AU" + } + ] + } + ], + "copyright": "Used by permission of HL7 International, all rights reserved Creative Commons License. HL7 Australia© 2022+; Licensed Under Creative Commons No Rights Reserved.", + "fhirVersion": "4.0.1", + // ... (additional fields truncated for brevity) + "snapshot": { + // Snapshot details omitted for brevity + }, + "differential": { + // Differential details omitted for brevity + } + } +} +``` + +This response provides the `AUCoreAllergyIntolerance` StructureDefinition with the narrative included. To strip the narrative, set `include_narrative=false`, and the `text` element will be `null`. + +## Testing IggyAPI + +You can test IggyAPI using `curl`, Postman, or the Swagger UI. + +### Using `curl` + +1. **Search for IGs**: + ```bash + curl -X GET "http://localhost:8000/igs/search?query=au%20core" -H "accept: application/json" + ``` + Look for `hl7.fhir.au.core` in the response. + +2. **List Profiles**: + ```bash + curl -X GET "http://localhost:8000/igs/hl7.fhir.au.core/profiles?version=1.1.0-preview" -H "accept: application/json" + ``` + Verify that profiles like `AUCoreAllergyIntolerance` are listed. + +3. **Get a Profile with Narrative**: + ```bash + curl -X GET "http://localhost:8000/igs/hl7.fhir.au.core/profiles/AUCoreAllergyIntolerance?version=1.1.0-preview&include_narrative=true" -H "accept: application/json" + ``` + Check that the `text` element contains narrative content. + +4. **Get a Profile without Narrative**: + ```bash + curl -X GET "http://localhost:8000/igs/hl7.fhir.au.core/profiles/AUCoreAllergyIntolerance?version=1.1.0-preview&include_narrative=false" -H "accept: application/json" + ``` + Confirm that the `text` element is `null`. + +5. 
**Refresh Cache**: + ```bash + curl -X POST "http://localhost:8000/refresh-cache" -H "accept: application/json" + ``` + Verify the response includes the updated `last_refresh` timestamp. + +### Using Swagger UI + +- Navigate to `http://localhost:8000/docs`. +- Use the interactive interface to execute the above requests and inspect responses. + +### Using Postman + +- Import the OpenAPI specification (`http://localhost:8000/openapi.json`) into Postman. +- Create requests for each endpoint and test with the parameters shown above. + +## Implementation Details + +- **Framework**: Built with FastAPI for high performance and automatic OpenAPI documentation. +- **Database**: Uses SQLite to cache IG metadata, with tables `cached_packages` and `registry_cache_info`. +- **Caching**: + - IG metadata is cached in memory and persisted to SQLite. + - The cache is refreshed every 4 hours or on demand via `/refresh-cache`. + - Profiles are cached in memory to reduce redundant downloads. +- **Narrative Stripping**: The `include_narrative` parameter in the `get_profile` endpoint removes the `text` element from StructureDefinitions when set to `false`. +- **Dependencies**: + - `requests` and `feedparser` for fetching IG metadata from registries. + - `sqlalchemy` for database operations. + - `rapidfuzz` for fuzzy search functionality. + - `tenacity` for retrying failed registry requests. +- **Error Handling**: + - Returns HTTP 400 for invalid input (e.g., malformed IG names). + - Returns HTTP 404 for missing IGs or profiles. + - Returns HTTP 500 for server errors, with detailed logs. + +## Contributing + +Contributions to IggyAPI are welcome! To contribute: + +1. Fork the repository. +2. Create a feature branch (`git checkout -b feature/my-feature`). +3. Commit your changes (`git commit -m "Add my feature"`). +4. Push to the branch (`git push origin feature/my-feature`). +5. Open a pull request with a detailed description of your changes. + +Please ensure your code follows PEP 8 style guidelines and includes appropriate tests. + +## License + +IggyAPI is licensed under the MIT License. See the `LICENSE` file for details. 
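+
+## Appendix: Python Client Example
+
+The endpoints above are plain HTTP, so any client can drive them. Below is a minimal sketch (assuming IggyAPI is running locally on port 8000 and the `requests` package is installed) that searches for AU Core, lists its profiles, and fetches one StructureDefinition with the narrative stripped, using the IG and profile names from the samples above:
+
+```python
+import requests
+
+BASE = "http://localhost:8000"
+
+# Search for Implementation Guides matching "au core"
+igs = requests.get(f"{BASE}/igs/search", params={"query": "au core"}).json()
+print(igs["total"], "matching IGs; cache from", igs["last_cached_timestamp"])
+
+# List profiles in a specific IG version
+profiles = requests.get(
+    f"{BASE}/igs/hl7.fhir.au.core/profiles",
+    params={"version": "1.1.0-preview"},
+).json()
+print([p["name"] for p in profiles][:5])
+
+# Fetch one StructureDefinition with the narrative stripped
+sd = requests.get(
+    f"{BASE}/igs/hl7.fhir.au.core/profiles/AUCoreAllergyIntolerance",
+    params={"version": "1.1.0-preview", "include_narrative": "false"},
+).json()
+assert sd["resource"].get("text") is None  # narrative removed when include_narrative=false
+```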
\ No newline at end of file diff --git a/instance/fhir_igs.db b/instance/fhir_igs.db new file mode 100644 index 0000000..ca2e1dc Binary files /dev/null and b/instance/fhir_igs.db differ diff --git a/instance/fhir_igs.db-shm b/instance/fhir_igs.db-shm new file mode 100644 index 0000000..f471029 Binary files /dev/null and b/instance/fhir_igs.db-shm differ diff --git a/instance/fhir_igs.db-wal b/instance/fhir_igs.db-wal new file mode 100644 index 0000000..37fb437 Binary files /dev/null and b/instance/fhir_igs.db-wal differ diff --git a/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.tgz b/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.tgz new file mode 100644 index 0000000..92c1e35 Binary files /dev/null and b/instance/fhir_packages/hl7.fhir.au.core-1.1.0-preview.tgz differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..dc3f5ac --- /dev/null +++ b/main.py @@ -0,0 +1,1081 @@ +from fastapi import FastAPI, HTTPException +from contextlib import asynccontextmanager +from pydantic import BaseModel +from typing import List, Optional, Dict +import feedparser +import requests +import json +import os +import logging +from sqlalchemy import create_engine, Column, String, Integer, JSON, DateTime, text +from sqlalchemy.orm import declarative_base, sessionmaker +from datetime import datetime, timedelta, timezone +from tenacity import retry, stop_after_attempt, wait_fixed +import re +import tarfile +import io +from rapidfuzz import process, fuzz +from collections import defaultdict + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# FastAPI app with lifespan handler +app = FastAPI(title="FHIR IG API", description="API for searching and retrieving FHIR Implementation Guides and StructureDefinitions") + +# SQLite Database Setup with increased timeout +Base = declarative_base() +DATABASE_URL = "sqlite:///instance/fhir_igs.db?timeout=60" # Increase timeout to 60 seconds +os.makedirs("instance", exist_ok=True) +engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +with engine.connect() as connection: + connection.execute(text("PRAGMA journal_mode=WAL;")) + connection.execute(text("PRAGMA busy_timeout=60000;")) + logger.info("Enabled WAL mode and set busy timeout for SQLite") + +# Database Models +class CachedPackage(Base): + __tablename__ = "cached_packages" + package_name = Column(String, primary_key=True) + version = Column(String) + latest_official_version = Column(String) + author = Column(String) + description = Column(String) + fhir_version = Column(String) + url = Column(String) + canonical = Column(String) + all_versions = Column(JSON) + dependencies = Column(JSON) + version_count = Column(Integer) + last_updated = Column(String) + latest_version = Column(String) + +class RegistryCacheInfo(Base): + __tablename__ = "registry_cache_info" + id = Column(Integer, primary_key=True) + last_fetch_timestamp = Column(DateTime(timezone=True), nullable=True) + +Base.metadata.create_all(bind=engine) + +# Pydantic Models for Responses +class VersionEntry(BaseModel): + version: str + pubDate: str + +class IGSearchResult(BaseModel): + id: str + name: str + description: Optional[str] + url: Optional[str] + Author: Optional[str] + fhir_version: Optional[str] + Latest_Version: Optional[str] + version_count: int + all_versions: List[VersionEntry] + relevance: float + +class SearchResponse(BaseModel): + packages: List[IGSearchResult] 
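+    # Ranked matches; the fields below report the match count and cache/sync state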
+ total: int + last_cached_timestamp: Optional[str] + fetch_failed: bool + is_fetching: bool + +class ProfileMetadata(BaseModel): + name: str + description: Optional[str] + version: Optional[str] + url: str + +class StructureDefinition(BaseModel): + resource: dict + +class RefreshStatus(BaseModel): + last_refresh: Optional[str] + package_count: int + errors: List[str] + +# Global variables +refresh_status = { + "last_refresh": None, + "errors": [] +} + +app_config = { + "MANUAL_PACKAGE_CACHE": None, + "MANUAL_CACHE_TIMESTAMP": None, + "FETCH_IN_PROGRESS": False, + "PROFILE_CACHE": {} # Cache for profiles: {ig_name#version: [ProfileMetadata]} +} + +# Constants from FHIRFLARE +FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org" + +def safe_parse_version(v_str): + """Parse version strings, handling FHIR-specific suffixes.""" + if not v_str or not isinstance(v_str, str): + return "0.0.0a0" + v_str_norm = v_str.lower() + base_part = v_str_norm.split('-', 1)[0] if '-' in v_str_norm else v_str_norm + suffix = v_str_norm.split('-', 1)[1] if '-' in v_str_norm else None + if re.match(r'^\d+(\.\d+)*$', base_part): + if suffix in ['dev', 'snapshot', 'ci-build', 'snapshot1', 'snapshot3', 'draft-final']: + return f"{base_part}a0" + elif suffix in ['draft', 'ballot', 'preview', 'ballot2']: + return f"{base_part}b0" + elif suffix and suffix.startswith('rc'): + return f"{base_part}rc{''.join(filter(str.isdigit, suffix)) or '0'}" + return base_part + return "0.0.0a0" + +def compare_versions(v1, v2): + """Compare two version strings, handling FHIR-specific formats.""" + v1_parts = v1.split('.') + v2_parts = v2.split('.') + for i in range(max(len(v1_parts), len(v2_parts))): + p1 = v1_parts[i] if i < len(v1_parts) else '0' + p2 = v2_parts[i] if i < len(v2_parts) else '0' + p1_num, p1_suffix = re.match(r'(\d+)([a-zA-Z0-9]*)$', p1).groups() if re.match(r'^\d+[a-zA-Z0-9]*$', p1) else (p1, '') + p2_num, p2_suffix = re.match(r'(\d+)([a-zA-Z0-9]*)$', p2).groups() if re.match(r'^\d+[a-zA-Z0-9]*$', p2) else (p2, '') + if int(p1_num) != int(p2_num): + return int(p1_num) > int(p2_num) + if p1_suffix != p2_suffix: + if not p1_suffix: + return True + if not p2_suffix: + return False + return p1_suffix > p2_suffix + return False + +def get_additional_registries(): + """Fetch additional FHIR IG registries from the master feed.""" + feed_registry_url = 'https://raw.githubusercontent.com/FHIR/ig-registry/master/package-feeds.json' + feeds = [] + try: + response = requests.get(feed_registry_url, timeout=15) + response.raise_for_status() + data = json.loads(response.text) + feeds = [{'name': feed['name'], 'url': feed['url']} for feed in data.get('feeds', []) if 'name' in feed and 'url' in feed and feed['url'].startswith(('http://', 'https://'))] + feeds = [feed for feed in feeds if feed['url'] != 'https://fhir.kl.dk/package-feed.xml'] + logger.info(f"Fetched {len(feeds)} registries from {feed_registry_url}") + except Exception as e: + logger.error(f"Failed to fetch registries: {str(e)}") + refresh_status["errors"].append(f"Failed to fetch registries from {feed_registry_url}: {str(e)}") + return feeds + +@retry(stop=stop_after_attempt(3), wait=wait_fixed(5)) +def fetch_feed(feed): + """Fetch and parse a single feed, handling both JSON and RSS/Atom.""" + logger.info(f"Fetching feed: {feed['name']} from {feed['url']}") + entries = [] + try: + response = requests.get(feed['url'], timeout=30) + response.raise_for_status() + content_type = response.headers.get('content-type', '').lower() + logger.debug(f"Response content-type: 
{content_type}, content: {response.text[:200]}") + + if 'application/json' in content_type or feed['url'].endswith('.json'): + try: + data = response.json() + packages = data.get('packages', data.get('entries', [])) + for pkg in packages: + if not isinstance(pkg, dict): + continue + versions = pkg.get('versions', []) + if versions: + entries.append(pkg) + else: + pkg['versions'] = [{"version": pkg.get('version', ''), "pubDate": pkg.get('pubDate', 'NA')}] + entries.append(pkg) + logger.info(f"Fetched {len(entries)} packages from JSON feed {feed['name']}") + except json.JSONDecodeError as e: + logger.error(f"JSON parse error for {feed['name']}: {str(e)}") + refresh_status["errors"].append(f"JSON parse error for {feed['name']} at {feed['url']}: {str(e)}") + raise + elif 'xml' in content_type or 'rss' in content_type or 'atom' in content_type or feed['url'].endswith(('.rss', '.atom', '.xml')) or 'text/plain' in content_type: + try: + feed_data = feedparser.parse(response.text) + if not feed_data.entries: + logger.warning(f"No entries found in feed {feed['name']}") + for entry in feed_data.entries: + title = entry.get('title', '') + pkg_name = '' + version = '' + if '#' in title: + pkg_name, version = title.split('#', 1) + else: + pkg_name = title + version = entry.get('version', '') + if not pkg_name: + pkg_name = entry.get('id', '') or entry.get('summary', '') + if not pkg_name: + continue + package = { + 'name': pkg_name, + 'version': version, + 'author': entry.get('author', 'NA'), + 'fhirVersion': entry.get('fhir_version', ['NA'])[0] or 'NA', + 'url': entry.get('link', 'unknown'), + 'canonical': entry.get('canonical', ''), + 'dependencies': entry.get('dependencies', []), + 'pubDate': entry.get('published', entry.get('pubdate', 'NA')), + 'registry': feed['url'], + 'versions': [{"version": version, "pubDate": entry.get('published', entry.get('pubdate', 'NA'))}] + } + entries.append(package) + logger.info(f"Fetched {len(entries)} entries from RSS/Atom feed {feed['name']}") + except Exception as e: + logger.error(f"RSS/Atom parse error for {feed['name']}: {str(e)}") + refresh_status["errors"].append(f"RSS/Atom parse error for {feed['name']} at {feed['url']}: {str(e)}") + raise + else: + logger.error(f"Unknown content type for {feed['name']}: {content_type}") + refresh_status["errors"].append(f"Unknown content type for {feed['name']} at {feed['url']}: {content_type}") + raise ValueError(f"Unknown content type: {content_type}") + + return entries + except requests.RequestException as e: + logger.error(f"Request error for {feed['name']}: {str(e)}") + refresh_status["errors"].append(f"Request error for {feed['name']} at {feed['url']}: {str(e)}") + raise + except Exception as e: + logger.error(f"Unexpected error for {feed['name']}: {str(e)}") + refresh_status["errors"].append(f"Unexpected error for {feed['name']} at {feed['url']}: {str(e)}") + raise + +def normalize_package_data(entries, registry_url): + """Normalize package data, grouping by name and aggregating versions.""" + logger.info("Starting normalization of package data") + packages_grouped = defaultdict(list) + skipped_raw_count = 0 + for entry in entries: + if not isinstance(entry, dict): + skipped_raw_count += 1 + logger.warning(f"Skipping raw package entry, not a dict: {entry}") + continue + raw_name = entry.get('name') or entry.get('title') or '' + if not isinstance(raw_name, str): + raw_name = str(raw_name) + name_part = raw_name.split('#', 1)[0].strip().lower() + if name_part: + packages_grouped[name_part].append(entry) + else: + if 
not entry.get('id'): + skipped_raw_count += 1 + logger.warning(f"Skipping raw package entry, no name or id: {entry}") + logger.info(f"Initial grouping: {len(packages_grouped)} unique package names found. Skipped {skipped_raw_count} raw entries.") + + normalized_list = [] + skipped_norm_count = 0 + total_entries_considered = 0 + + for name_key, entries in packages_grouped.items(): + total_entries_considered += len(entries) + latest_absolute_data = None + latest_official_data = None + latest_absolute_ver = "0.0.0a0" + latest_official_ver = "0.0.0a0" + all_versions = [] + package_name_display = name_key + + processed_versions = set() + for package_entry in entries: + versions_list = package_entry.get('versions', []) + for version_info in versions_list: + if isinstance(version_info, dict) and 'version' in version_info: + version_str = version_info.get('version', '') + if version_str and version_str not in processed_versions: + all_versions.append({ + "version": version_str, + "pubDate": version_info.get('pubDate', 'NA') + }) + processed_versions.add(version_str) + + processed_entries = [] + for package_entry in entries: + version_str = None + raw_name_entry = package_entry.get('name') or package_entry.get('title') or '' + if not isinstance(raw_name_entry, str): + raw_name_entry = str(raw_name_entry) + version_keys = ['version', 'latestVersion'] + for key in version_keys: + val = package_entry.get(key) + if isinstance(val, str) and val: + version_str = val.strip() + break + elif isinstance(val, list) and val and isinstance(val[0], str) and val[0]: + version_str = val[0].strip() + break + if not version_str and '#' in raw_name_entry: + parts = raw_name_entry.split('#', 1) + if len(parts) == 2 and parts[1]: + version_str = parts[1].strip() + + if not version_str: + logger.warning(f"Skipping entry for {raw_name_entry}: no valid version found. 
Entry: {package_entry}") + skipped_norm_count += 1 + continue + + version_str = version_str.strip() + current_display_name = str(raw_name_entry).split('#')[0].strip() + if current_display_name and current_display_name != name_key: + package_name_display = current_display_name + + entry_with_version = package_entry.copy() + entry_with_version['version'] = version_str + processed_entries.append(entry_with_version) + + try: + current_ver = safe_parse_version(version_str) + if latest_absolute_data is None or compare_versions(current_ver, latest_absolute_ver): + latest_absolute_ver = current_ver + latest_absolute_data = entry_with_version + + if re.match(r'^\d+\.\d+\.\d+(?:-[a-zA-Z0-9\.]+)?$', version_str): + if latest_official_data is None or compare_versions(current_ver, latest_official_ver): + latest_official_ver = current_ver + latest_official_data = entry_with_version + except Exception as comp_err: + logger.error(f"Error comparing version '{version_str}' for package '{package_name_display}': {comp_err}", exc_info=True) + + if latest_absolute_data: + final_absolute_version = latest_absolute_data.get('version', 'unknown') + final_official_version = latest_official_data.get('version') if latest_official_data else None + + author_raw = latest_absolute_data.get('author') or latest_absolute_data.get('publisher') or 'NA' + if isinstance(author_raw, dict): + author = author_raw.get('name', str(author_raw)) + elif not isinstance(author_raw, str): + author = str(author_raw) + else: + author = author_raw + + fhir_version_str = 'NA' + fhir_keys = ['fhirVersion', 'fhirVersions', 'fhir_version'] + for key in fhir_keys: + val = latest_absolute_data.get(key) + if isinstance(val, list) and val and isinstance(val[0], str): + fhir_version_str = val[0] + break + elif isinstance(val, str) and val: + fhir_version_str = val + break + + url_raw = latest_absolute_data.get('url') or latest_absolute_data.get('link') or 'unknown' + url = str(url_raw) if not isinstance(url_raw, str) else url_raw + canonical_raw = latest_absolute_data.get('canonical') or url + canonical = str(canonical_raw) if not isinstance(canonical_raw, str) else canonical_raw + + dependencies_raw = latest_absolute_data.get('dependencies', []) + dependencies = [] + if isinstance(dependencies_raw, dict): + dependencies = [{"name": str(dn), "version": str(dv)} for dn, dv in dependencies_raw.items()] + elif isinstance(dependencies_raw, list): + for dep in dependencies_raw: + if isinstance(dep, str): + if '@' in dep: + dep_name, dep_version = dep.split('@', 1) + dependencies.append({"name": dep_name, "version": dep_version}) + else: + dependencies.append({"name": dep, "version": "N/A"}) + elif isinstance(dep, dict) and 'name' in dep and 'version' in dep: + dependencies.append(dep) + + all_versions.sort(key=lambda x: x.get('pubDate', ''), reverse=True) + latest_version = final_official_version or final_absolute_version or 'N/A' + + normalized_entry = { + "package_name": package_name_display, + "version": final_absolute_version, + "latest_official_version": final_official_version, + "author": author.strip(), + "description": "", + "fhir_version": fhir_version_str.strip(), + "url": url.strip(), + "canonical": canonical.strip(), + "all_versions": all_versions, + "dependencies": dependencies, + "version_count": len(all_versions), + "last_updated": datetime.utcnow().isoformat(), + "latest_version": latest_version + } + normalized_list.append(normalized_entry) + if not final_official_version: + logger.warning(f"No official version found for package 
'{package_name_display}'. Versions: {[v['version'] for v in all_versions]}") + else: + logger.warning(f"No valid entries found to determine details for package name key '{name_key}'. Entries: {entries}") + skipped_norm_count += len(entries) + + logger.info(f"Normalization complete. Entries considered: {total_entries_considered}, Skipped during norm: {skipped_norm_count}, Unique Packages Found: {len(normalized_list)}") + normalized_list.sort(key=lambda x: x.get('package_name', '').lower()) + return normalized_list + +def cache_packages(normalized_packages, db_session): + """Cache normalized FHIR Implementation Guide packages in the CachedPackage database.""" + logger.info("Starting to cache packages") + try: + batch_size = 10 + for i in range(0, len(normalized_packages), batch_size): + batch = normalized_packages[i:i + batch_size] + logger.info(f"Processing batch {i//batch_size + 1} with {len(batch)} packages") + for package in batch: + existing = db_session.query(CachedPackage).filter_by(package_name=package['package_name']).first() + if existing: + existing.version = package['version'] + existing.latest_official_version = package['latest_official_version'] + existing.author = package['author'] + existing.description = package['description'] + existing.fhir_version = package['fhir_version'] + existing.url = package['url'] + existing.canonical = package['canonical'] + existing.all_versions = package['all_versions'] + existing.dependencies = package['dependencies'] + existing.version_count = package['version_count'] + existing.last_updated = package['last_updated'] + existing.latest_version = package['latest_version'] + else: + new_package = CachedPackage(**package) + db_session.add(new_package) + db_session.commit() + logger.info(f"Cached {len(batch)} packages in batch {i//batch_size + 1}") + logger.info(f"Successfully cached {len(normalized_packages)} packages in CachedPackage.") + except Exception as error: + db_session.rollback() + logger.error(f"Error caching packages: {error}") + refresh_status["errors"].append(f"Error caching packages: {str(error)}") + raise + logger.info("Finished caching packages") + +def should_sync_packages(db_session): + """Check if the database is empty or data is older than 4 hours.""" + logger.info("Checking if sync is needed") + try: + package_count = db_session.query(CachedPackage).count() + if package_count == 0: + logger.info("Database is empty, triggering sync") + return True + + latest_package = db_session.query(CachedPackage).order_by(CachedPackage.last_updated.desc()).first() + if not latest_package or not latest_package.last_updated: + logger.info("No valid last_updated timestamp, triggering sync") + return True + + try: + last_updated = datetime.fromisoformat(latest_package.last_updated.replace('Z', '+00:00')) + time_diff = datetime.utcnow() - last_updated + if time_diff.total_seconds() > 4 * 3600: + logger.info(f"Data is {time_diff.total_seconds()/3600:.2f} hours old, triggering sync") + return True + else: + logger.info(f"Data is {time_diff.total_seconds()/3600:.2f} hours old, using current dataset") + return False + except ValueError: + logger.warning("Invalid last_updated format, triggering sync") + return True + except Exception as e: + logger.error(f"Error checking sync status: {str(e)}") + return True + +def sync_packages(): + """Syndicate package metadata from RSS feeds and package registries.""" + logger.info("Starting RSS feed refresh") + global refresh_status, app_config + refresh_status["errors"] = [] + temp_packages = [] + 
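+    # Flag the fetch so /igs/search can report is_fetching and keep serving the
+    # previous in-memory snapshot while this refresh runs.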
app_config["FETCH_IN_PROGRESS"] = True + + db = SessionLocal() + try: + registries = get_additional_registries() + if not registries: + logger.error("No registries fetched. Cannot proceed with package syndication.") + refresh_status["errors"].append("No registries fetched. Syndication aborted.") + app_config["FETCH_IN_PROGRESS"] = False + return + + for feed in registries: + if not feed['url'].startswith(('http://', 'https://')): + logger.warning(f"Skipping invalid feed URL: {feed['url']}") + continue + try: + entries = fetch_feed(feed) + normalized_packages = normalize_package_data(entries, feed["url"]) + temp_packages.extend(normalized_packages) + except Exception as e: + logger.error(f"Failed to process feed {feed['name']}: {str(e)}") + refresh_status["errors"].append(f"Failed to process feed {feed['name']}: {str(e)}") + + now_ts = datetime.utcnow().isoformat() + app_config["MANUAL_PACKAGE_CACHE"] = temp_packages + app_config["MANUAL_CACHE_TIMESTAMP"] = now_ts + + logger.info("Updating database with fetched packages") + try: + db.query(CachedPackage).delete() + db.flush() + logger.info("Cleared existing data in cached_packages table") + cache_packages(temp_packages, db) + timestamp_info = db.query(RegistryCacheInfo).first() + if timestamp_info: + timestamp_info.last_fetch_timestamp = datetime.fromisoformat(now_ts.replace('Z', '+00:00')) + else: + timestamp_info = RegistryCacheInfo(last_fetch_timestamp=datetime.fromisoformat(now_ts.replace('Z', '+00:00'))) + db.add(timestamp_info) + db.commit() + refresh_status["last_refresh"] = now_ts + logger.info(f"Refreshed database with {len(temp_packages)} packages") + except Exception as e: + db.rollback() + logger.error(f"Failed to update database: {str(e)}") + refresh_status["errors"].append(f"Database update failed: {str(e)}") + raise + finally: + app_config["FETCH_IN_PROGRESS"] = False + db.close() + logger.info("Closed database session after sync") + logger.info("Finished syncing packages") + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Lifespan handler for FastAPI startup and shutdown.""" + os.makedirs("instance", exist_ok=True) + db = SessionLocal() + try: + db_path = "instance/fhir_igs.db" + if os.path.exists(db_path) and os.path.getsize(db_path) > 0: + logger.info("Database file exists and has data. 
Checking if sync is needed...") + if should_sync_packages(db): + logger.info("Data is outdated or missing, triggering sync") + sync_packages() + else: + logger.info("Using existing dataset at startup") + cached_packages = db.query(CachedPackage).all() + normalized_packages = [] + for pkg in cached_packages: + pkg_data = { + "package_name": pkg.package_name, + "version": pkg.version, + "latest_official_version": pkg.latest_official_version, + "author": pkg.author, + "description": pkg.description, + "fhir_version": pkg.fhir_version, + "url": pkg.url, + "canonical": pkg.canonical, + "all_versions": pkg.all_versions, + "dependencies": pkg.dependencies, + "version_count": pkg.version_count, + "last_updated": pkg.last_updated, + "latest_version": pkg.latest_version + } + normalized_packages.append(pkg_data) + app_config["MANUAL_PACKAGE_CACHE"] = normalized_packages + db_timestamp_info = db.query(RegistryCacheInfo).first() + db_timestamp = db_timestamp_info.last_fetch_timestamp if db_timestamp_info else None + app_config["MANUAL_CACHE_TIMESTAMP"] = db_timestamp.isoformat() if db_timestamp else datetime.utcnow().isoformat() + logger.info(f"Loaded {len(normalized_packages)} packages into in-memory cache from database.") + else: + logger.info("Database file does not exist or is empty, triggering sync") + sync_packages() + yield + finally: + db.close() + logger.info("Closed database session after lifespan shutdown") + +app.lifespan = lifespan + +@app.get("/igs/search", response_model=SearchResponse) +async def search_igs(query: str = ''): + """Search for IGs with embedded logic from FHIRFLARE's search_and_import and api_search_packages.""" + logger.info(f"Searching IGs with query: {query}") + db = SessionLocal() + try: + in_memory_packages = app_config["MANUAL_PACKAGE_CACHE"] + in_memory_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"] + db_timestamp_info = db.query(RegistryCacheInfo).first() + db_timestamp = db_timestamp_info.last_fetch_timestamp if db_timestamp_info else None + logger.debug(f"DB Timestamp: {db_timestamp}, In-Memory Timestamp: {in_memory_timestamp}") + + normalized_packages = None + fetch_failed_flag = False + display_timestamp = None + is_fetching = False + + fetch_in_progress = app_config["FETCH_IN_PROGRESS"] + if fetch_in_progress and in_memory_packages is not None: + normalized_packages = in_memory_packages + display_timestamp = in_memory_timestamp + fetch_failed_flag = len(refresh_status["errors"]) > 0 + app_config["FETCH_IN_PROGRESS"] = False + elif in_memory_packages is not None: + logger.info(f"Using in-memory cached package list from {in_memory_timestamp}.") + normalized_packages = in_memory_packages + display_timestamp = in_memory_timestamp + fetch_failed_flag = len(refresh_status["errors"]) > 0 + else: + cached_packages = db.query(CachedPackage).all() + if cached_packages: + logger.info(f"Loading {len(cached_packages)} packages from CachedPackage table.") + normalized_packages = [] + for pkg in cached_packages: + pkg_data = { + "package_name": pkg.package_name, + "version": pkg.version, + "latest_official_version": pkg.latest_official_version, + "author": pkg.author, + "description": pkg.description, + "fhir_version": pkg.fhir_version, + "url": pkg.url, + "canonical": pkg.canonical, + "all_versions": pkg.all_versions, + "dependencies": pkg.dependencies, + "version_count": pkg.version_count, + "last_updated": pkg.last_updated, + "latest_version": pkg.latest_version + } + normalized_packages.append(pkg_data) + app_config["MANUAL_PACKAGE_CACHE"] = normalized_packages + 
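+                # Stamp the in-memory cache with the last registry fetch time
+                # recorded in SQLite, falling back to "now" if no row exists.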
app_config["MANUAL_CACHE_TIMESTAMP"] = db_timestamp.isoformat() if db_timestamp else datetime.utcnow().isoformat() + display_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"] + fetch_failed_flag = len(refresh_status["errors"]) > 0 + logger.info(f"Loaded {len(normalized_packages)} packages into in-memory cache from database.") + else: + logger.info("No packages found in CachedPackage table. Fetching from registries...") + is_fetching = True + app_config["FETCH_IN_PROGRESS"] = True + sync_packages() + normalized_packages = app_config["MANUAL_PACKAGE_CACHE"] + display_timestamp = app_config["MANUAL_CACHE_TIMESTAMP"] + fetch_failed_flag = len(refresh_status["errors"]) > 0 + + if not isinstance(normalized_packages, list): + logger.error(f"normalized_packages is not a list (type: {type(normalized_packages)}). Using empty list.") + normalized_packages = [] + fetch_failed_flag = True + + logger.info("Filtering packages based on query") + if query: + filtered_packages = [ + pkg for pkg in normalized_packages + if isinstance(pkg, dict) and ( + query.lower() in pkg.get('package_name', '').lower() or + query.lower() in pkg.get('author', '').lower() + ) + ] + logger.debug(f"Filtered {len(normalized_packages)} cached packages down to {len(filtered_packages)} for term '{query}'") + else: + filtered_packages = normalized_packages + logger.debug(f"No search term provided, using all {len(filtered_packages)} cached packages.") + + logger.info("Starting fuzzy search") + search_data = [(pkg['package_name'], pkg, 'name') for pkg in filtered_packages] + search_data += [(pkg['description'], pkg, 'description') for pkg in filtered_packages if pkg['description']] + results = process.extract(query.lower(), [item[0].lower() for item in search_data], limit=100, scorer=fuzz.partial_ratio, score_cutoff=70) + logger.info(f"Fuzzy search completed with {len(results)} results") + + logger.info("Building response packages") + packages_to_display = [] + seen_names = set() + for matched_text, score, index in results: + pkg = search_data[index][1] + source = search_data[index][2] + if pkg['package_name'] not in seen_names: + seen_names.add(pkg['package_name']) + adjusted_score = score * 1.5 if source == 'name' else score * 0.8 + logger.info(f"Matched IG: {pkg['package_name']} (source: {source}, score: {score}, adjusted: {adjusted_score})") + packages_to_display.append({ + "id": pkg['package_name'], + "name": pkg['package_name'], + "description": pkg['description'], + "url": pkg['url'], + "Author": pkg['author'], + "fhir_version": pkg['fhir_version'], + "Latest_Version": pkg['latest_version'], + "version_count": pkg['version_count'], + "all_versions": pkg['all_versions'] or [], + "relevance": adjusted_score / 100.0 + }) + + packages_to_display.sort(key=lambda x: x['relevance'], reverse=True) + total = len(packages_to_display) + logger.info(f"Total packages to display: {total}") + + logger.info("Returning search response") + return SearchResponse( + packages=packages_to_display, + total=total, + last_cached_timestamp=display_timestamp, + fetch_failed=fetch_failed_flag, + is_fetching=is_fetching + ) + finally: + db.close() + logger.info("Closed database session after search") + +def download_package(ig_name: str, version: str, package: Dict) -> tuple[str, Optional[str]]: + """Download the .tgz file for the given IG and version, mimicking FHIRFLARE's import_package_and_dependencies.""" + # Create a temporary directory for downloads + download_dir = "instance/fhir_packages" + os.makedirs(download_dir, exist_ok=True) + tgz_filename = 
f"{ig_name}-{version}.tgz".replace('/', '_') + tgz_path = os.path.join(download_dir, tgz_filename) + + # Check if package already exists + if os.path.exists(tgz_path): + logger.info(f"Package {ig_name}#{version} already exists at {tgz_path}") + return tgz_path, None + + # Try canonical URL first (most reliable) + canonical_url = package.get('canonical') + if canonical_url and canonical_url.endswith(f"{version}/package.tgz"): + logger.info(f"Attempting to fetch package from canonical URL: {canonical_url}") + try: + response = requests.get(canonical_url, stream=True, timeout=30) + response.raise_for_status() + with open(tgz_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Successfully downloaded {ig_name}#{version} to {tgz_path} using canonical URL") + return tgz_path, None + except requests.RequestException as e: + error_msg = f"Failed to fetch package from canonical URL {canonical_url}: {str(e)}" + logger.warning(error_msg) + + # Try primary FHIR registry base URL (e.g., https://packages.fhir.org/hl7.fhir.au.core/1.1.0-preview/) + base_url = f"{FHIR_REGISTRY_BASE_URL}/{ig_name}/{version}/" + logger.info(f"Attempting to fetch package from FHIR registry base URL: {base_url}") + try: + response = requests.get(base_url, stream=True, timeout=30) + response.raise_for_status() + # Check if the response is a .tgz file + content_type = response.headers.get('Content-Type', '') + content_disposition = response.headers.get('Content-Disposition', '') + if 'application/x-tar' in content_type or content_disposition.endswith('.tgz') or base_url.endswith('.tgz'): + with open(tgz_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Successfully downloaded {ig_name}#{version} to {tgz_path} using FHIR registry base URL") + return tgz_path, None + else: + error_msg = f"FHIR registry base URL {base_url} did not return a .tgz file (Content-Type: {content_type})" + logger.warning(error_msg) + except requests.RequestException as e: + error_msg = f"Failed to fetch package from FHIR registry base URL {base_url}: {str(e)}" + logger.warning(error_msg) + + # Fallback: Try FHIR registry with explicit /package.tgz + tgz_url = f"{FHIR_REGISTRY_BASE_URL}/{ig_name}/{version}/package.tgz" + logger.info(f"Attempting to fetch package from FHIR registry explicit URL: {tgz_url}") + try: + response = requests.get(tgz_url, stream=True, timeout=30) + response.raise_for_status() + with open(tgz_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Successfully downloaded {ig_name}#{version} to {tgz_path} using FHIR registry explicit URL") + return tgz_path, None + except requests.RequestException as e: + error_msg = f"Failed to fetch package from FHIR registry explicit URL {tgz_url}: {str(e)}" + logger.warning(error_msg) + + # Fallback: Use registry URL (e.g., Simplifier) + registry_url = package.get('registry', 'https://packages.simplifier.net') + if registry_url.endswith('/rssfeed'): + registry_url = registry_url[:-8] + tgz_url = f"{registry_url}/{ig_name}/{version}/package.tgz" + logger.info(f"Attempting to fetch package from registry URL: {tgz_url}") + try: + response = requests.get(tgz_url, stream=True, timeout=30) + response.raise_for_status() + with open(tgz_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + logger.info(f"Successfully downloaded {ig_name}#{version} to {tgz_path} 
using registry URL") + return tgz_path, None + except requests.RequestException as e: + error_msg = f"Failed to fetch package from registry URL {tgz_url}: {str(e)}" + logger.error(error_msg) + return None, error_msg + + return None, "All download attempts failed." + +@app.get("/igs/{ig_id}/profiles", response_model=List[ProfileMetadata]) +async def list_profiles(ig_id: str, version: Optional[str] = None): + """List StructureDefinition profiles in the specified IG, optionally for a specific version.""" + logger.info(f"Listing profiles for IG: {ig_id}, version: {version}") + + # Parse ig_id for version if it includes a '#' + ig_name = ig_id + if '#' in ig_id: + parts = ig_id.split('#', 1) + ig_name = parts[0] + if version and parts[1] != version: + logger.warning(f"Version specified in ig_id ({parts[1]}) conflicts with version parameter ({version}). Using version parameter.") + else: + version = parts[1] + logger.info(f"Parsed ig_id: name={ig_name}, version={version}") + + # Validate ig_name + if not ig_name or not re.match(r'^[a-zA-Z0-9\.\-_]+$', ig_name): + logger.error(f"Invalid IG name: {ig_name}") + raise HTTPException(status_code=400, detail="Invalid IG name. Use format like 'hl7.fhir.au.core'.") + + # Validate version if provided + if version and not re.match(r'^[a-zA-Z0-9\.\-_]+$', version): + logger.error(f"Invalid version: {version}") + raise HTTPException(status_code=400, detail="Invalid version format. Use format like '1.1.0-preview'.") + + # Check if profiles are cached + cache_key = f"{ig_name}#{version if version else 'latest'}" + if cache_key in app_config["PROFILE_CACHE"]: + logger.info(f"Returning cached profiles for IG {ig_name} (version: {version if version else 'latest'})") + return app_config["PROFILE_CACHE"][cache_key] + + # Fetch package metadata from cache + packages = app_config["MANUAL_PACKAGE_CACHE"] + if not packages: + logger.error("Package cache is empty. Please refresh the cache using /refresh-cache.") + raise HTTPException(status_code=500, detail="Package cache is empty. 
Please refresh the cache.") + + # Find the package + package = None + for pkg in packages: + if pkg['package_name'].lower() == ig_name.lower(): + package = pkg + break + + if not package: + logger.error(f"IG {ig_name} not found in cached packages.") + raise HTTPException(status_code=404, detail=f"IG '{ig_name}' not found.") + + # Determine the version to fetch + if version: + target_version = None + for ver_entry in package['all_versions']: + if ver_entry['version'] == version: + target_version = ver_entry['version'] + break + if not target_version: + logger.error(f"Version {version} not found for IG {ig_name}.") + raise HTTPException(status_code=404, detail=f"Version '{version}' not found for IG '{ig_name}'.") + else: + target_version = package['latest_version'] + version = target_version + logger.info(f"No version specified, using latest version: {target_version}") + + # Download the package using updated logic + tgz_path, error = download_package(ig_name, version, package) + if not tgz_path: + logger.error(f"Failed to download package for IG {ig_name} (version: {version}): {error}") + if "404" in error: + raise HTTPException(status_code=404, detail=f"Package for IG '{ig_name}' (version: {version}) not found.") + raise HTTPException(status_code=500, detail=f"Failed to fetch package: {error}") + + # Extract profiles from the .tgz file + profiles = [] + try: + with tarfile.open(tgz_path, mode="r:gz") as tar: + for member in tar.getmembers(): + if member.name.endswith('.json') and 'StructureDefinition' in member.name: + f = tar.extractfile(member) + if f: + resource = json.load(f) + if resource.get("resourceType") == "StructureDefinition": + profiles.append(ProfileMetadata( + name=resource.get("name", ""), + description=resource.get("description"), + version=resource.get("version"), + url=resource.get("url", "") + )) + except Exception as e: + logger.error(f"Failed to extract profiles from package for IG {ig_name} (version: {version}): {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to extract profiles: {str(e)}") + + # Cache the profiles + app_config["PROFILE_CACHE"][cache_key] = profiles + logger.info(f"Cached {len(profiles)} profiles for IG {ig_name} (version: {version})") + + logger.info(f"Found {len(profiles)} profiles in IG {ig_name} (version: {version})") + return profiles + +@app.get("/igs/{ig_id}/profiles/{profile_id}", response_model=StructureDefinition) +async def get_profile(ig_id: str, profile_id: str, version: Optional[str] = None, include_narrative: bool = True): + """ + Retrieve a specific StructureDefinition from an Implementation Guide (IG). + + This endpoint fetches a specific FHIR StructureDefinition (profile) from the given IG. + It supports optional version specification and an option to strip the narrative content. + + Args: + ig_id (str): The ID of the Implementation Guide (e.g., 'hl7.fhir.au.core' or 'hl7.fhir.au.core#1.1.0-preview'). + If the version is included in the ig_id (after '#'), it takes precedence unless overridden by the version parameter. + profile_id (str): The ID or name of the profile to retrieve (e.g., 'AUCorePatient'). + version (str, optional): The version of the IG (e.g., '1.1.0-preview'). If not provided and ig_id contains a version, + the version from ig_id is used; otherwise, the latest version is used. + include_narrative (bool, optional): Whether to include the narrative (`text` element) in the StructureDefinition. + Defaults to True. Set to False to strip the narrative, removing human-readable content. 
+
+@app.get("/status", response_model=RefreshStatus)
+async def get_refresh_status():
+    """Return the status of the last package cache refresh."""
+    logger.info("Fetching refresh status")
+    db = SessionLocal()
+    try:
+        package_count = db.query(CachedPackage).count()
+        return RefreshStatus(
+            last_refresh=refresh_status["last_refresh"],
+            package_count=package_count,
+            errors=refresh_status["errors"]
+        )
+    finally:
+        db.close()
+        logger.info("Closed database session after status check")
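+
+# Illustrative /status response (timestamp and count are example values):
+#   {"last_refresh": "2025-01-01T00:00:00Z", "package_count": 1234, "errors": []}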
+
+@app.post("/refresh-cache", response_model=RefreshStatus)
+async def refresh_cache():
+    """Force an immediate refresh of the package cache."""
+    logger.info("Forcing cache refresh via API")
+    sync_packages()
+    db = SessionLocal()
+    try:
+        package_count = db.query(CachedPackage).count()
+        return RefreshStatus(
+            last_refresh=refresh_status["last_refresh"],
+            package_count=package_count,
+            errors=refresh_status["errors"]
+        )
+    finally:
+        db.close()
+        logger.info("Closed database session after cache refresh")
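+
+# The sync runs inline, so this request blocks until the refresh completes.
+# Illustrative trigger (host and port are deployment-specific):
+#   curl -X POST "http://localhost:8000/refresh-cache"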
+
+if __name__ == "__main__":
+    import uvicorn
+    # Honour the PORT environment variable (set in the Dockerfile), defaulting to 8000
+    port = int(os.getenv("PORT", 8000))
+    uvicorn.run(app, host="0.0.0.0", port=port)
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6325656
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,10 @@
+fastapi==0.115.0
+uvicorn==0.30.6
+feedparser==6.0.11
+requests==2.32.3
+rapidfuzz==3.10.0
+pydantic==2.9.2
+sqlalchemy==2.0.35
+packaging==24.1
+apscheduler==3.10.4
+tenacity==8.5.0
\ No newline at end of file