Merge pull request #15 from Sudo-JHare/Patch

Patched
This commit is contained in:
Sudo-Xops 2025-05-14 10:06:08 +10:00 committed by GitHub
commit d2e05ee4a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 317 additions and 50 deletions

140
app.py
View File

@ -42,6 +42,7 @@ from services import (
from forms import IgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm, RetrieveSplitDataForm
from wtforms import SubmitField
from package import package_bp
from copy import deepcopy
import tempfile
from logging.handlers import RotatingFileHandler
@ -913,6 +914,101 @@ def get_example():
logger.error(f"Unexpected error getting example '{filename}' from {tgz_filename}: {e}", exc_info=True)
return jsonify({"error": f"Unexpected error: {str(e)}"}), 500
#----------------------------------------------------------------------new
def collect_all_structure_definitions(tgz_path):
"""Collect all StructureDefinitions from a .tgz package."""
structure_definitions = {}
try:
with tarfile.open(tgz_path, "r:gz") as tar:
for member in tar:
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
continue
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
continue
fileobj = None
try:
fileobj = tar.extractfile(member)
if fileobj:
content_bytes = fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
data = json.loads(content_string)
if isinstance(data, dict) and data.get('resourceType') == 'StructureDefinition':
sd_url = data.get('url')
if sd_url:
structure_definitions[sd_url] = data
except Exception as e:
logger.warning(f"Could not read/parse potential SD {member.name}, skipping: {e}")
finally:
if fileobj:
fileobj.close()
except Exception as e:
logger.error(f"Unexpected error collecting StructureDefinitions from {tgz_path}: {e}", exc_info=True)
return structure_definitions
def generate_snapshot(structure_def, core_package_path, local_package_path):
"""Generate a snapshot by merging the differential with the base StructureDefinition."""
if 'snapshot' in structure_def:
return structure_def
# Fetch all StructureDefinitions from the local package for reference resolution
local_sds = collect_all_structure_definitions(local_package_path)
# Get the base StructureDefinition from the core package
base_url = structure_def.get('baseDefinition')
if not base_url:
logger.error("No baseDefinition found in StructureDefinition.")
return structure_def
resource_type = structure_def.get('type')
base_sd_data, _ = services.find_and_extract_sd(core_package_path, resource_type, profile_url=base_url)
if not base_sd_data or 'snapshot' not in base_sd_data:
logger.error(f"Could not fetch or find snapshot in base StructureDefinition: {base_url}")
return structure_def
# Copy the base snapshot elements
snapshot_elements = deepcopy(base_sd_data['snapshot']['element'])
differential_elements = structure_def.get('differential', {}).get('element', [])
# Map snapshot elements by path and id for easier lookup
snapshot_by_path = {el['path']: el for el in snapshot_elements}
snapshot_by_id = {el['id']: el for el in snapshot_elements if 'id' in el}
# Process differential elements
for diff_el in differential_elements:
diff_path = diff_el.get('path')
diff_id = diff_el.get('id')
# Resolve extensions or referenced types
if 'type' in diff_el:
for type_info in diff_el['type']:
if 'profile' in type_info:
for profile_url in type_info['profile']:
if profile_url in local_sds:
# Add elements from the referenced StructureDefinition
ref_sd = local_sds[profile_url]
ref_elements = ref_sd.get('snapshot', {}).get('element', []) or ref_sd.get('differential', {}).get('element', [])
for ref_el in ref_elements:
# Adjust paths to fit within the current structure
ref_path = ref_el.get('path')
if ref_path.startswith(ref_sd.get('type')):
new_path = diff_path + ref_path[len(ref_sd.get('type')):]
new_el = deepcopy(ref_el)
new_el['path'] = new_path
new_el['id'] = diff_id + ref_path[len(ref_sd.get('type')):]
snapshot_elements.append(new_el)
# Find matching element in snapshot
target_el = snapshot_by_id.get(diff_id) or snapshot_by_path.get(diff_path)
if target_el:
# Update existing element with differential constraints
target_el.update(diff_el)
else:
# Add new element (e.g., extensions or new slices)
snapshot_elements.append(diff_el)
structure_def['snapshot'] = {'element': snapshot_elements}
return structure_def
@app.route('/get-structure')
def get_structure():
package_name = request.args.get('package_name')
@ -954,7 +1050,6 @@ def get_structure():
if sd_data is None:
logger.info(f"SD for '{resource_type}' not found or failed to load from {source_package_id}. Attempting fallback to {services.CANONICAL_PACKAGE_ID}.")
if not core_package_exists:
logger.error(f"Core package {services.CANONICAL_PACKAGE_ID} not found locally at {core_tgz_path}.")
error_message = f"SD for '{resource_type}' not found in primary package, and core package is missing." if primary_package_exists else f"Primary package {package_name}#{version} and core package are missing."
return jsonify({"error": error_message}), 500 if primary_package_exists else 404
try:
@ -970,23 +1065,43 @@ def get_structure():
if not sd_data:
logger.error(f"SD for '{resource_type}' could not be found in primary or fallback packages.")
return jsonify({"error": f"StructureDefinition for '{resource_type}' not found."}), 404
# Generate snapshot if missing
if 'snapshot' not in sd_data:
logger.info(f"Snapshot missing for {resource_type}. Generating snapshot...")
sd_data = generate_snapshot(sd_data, core_tgz_path, tgz_path)
if raw:
return Response(json.dumps(sd_data, indent=None, separators=(',', ':')), mimetype='application/json')
# Prepare elements based on the view
snapshot_elements = sd_data.get('snapshot', {}).get('element', [])
differential_elements = sd_data.get('differential', {}).get('element', [])
differential_ids = {el.get('id') for el in differential_elements if el.get('id')}
logger.debug(f"Found {len(differential_ids)} unique IDs in differential.")
# Select elements based on the view
enriched_elements = []
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements to add isInDifferential flag.")
for element in snapshot_elements:
element_id = element.get('id')
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
enriched_elements = [services.remove_narrative(el, include_narrative=include_narrative) for el in enriched_elements]
else:
logger.warning(f"No snapshot found for {resource_type} in {source_package_id}. Returning empty element list.")
enriched_elements = []
if view == 'snapshot':
if snapshot_elements:
logger.debug(f"Processing {len(snapshot_elements)} snapshot elements for Snapshot view.")
for element in snapshot_elements:
element_id = element.get('id')
element['isInDifferential'] = bool(element_id and element_id in differential_ids)
enriched_elements.append(element)
else:
logger.warning(f"No snapshot elements found for {resource_type} in {source_package_id} for Snapshot view.")
else: # Differential, Must Support, Key Elements views use differential elements as a base
if differential_elements:
logger.debug(f"Processing {len(differential_elements)} differential elements for {view} view.")
for element in differential_elements:
element['isInDifferential'] = True
enriched_elements.append(element)
else:
logger.warning(f"No differential elements found for {resource_type} in {source_package_id} for {view} view.")
enriched_elements = [services.remove_narrative(el, include_narrative=include_narrative) for el in enriched_elements]
must_support_paths = []
processed_ig_record = ProcessedIg.query.filter_by(package_name=package_name, version=version).first()
if processed_ig_record and processed_ig_record.must_support_elements:
@ -1002,6 +1117,7 @@ def get_structure():
logger.debug(f"No specific MS paths found for keys '{resource_type}' or '{base_resource_type_for_sp}' in DB.")
else:
logger.debug(f"No processed IG record or no must_support_elements found in DB for {package_name}#{version}")
if base_resource_type_for_sp and primary_package_exists:
try:
logger.info(f"Fetching SearchParameters for base type '{base_resource_type_for_sp}' from primary package {tgz_path}")
@ -1060,6 +1176,8 @@ def get_structure():
'source_package': source_package_id
}
return Response(json.dumps(response_data, indent=None, separators=(',', ':')), mimetype='application/json')
#------------------------------------------------------------------------
@app.route('/get-package-metadata')
def get_package_metadata():

View File

@ -735,26 +735,76 @@ def basic_fhir_xml_to_dict(xml_string):
logger.error(f"Unexpected error during basic_fhir_xml_to_dict: {e}", exc_info=True)
return None
# def parse_package_filename(filename):
# """Parses a standard FHIR package filename into name and version."""
# if not filename or not filename.endswith('.tgz'):
# logger.debug(f"Filename '{filename}' does not end with .tgz")
# return None, None
# base_name = filename[:-4]
# last_hyphen_index = base_name.rfind('-')
# while last_hyphen_index != -1:
# potential_name = base_name[:last_hyphen_index]
# potential_version = base_name[last_hyphen_index + 1:]
# if potential_version and (potential_version[0].isdigit() or any(potential_version.startswith(kw) for kw in ['v', 'dev', 'draft', 'preview', 'release', 'alpha', 'beta'])):
# name = potential_name.replace('_', '.')
# version = potential_version
# logger.debug(f"Parsed '{filename}' -> name='{name}', version='{version}'")
# return name, version
# else:
# last_hyphen_index = base_name.rfind('-', 0, last_hyphen_index)
# logger.warning(f"Could not parse version from '{filename}'. Treating '{base_name}' as name.")
# name = base_name.replace('_', '.')
# version = ""
# return name, version
def parse_package_filename(filename):
"""Parses a standard FHIR package filename into name and version."""
"""
Parses a standard FHIR package filename into name and version.
Handles various version formats including semantic versions, pre-releases, snapshots, and complex suffixes.
"""
if not filename or not filename.endswith('.tgz'):
logger.debug(f"Filename '{filename}' does not end with .tgz")
return None, None
base_name = filename[:-4]
last_hyphen_index = base_name.rfind('-')
while last_hyphen_index != -1:
potential_name = base_name[:last_hyphen_index]
potential_version = base_name[last_hyphen_index + 1:]
if potential_version and (potential_version[0].isdigit() or any(potential_version.startswith(kw) for kw in ['v', 'dev', 'draft', 'preview', 'release', 'alpha', 'beta'])):
name = potential_name.replace('_', '.')
version = potential_version
logger.debug(f"Parsed '{filename}' -> name='{name}', version='{version}'")
return name, version
else:
last_hyphen_index = base_name.rfind('-', 0, last_hyphen_index)
logger.warning(f"Could not parse version from '{filename}'. Treating '{base_name}' as name.")
name = base_name.replace('_', '.')
version = ""
base_name = filename[:-4] # Remove '.tgz'
# Define a comprehensive pattern for FHIR package versions as a single string
# Matches versions like:
# - 1.0.0, 4.0.2
# - 1.1.0-preview, 0.1.0-draft, 1.0.0-ballot-3
# - 1.0.0-alpha.1, 1.0.0-RC2, 0.9.0-alpha1.0.8
# - 1.1.0-snapshot-3, 0.0.1-snapshot
# - 2.3.5-buildnumbersuffix2
version_pattern = r'(\d+\.\d+\.\d+)(?:-(?:preview|ballot|draft|snapshot|alpha|beta|RC\d*|buildnumbersuffix\d*|alpha\d+\.\d+\.\d+|snapshot-\d+|ballot-\d+|alpha\.\d+))?$'
# Find the last occurrence of the version pattern in the base_name
match = None
for i in range(len(base_name), 0, -1):
substring = base_name[:i]
if re.search(version_pattern, substring):
match = re.search(version_pattern, base_name[:i])
if match:
break
if not match:
logger.warning(f"Could not parse version from '{filename}'. Treating '{base_name}' as name.")
name = base_name.replace('_', '.')
version = ""
return name, version
# Extract the matched version
version_start_idx = match.start(1) # Start of the version (e.g., start of "1.1.0" in "1.1.0-preview")
name = base_name[:version_start_idx].rstrip('-').replace('_', '.') # Everything before the version
version = base_name[version_start_idx:] # The full version string
# Validate the name and version
if not name or not version:
logger.warning(f"Invalid parse for '{filename}': name='{name}', version='{version}'. Using fallback.")
name = base_name.replace('_', '.')
version = ""
return name, version
logger.debug(f"Parsed '{filename}' -> name='{name}', version='{version}'")
return name, version
def remove_narrative(resource, include_narrative=False):
@ -1953,30 +2003,129 @@ def _load_definition(details, download_dir):
logger.error(f"Failed to load definition {details['filename']} from {tgz_path}: {e}")
return None
def download_package(name, version):
"""Downloads a single FHIR package."""
# def download_package(name, version):
# """Downloads a single FHIR package."""
# download_dir = _get_download_dir()
# if not download_dir: return None, "Download dir error"
# filename = construct_tgz_filename(name, version)
# if not filename: return None, "Filename construction error"
# save_path = os.path.join(download_dir, filename)
# if os.path.exists(save_path):
# logger.info(f"Package already exists: {save_path}")
# return save_path, None
# package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}"
# try:
# with requests.get(package_url, stream=True, timeout=60) as r:
# r.raise_for_status()
# with open(save_path, 'wb') as f:
# for chunk in r.iter_content(chunk_size=8192): f.write(chunk)
# logger.info(f"Downloaded {filename}")
# return save_path, None
# except requests.exceptions.RequestException as e:
# logger.error(f"Download failed for {name}#{version}: {e}")
# return None, f"Download error: {e}"
# except IOError as e:
# logger.error(f"File write error for {save_path}: {e}")
# return None, f"File write error: {e}"
def download_package(name, version, dependency_mode='none'):
"""Downloads a FHIR package by name and version to the configured directory."""
download_dir = _get_download_dir()
if not download_dir: return None, "Download dir error"
filename = construct_tgz_filename(name, version)
if not filename: return None, "Filename construction error"
save_path = os.path.join(download_dir, filename)
if os.path.exists(save_path):
logger.info(f"Package already exists: {save_path}")
return save_path, None
package_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}"
if not download_dir:
return None, ["Could not determine download directory"]
tgz_filename = construct_tgz_filename(name, version)
if not tgz_filename:
return None, [f"Could not construct filename for {name}#{version}"]
download_path = os.path.join(download_dir, tgz_filename)
errors = []
# Check if already downloaded
if os.path.exists(download_path):
logger.info(f"Package {name}#{version} already downloaded at {download_path}")
return download_path, []
# Primary download URL
primary_url = f"{FHIR_REGISTRY_BASE_URL}/{name}/{version}"
logger.info(f"Attempting download of {name}#{version} from {primary_url}")
try:
with requests.get(package_url, stream=True, timeout=60) as r:
r.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192): f.write(chunk)
logger.info(f"Downloaded {filename}")
return save_path, None
response = requests.get(primary_url, timeout=30)
response.raise_for_status()
with open(download_path, 'wb') as f:
f.write(response.content)
logger.info(f"Successfully downloaded {name}#{version} to {download_path}")
save_package_metadata(name, version, dependency_mode, [])
return download_path, []
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.warning(f"Primary download failed (404) for {name}#{version} at {primary_url}. Attempting fallback URL.")
else:
error_msg = f"Download error for {name}#{version}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
except requests.exceptions.RequestException as e:
logger.error(f"Download failed for {name}#{version}: {e}")
return None, f"Download error: {e}"
except IOError as e:
logger.error(f"File write error for {save_path}: {e}")
return None, f"File write error: {e}"
error_msg = f"Download error for {name}#{version}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
except Exception as e:
error_msg = f"Unexpected error downloading {name}#{version}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
# Fallback: Try the package's URL from the normalized package data
if errors and "404" in errors[0]:
logger.info(f"Looking up alternative download URL for {name}#{version}")
try:
# Access the in-memory cache from the Flask app config
normalized_packages = current_app.config.get('MANUAL_PACKAGE_CACHE', [])
package_data = next((pkg for pkg in normalized_packages if pkg.get('name') == name), None)
if not package_data:
error_msg = f"Package {name} not found in cache for fallback download."
logger.error(error_msg)
errors.append(error_msg)
return None, errors
package_url = package_data.get('url')
if not package_url:
error_msg = f"No alternative URL found for {name}#{version}."
logger.error(error_msg)
errors.append(error_msg)
return None, errors
# Construct a download URL using the package's URL
# Assuming the URL is a base (e.g., https://packages.simplifier.net/fhir.ieb.core)
# and we append the version to form the download URL
# This may need adjustment based on the actual format of 'url'
fallback_url = f"{package_url.rstrip('/')}/{version}.tgz"
logger.info(f"Attempting fallback download of {name}#{version} from {fallback_url}")
response = requests.get(fallback_url, timeout=30)
response.raise_for_status()
with open(download_path, 'wb') as f:
f.write(response.content)
logger.info(f"Successfully downloaded {name}#{version} using fallback URL to {download_path}")
save_package_metadata(name, version, dependency_mode, [])
return download_path, []
except requests.exceptions.HTTPError as e:
error_msg = f"Fallback download error for {name}#{version} at {fallback_url}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
except requests.exceptions.RequestException as e:
error_msg = f"Fallback download network error for {name}#{version}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
except Exception as e:
error_msg = f"Unexpected error during fallback download of {name}#{version}: {str(e)}"
logger.error(error_msg, exc_info=True)
errors.append(error_msg)
return None, errors
return None, errors
def extract_dependencies(tgz_path):
"""Extracts dependencies from package.json."""

View File

@ -10,7 +10,7 @@
</p>
<!-----------------------------------------------------------------remove the buttons-----------------------------------------------------
<div class="d-grid gap-2 d-sm-flex justify-content-sm-center">
<a href="{{ url_for('import_ig') }}" class="btn btn-primary btn-lg px-4 gap-3">Import IGs</a>
<a href="{{ url_for('search-and-import') }}" class="btn btn-primary btn-lg px-4 gap-3">Import IGs</a>
<a href="{{ url_for('view_igs') }}" class="btn btn-outline-secondary btn-lg px-4 disabled">Manage FHIR Packages</a>
<a href="{{ url_for('push_igs') }}" class="btn btn-outline-secondary btn-lg px-4">Upload IG's</a>
</div>
@ -34,7 +34,7 @@
<div class="d-flex justify-content-between align-items-center mb-4">
<h2><i class="bi bi-journal-arrow-down me-2"></i>Manage FHIR Packages</h2>
<div>
<a href="{{ url_for('import_ig') }}" class="btn btn-success"><i class="bi bi-download me-1"></i> Import More IGs</a>
<a href="{{ url_for('search-and-import') }}" class="btn btn-success"><i class="bi bi-download me-1"></i> Import More IGs</a>
</div>
</div>