Updated - Dependency Modes

This commit is contained in:
Joshua Hare 2025-04-11 13:17:09 +10:00
parent 95f80fbe91
commit d7835c7dc7
15 changed files with 757 additions and 131 deletions

54
app.py
View File

@ -1,7 +1,7 @@
from flask import Flask, render_template, render_template_string, request, redirect, url_for, flash, jsonify, Response
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms import StringField, SubmitField, SelectField
from wtforms.validators import DataRequired, Regexp
import os
import tarfile
@ -55,6 +55,11 @@ class IgImportForm(FlaskForm):
DataRequired(),
Regexp(r'^[a-zA-Z0-9\.\-]+$', message='Invalid version format.')
])
dependency_mode = SelectField('Dependency Pulling Mode', choices=[
('recursive', 'Current Recursive'),
('patch-canonical', 'Patch Canonical Versions'),
('tree-shaking', 'Tree Shaking (Only Used Dependencies)')
], default='recursive')
submit = SubmitField('Fetch & Download IG')
class ProcessedIg(db.Model):
@ -88,12 +93,16 @@ def import_ig():
if form.validate_on_submit():
name = form.package_name.data
version = form.package_version.data
dependency_mode = form.dependency_mode.data
try:
result = services.import_package_and_dependencies(name, version)
result = services.import_package_and_dependencies(name, version, dependency_mode=dependency_mode)
if result['errors'] and not result['downloaded']:
flash(f"Failed to import {name}#{version}: {result['errors'][0]}", "error")
error_msg = result['errors'][0]
# Simplify the error message by taking the last part after the last colon
simplified_msg = error_msg.split(": ")[-1] if ": " in error_msg else error_msg
flash(f"Failed to import {name}#{version}: {simplified_msg}", "error - check the name and version!")
return redirect(url_for('import_ig'))
flash(f"Successfully downloaded {name}#{version} and dependencies!", "success")
flash(f"Successfully downloaded {name}#{version} and dependencies! Mode: {dependency_mode}", "success")
return redirect(url_for('view_igs'))
except Exception as e:
flash(f"Error downloading IG: {str(e)}", "error")
@ -272,9 +281,13 @@ def delete_ig():
return redirect(url_for('view_igs'))
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
metadata_path = tgz_path.replace('.tgz', '.metadata.json')
if os.path.exists(tgz_path):
try:
os.remove(tgz_path)
if os.path.exists(metadata_path):
os.remove(metadata_path)
logger.debug(f"Deleted metadata file: {metadata_path}")
flash(f"Deleted {filename}", "success")
except Exception as e:
flash(f"Error deleting {filename}: {str(e)}", "error")
@ -391,6 +404,17 @@ def get_example_content():
except tarfile.TarError as e:
return jsonify({"error": f"Error reading {tgz_path}: {e}"}), 500
@app.route('/get-package-metadata')
def get_package_metadata():
package_name = request.args.get('package_name')
version = request.args.get('version')
if not package_name or not version:
return jsonify({'error': 'Missing package_name or version'}), 400
metadata = services.get_package_metadata(package_name, version)
if metadata:
return jsonify({'dependency_mode': metadata['dependency_mode']})
return jsonify({'error': 'Metadata not found'}), 404
# API Endpoint: Import IG Package
@app.route('/api/import-ig', methods=['POST'])
def api_import_ig():
@ -406,6 +430,7 @@ def api_import_ig():
data = request.get_json()
package_name = data.get('package_name')
version = data.get('version')
dependency_mode = data.get('dependency_mode', 'recursive') # Default to recursive
if not package_name or not version:
return jsonify({"status": "error", "message": "Missing package_name or version"}), 400
@ -416,9 +441,14 @@ def api_import_ig():
re.match(r'^[a-zA-Z0-9\.\-]+$', version)):
return jsonify({"status": "error", "message": "Invalid package name or version format"}), 400
# Validate dependency mode
valid_modes = ['recursive', 'patch-canonical', 'tree-shaking']
if dependency_mode not in valid_modes:
return jsonify({"status": "error", "message": f"Invalid dependency mode: {dependency_mode}. Must be one of {valid_modes}"}), 400
try:
# Import package and dependencies
result = services.import_package_and_dependencies(package_name, version)
result = services.import_package_and_dependencies(package_name, version, dependency_mode=dependency_mode)
if result['errors'] and not result['downloaded']:
return jsonify({"status": "error", "message": f"Failed to import {package_name}#{version}: {result['errors'][0]}"}), 500
@ -475,6 +505,7 @@ def api_import_ig():
"message": "Package imported successfully",
"package_name": package_name,
"version": version,
"dependency_mode": dependency_mode,
"dependencies": unique_dependencies,
"duplicates": duplicates
}
@ -522,23 +553,22 @@ def api_push_ig():
# Start message
yield json.dumps({"type": "start", "message": f"Starting push for {package_name}#{version}..."}) + "\n"
# Extract resources from the package
# Extract resources from the main package
resources = []
with tarfile.open(tgz_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name.startswith('package/') and member.name.endswith('.json'):
if member.name.startswith('package/') and member.name.endswith('.json') and not member.name.endswith('package.json'):
with tar.extractfile(member) as f:
resource_data = json.load(f)
if 'resourceType' in resource_data:
resources.append(resource_data)
# If include_dependencies is True, find and include dependencies
# If include_dependencies is True, fetch dependencies from metadata
pushed_packages = [f"{package_name}#{version}"]
if include_dependencies:
yield json.dumps({"type": "progress", "message": "Processing dependencies..."}) + "\n"
# Re-import to get dependencies (simulating dependency resolution)
import_result = services.import_package_and_dependencies(package_name, version)
dependencies = import_result.get('dependencies', [])
metadata = services.get_package_metadata(package_name, version)
dependencies = metadata.get('imported_dependencies', []) if metadata else []
for dep in dependencies:
dep_name = dep['name']
dep_version = dep['version']
@ -547,7 +577,7 @@ def api_push_ig():
if os.path.exists(dep_tgz_path):
with tarfile.open(dep_tgz_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name.startswith('package/') and member.name.endswith('.json'):
if member.name.startswith('package/') and member.name.endswith('.json') and not member.name.endswith('package.json'):
with tar.extractfile(member) as f:
resource_data = json.load(f)
if 'resourceType' in resource_data:

Binary file not shown.

View File

@ -0,0 +1,31 @@
{
"package_name": "hl7.fhir.au.core",
"version": "1.1.0-preview",
"dependency_mode": "tree-shaking",
"imported_dependencies": [
{
"name": "hl7.fhir.r4.core",
"version": "4.0.1"
},
{
"name": "hl7.terminology.r4",
"version": "6.2.0"
},
{
"name": "hl7.fhir.uv.extensions.r4",
"version": "5.2.0"
},
{
"name": "hl7.fhir.au.base",
"version": "5.1.0-preview"
},
{
"name": "hl7.fhir.uv.smart-app-launch",
"version": "2.1.0"
},
{
"name": "hl7.fhir.uv.ipa",
"version": "1.0.0"
}
]
}

View File

@ -0,0 +1,6 @@
{
"package_name": "hl7.fhir.r4.core",
"version": "4.0.1",
"dependency_mode": "recursive",
"imported_dependencies": []
}

View File

@ -14,6 +14,7 @@ from collections import defaultdict
# Constants
FHIR_REGISTRY_BASE_URL = "https://packages.fhir.org"
DOWNLOAD_DIR_NAME = "fhir_packages"
CANONICAL_PACKAGE = ("hl7.fhir.r4.core", "4.0.1") # Define the canonical FHIR package
# --- Helper Functions ---
@ -22,33 +23,27 @@ def _get_download_dir():
logger = logging.getLogger(__name__)
instance_path = None # Initialize
try:
# --- FIX: Indent code inside try block ---
instance_path = current_app.instance_path
logger.debug(f"Using instance path from current_app: {instance_path}")
except RuntimeError:
# --- FIX: Indent code inside except block ---
logger.warning("No app context for instance_path, constructing relative path.")
instance_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'instance'))
logger.debug(f"Constructed instance path: {instance_path}")
# This part depends on instance_path being set above
if not instance_path:
logger.error("Fatal Error: Could not determine instance path.")
return None
download_dir = os.path.join(instance_path, DOWNLOAD_DIR_NAME)
try:
# --- FIX: Indent code inside try block ---
os.makedirs(download_dir, exist_ok=True)
return download_dir
except OSError as e:
# --- FIX: Indent code inside except block ---
logger.error(f"Fatal Error creating dir {download_dir}: {e}", exc_info=True)
return None
def sanitize_filename_part(text): # Public version
"""Basic sanitization for name/version parts of filename."""
# --- FIX: Indent function body ---
safe_text = "".join(c if c.isalnum() or c in ['.', '-'] else '_' for c in text)
safe_text = re.sub(r'_+', '_', safe_text) # Uses re
safe_text = safe_text.strip('_-.')
@ -56,12 +51,10 @@ def sanitize_filename_part(text): # Public version
def _construct_tgz_filename(name, version):
"""Constructs the standard filename using the sanitized parts."""
# --- FIX: Indent function body ---
return f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.tgz"
def find_and_extract_sd(tgz_path, resource_identifier): # Public version
"""Helper to find and extract SD json from a given tgz path by ID, Name, or Type."""
# --- FIX: Ensure consistent indentation ---
sd_data = None
found_path = None
logger = logging.getLogger(__name__)
@ -113,11 +106,54 @@ def find_and_extract_sd(tgz_path, resource_identifier): # Public version
raise
return sd_data, found_path
def save_package_metadata(name, version, dependency_mode, dependencies):
"""Saves the dependency mode and imported dependencies as metadata alongside the package."""
logger = logging.getLogger(__name__)
download_dir = _get_download_dir()
if not download_dir:
logger.error("Could not get download directory for metadata saving.")
return False
metadata = {
'package_name': name,
'version': version,
'dependency_mode': dependency_mode,
'imported_dependencies': dependencies # List of {'name': ..., 'version': ...}
}
metadata_filename = f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.metadata.json"
metadata_path = os.path.join(download_dir, metadata_filename)
try:
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Saved metadata for {name}#{version} at {metadata_path}")
return True
except Exception as e:
logger.error(f"Failed to save metadata for {name}#{version}: {e}")
return False
def get_package_metadata(name, version):
"""Retrieves the metadata for a given package."""
logger = logging.getLogger(__name__)
download_dir = _get_download_dir()
if not download_dir:
logger.error("Could not get download directory for metadata retrieval.")
return None
metadata_filename = f"{sanitize_filename_part(name)}-{sanitize_filename_part(version)}.metadata.json"
metadata_path = os.path.join(download_dir, metadata_filename)
if os.path.exists(metadata_path):
try:
with open(metadata_path, 'r') as f:
return json.load(f)
except Exception as e:
logger.error(f"Failed to read metadata for {name}#{version}: {e}")
return None
return None
# --- Core Service Functions ---
def download_package(name, version):
""" Downloads a single FHIR package. Returns (save_path, error_message) """
# --- FIX: Ensure consistent indentation ---
logger = logging.getLogger(__name__)
download_dir = _get_download_dir()
if not download_dir:
@ -152,7 +188,6 @@ def download_package(name, version):
def extract_dependencies(tgz_path):
""" Extracts dependencies dict from package.json. Returns (dep_dict or None on error, error_message) """
# --- FIX: Ensure consistent indentation ---
logger = logging.getLogger(__name__)
package_json_path = "package/package.json"
dependencies = {}
@ -165,7 +200,7 @@ def extract_dependencies(tgz_path):
package_json_fileobj = tar.extractfile(package_json_member)
if package_json_fileobj:
try:
package_data = json.loads(package_json_fileobj.read().decode('utf-8-sig'))
package_data = json.load(package_json_fileobj)
dependencies = package_data.get('dependencies', {})
finally:
package_json_fileobj.close()
@ -182,23 +217,178 @@ def extract_dependencies(tgz_path):
error_message = f"Unexpected error extracting deps: {e}"; logger.error(error_message, exc_info=True); dependencies = None
return dependencies, error_message
# --- Recursive Import Orchestrator ---
def import_package_and_dependencies(initial_name, initial_version):
"""Orchestrates recursive download and dependency extraction."""
# --- FIX: Ensure consistent indentation ---
def extract_used_types(tgz_path):
""" Extracts all resource types and referenced types from the package to determine used dependencies. """
logger = logging.getLogger(__name__)
logger.info(f"Starting recursive import for {initial_name}#{initial_version}")
used_types = set()
try:
with tarfile.open(tgz_path, "r:gz") as tar:
for member in tar:
if not (member.isfile() and member.name.startswith('package/') and member.name.lower().endswith('.json')):
continue
if os.path.basename(member.name).lower() in ['package.json', '.index.json', 'validation-summary.json', 'validation-oo.json']:
continue
fileobj = None
try:
fileobj = tar.extractfile(member)
if fileobj:
content_bytes = fileobj.read()
content_string = content_bytes.decode('utf-8-sig')
data = json.loads(content_string)
resource_type = data.get('resourceType')
# Add the resource type itself
if resource_type:
used_types.add(resource_type)
# If this is a StructureDefinition, extract referenced types
if resource_type == 'StructureDefinition':
sd_type = data.get('type')
if sd_type:
used_types.add(sd_type)
# Extract types from elements
for element_list in [data.get('snapshot', {}).get('element', []), data.get('differential', {}).get('element', [])]:
for element in element_list:
if 'type' in element:
for t in element['type']:
if 'code' in t:
used_types.add(t['code'])
if 'targetProfile' in t:
for profile in t['targetProfile']:
type_name = profile.split('/')[-1]
used_types.add(type_name)
# If this is another resource (e.g., ValueSet, CodeSystem), extract referenced types
else:
# Look for meta.profile for referenced profiles
profiles = data.get('meta', {}).get('profile', [])
for profile in profiles:
type_name = profile.split('/')[-1]
used_types.add(type_name)
# For ValueSet, check compose.include.system
if resource_type == 'ValueSet':
for include in data.get('compose', {}).get('include', []):
system = include.get('system')
if system and system.startswith('http://hl7.org/fhir/'):
type_name = system.split('/')[-1]
used_types.add(type_name)
except Exception as e:
logger.warning(f"Could not process member {member.name} for used types: {e}")
finally:
if fileobj:
fileobj.close()
except Exception as e:
logger.error(f"Error extracting used types from {tgz_path}: {e}")
return used_types
def map_types_to_packages(used_types, all_dependencies):
""" Maps used types to the packages that provide them based on dependency lists. """
logger = logging.getLogger(__name__)
type_to_package = {}
for (pkg_name, pkg_version), deps in all_dependencies.items():
# Simplified mapping: assume package names indicate the types they provide
# In a real implementation, you'd need to inspect each package's contents
for dep_name, dep_version in deps.items():
# Heuristic: map types to packages based on package name
for t in used_types:
if t.lower() in dep_name.lower():
type_to_package[t] = (dep_name, dep_version)
# Special case for the package itself
for t in used_types:
if t.lower() in pkg_name.lower():
type_to_package[t] = (pkg_name, pkg_version)
# Fallback: map remaining types to the canonical package
for t in used_types:
if t not in type_to_package:
type_to_package[t] = CANONICAL_PACKAGE
return type_to_package
# --- Recursive Import Orchestrator ---
def import_package_and_dependencies(initial_name, initial_version, dependency_mode='recursive'):
"""Orchestrates recursive download and dependency extraction based on the dependency mode."""
logger = logging.getLogger(__name__)
logger.info(f"Starting import for {initial_name}#{initial_version} with dependency_mode={dependency_mode}")
results = {
'requested': (initial_name, initial_version),
'processed': set(),
'downloaded': {},
'all_dependencies': {},
'dependencies': [], # New field to store dependencies as a list
'dependencies': [], # Store dependencies as a list
'errors': []
}
pending_queue = [(initial_name, initial_version)]
processed_lookup = set()
# Always download the initial package
name, version = initial_name, initial_version
package_id_tuple = (name, version)
logger.info(f"Processing initial package: {name}#{version}")
processed_lookup.add(package_id_tuple)
save_path, dl_error = download_package(name, version)
if dl_error:
error_msg = f"Download failed for {name}#{version}: {dl_error}"
results['errors'].append(error_msg)
logger.error("Aborting import: Initial package download failed.")
return results
else:
results['downloaded'][package_id_tuple] = save_path
dependencies, dep_error = extract_dependencies(save_path)
if dep_error:
results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}")
elif dependencies is not None:
results['all_dependencies'][package_id_tuple] = dependencies
results['processed'].add(package_id_tuple)
logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}")
for dep_name, dep_version in dependencies.items():
if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version:
results['dependencies'].append({"name": dep_name, "version": dep_version})
# Save metadata for the initial package
save_package_metadata(initial_name, initial_version, dependency_mode, results['dependencies'])
# Handle dependency pulling based on mode
if dependency_mode == 'recursive':
# Current behavior: recursively download all dependencies
for dep in results['dependencies']:
dep_name, dep_version = dep['name'], dep['version']
dep_tuple = (dep_name, dep_version)
if dep_tuple not in processed_lookup:
pending_queue.append(dep_tuple)
logger.debug(f"Added to queue (recursive): {dep_name}#{dep_version}")
elif dependency_mode == 'patch-canonical':
# Patch Canonical: Only download the canonical package if needed
canonical_name, canonical_version = CANONICAL_PACKAGE
canonical_tuple = (canonical_name, canonical_version)
if canonical_tuple not in processed_lookup:
pending_queue.append(canonical_tuple)
logger.debug(f"Added canonical package to queue: {canonical_name}#{canonical_version}")
elif dependency_mode == 'tree-shaking':
# Tree Shaking: Analyze the initial package to determine used types
used_types = extract_used_types(save_path)
logger.debug(f"Used types in {initial_name}#{initial_version}: {used_types}")
# Map used types to packages
type_to_package = map_types_to_packages(used_types, results['all_dependencies'])
logger.debug(f"Type to package mapping: {type_to_package}")
# Add only the necessary packages to the queue
for t, (dep_name, dep_version) in type_to_package.items():
dep_tuple = (dep_name, dep_version)
if dep_tuple not in processed_lookup and dep_tuple != package_id_tuple:
pending_queue.append(dep_tuple)
logger.debug(f"Added to queue (tree-shaking): {dep_name}#{dep_version}")
# Process the queue
while pending_queue:
name, version = pending_queue.pop(0)
package_id_tuple = (name, version)
@ -214,14 +404,9 @@ def import_package_and_dependencies(initial_name, initial_version):
if dl_error:
error_msg = f"Download failed for {name}#{version}: {dl_error}"
results['errors'].append(error_msg)
if package_id_tuple == results['requested']:
logger.error("Aborting import: Initial package download failed.")
break
else:
continue
else: # Download OK
continue
else:
results['downloaded'][package_id_tuple] = save_path
# --- Correctly indented block ---
dependencies, dep_error = extract_dependencies(save_path)
if dep_error:
results['errors'].append(f"Dependency extraction failed for {name}#{version}: {dep_error}")
@ -229,18 +414,15 @@ def import_package_and_dependencies(initial_name, initial_version):
results['all_dependencies'][package_id_tuple] = dependencies
results['processed'].add(package_id_tuple)
logger.debug(f"Dependencies for {name}#{version}: {list(dependencies.keys())}")
# Add dependencies to the new 'dependencies' list
# Add dependencies to the list
for dep_name, dep_version in dependencies.items():
if isinstance(dep_name, str) and isinstance(dep_version, str) and dep_name and dep_version:
dep_tuple = (dep_name, dep_version)
results['dependencies'].append({"name": dep_name, "version": dep_version})
if dep_tuple not in processed_lookup:
if dep_tuple not in pending_queue:
pending_queue.append(dep_tuple)
logger.debug(f"Added to queue: {dep_name}#{dep_version}")
else:
logger.warning(f"Skipping invalid dependency '{dep_name}': '{dep_version}' in {name}#{version}")
# --- End Correctly indented block ---
# For recursive mode, add to queue
if dependency_mode == 'recursive' and dep_tuple not in processed_lookup:
pending_queue.append(dep_tuple)
logger.debug(f"Added to queue: {dep_name}#{dep_version}")
proc_count=len(results['processed']); dl_count=len(results['downloaded']); err_count=len(results['errors'])
logger.info(f"Import finished. Processed: {proc_count}, Downloaded/Verified: {dl_count}, Errors: {err_count}")

View File

@ -35,14 +35,26 @@
</nav>
<main class="container flex-grow-1">
<!-- Flashed Messages Section -->
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category or 'info' }} alert-dismissible fade show" role="alert">
{{ message }}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
</div>
{% endfor %}
<div class="mt-3">
{% for category, message in messages %}
<div class="alert
{{ 'alert-danger' if category == 'error' else
'alert-success' if category == 'success' else
'alert-info' }}
alert-dismissible fade show" role="alert">
<!-- Add an icon based on the category -->
<i class="bi
{{ 'bi-exclamation-triangle-fill me-2' if category == 'error' else
'bi-check-circle-fill me-2' if category == 'success' else
'bi-info-circle-fill me-2' }}"></i>
{{ message }}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
</div>
{% endfor %}
</div>
{% endif %}
{% endwith %}
{% block content %}{% endblock %}

View File

@ -12,7 +12,9 @@
<tr>
<th>Package Name</th>
<th>Version</th>
<!--------------------------------------------------------------------------------------------------------------if you unhide buttons unhide this--------------------------------------------
<th>Actions</th>
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
</tr>
</thead>
<tbody>
@ -25,6 +27,7 @@
<tr {% if duplicate_group %}class="{{ group_colors[name] }}"{% endif %}>
<td>{{ name }}</td>
<td>{{ version }}</td>
<!--------------------------------------------------------------------------------------------------------------Dont need the buttons here--------------------------------------------------
<td>
{% if not processed %}
<form method="POST" action="{{ url_for('process_ig') }}" style="display:inline;">
@ -37,6 +40,7 @@
<button type="submit" class="btn btn-danger btn-sm">Delete</button>
</form>
</td>
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
</tr>
{% endfor %}
</tbody>
@ -70,6 +74,10 @@
{% endfor %}
</select>
</div>
<div class="mb-3">
<label for="dependencyMode" class="form-label">Dependency Mode Used During Import</label>
<input type="text" class="form-control" id="dependencyMode" readonly>
</div>
<div class="mb-3">
<label for="fhirServerUrl" class="form-label">FHIR Server URL</label>
<input type="url" class="form-control" id="fhirServerUrl" name="fhir_server_url" placeholder="e.g., http://hapi.fhir.org/baseR4" required>
@ -104,6 +112,39 @@
</div>
<script>
document.addEventListener('DOMContentLoaded', function() {
const packageSelect = document.getElementById('packageSelect');
const dependencyModeField = document.getElementById('dependencyMode');
// Update dependency mode when package selection changes
packageSelect.addEventListener('change', function() {
const packageId = this.value;
if (packageId) {
const [packageName, version] = packageId.split('#');
fetch(`/get-package-metadata?package_name=${packageName}&version=${version}`)
.then(response => response.json())
.then(data => {
if (data.dependency_mode) {
dependencyModeField.value = data.dependency_mode;
} else {
dependencyModeField.value = 'Unknown';
}
})
.catch(error => {
console.error('Error fetching metadata:', error);
dependencyModeField.value = 'Error';
});
} else {
dependencyModeField.value = '';
}
});
// Trigger change event on page load if a package is pre-selected
if (packageSelect.value) {
packageSelect.dispatchEvent(new Event('change'));
}
});
document.getElementById('pushIgForm').addEventListener('submit', async function(event) {
event.preventDefault();

View File

@ -28,6 +28,15 @@
{{ form.hidden_tag() }}
{{ render_field(form.package_name) }}
{{ render_field(form.package_version) }}
<!-- Dependency Pulling Mode Toggle -->
<div class="mb-3">
<label for="dependency_mode" class="form-label">Dependency Pulling Mode:</label>
<select class="form-select" id="dependency_mode" name="dependency_mode">
<option value="recursive" selected>Current Recursive</option>
<option value="patch-canonical">Patch Canonical Versions</option>
<option value="tree-shaking">Tree Shaking (Only Used Dependencies)</option>
</select>
</div>
<div class="d-grid gap-2 d-sm-flex">
{{ form.submit(class="btn btn-success") }}
<a href="{{ url_for('index') }}" class="btn btn-secondary">Back</a>

View File

@ -21,12 +21,13 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
app.config['TESTING'] = True
app.config['WTF_CSRF_ENABLED'] = False # Disable CSRF for testing
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
app.config['FHIR_PACKAGES_DIR'] = 'test_packages'
app.config['FHIR_PACKAGES_DIR'] = os.path.abspath('test_packages') # Use absolute path
app.config['SECRET_KEY'] = 'test-secret-key'
app.config['API_KEY'] = 'test-api-key'
# Create the test packages directory
os.makedirs(app.config['FHIR_PACKAGES_DIR'], exist_ok=True)
os.makedirs(app.config['FHIR_PACKAGES_DIR'], exist_ok=True, mode=0o777)
os.chmod(app.config['FHIR_PACKAGES_DIR'], 0o777)
# Create the Flask test client
self.client = app.test_client()
@ -44,16 +45,37 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
shutil.rmtree(app.config['FHIR_PACKAGES_DIR'])
# Helper method to create a mock .tgz file
def create_mock_tgz(self, filename, content=None):
def create_mock_tgz(self, filename, content=None, append_files=None):
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], filename)
with tarfile.open(tgz_path, "w:gz") as tar:
if content:
# Create a mock package.json file inside the .tgz
import io
package_json = io.BytesIO(json.dumps(content).encode('utf-8'))
tarinfo = tarfile.TarInfo(name="package/package.json")
tarinfo.size = len(package_json.getvalue())
tar.addfile(tarinfo, package_json)
temp_tgz_path = tgz_path + '.tmp'
# If the file exists and we're appending, copy existing contents
if append_files and os.path.exists(tgz_path):
with tarfile.open(tgz_path, "r:gz") as existing_tar, tarfile.open(temp_tgz_path, "w:gz") as new_tar:
# Copy existing files
for member in existing_tar.getmembers():
file_obj = existing_tar.extractfile(member)
new_tar.addfile(member, file_obj)
# Append new files
for name, data in append_files.items():
data_bytes = json.dumps(data).encode('utf-8')
import io
file_io = io.BytesIO(data_bytes)
tarinfo = tarfile.TarInfo(name=name)
tarinfo.size = len(data_bytes)
new_tar.addfile(tarinfo, file_io)
# Replace the original file with the new one
os.replace(temp_tgz_path, tgz_path)
else:
# Create a new tar file
with tarfile.open(tgz_path, "w:gz") as tar:
if content:
# Create a mock package.json file inside the .tgz
import io
package_json = io.BytesIO(json.dumps(content).encode('utf-8'))
tarinfo = tarfile.TarInfo(name="package/package.json")
tarinfo.size = len(package_json.getvalue())
tar.addfile(tarinfo, package_json)
return tgz_path
# Test Case 1: Test Homepage Rendering
@ -69,53 +91,106 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertIn(b'Import IG', response.data)
self.assertIn(b'Package Name', response.data)
self.assertIn(b'Package Version', response.data)
self.assertIn(b'Dependency Pulling Mode', response.data)
# Test Case 3: Test Import IG Form Submission (Success)
# Test Case 3: Test Import IG Form Submission - Recursive Mode (Success)
@patch('services.import_package_and_dependencies')
def test_import_ig_success(self, mock_import):
def test_import_ig_recursive_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'},
{'name': 'hl7.fhir.extensions', 'version': '1.0.0'}
]
}
response = self.client.post('/import-ig', data={
'package_name': 'hl7.fhir.us.core',
'package_version': '3.1.1'
'package_version': '3.1.1',
'dependency_mode': 'recursive'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Successfully downloaded hl7.fhir.us.core#3.1.1', response.data)
self.assertIn(b'Successfully downloaded hl7.fhir.us.core#3.1.1 and dependencies! Mode: recursive', response.data)
# Test Case 4: Test Import IG Form Submission (Failure)
# Test Case 4: Test Import IG Form Submission - Patch Canonical Mode (Success)
@patch('services.import_package_and_dependencies')
def test_import_ig_patch_canonical_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}
]
}
response = self.client.post('/import-ig', data={
'package_name': 'hl7.fhir.us.core',
'package_version': '3.1.1',
'dependency_mode': 'patch-canonical'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Successfully downloaded hl7.fhir.us.core#3.1.1 and dependencies! Mode: patch-canonical', response.data)
# Test Case 5: Test Import IG Form Submission - Tree Shaking Mode (Success)
@patch('services.import_package_and_dependencies')
def test_import_ig_tree_shaking_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}
]
}
response = self.client.post('/import-ig', data={
'package_name': 'hl7.fhir.us.core',
'package_version': '3.1.1',
'dependency_mode': 'tree-shaking'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Successfully downloaded hl7.fhir.us.core#3.1.1 and dependencies! Mode: tree-shaking', response.data)
# Test Case 6: Test Import IG Form Submission (Failure)
@patch('services.import_package_and_dependencies')
def test_import_ig_failure(self, mock_import):
mock_import.return_value = {
'downloaded': False,
'errors': ['Package not found'],
'errors': ['Package not found: 404 Client Error: Not Found'],
'dependencies': []
}
response = self.client.post('/import-ig', data={
'package_name': 'invalid.package',
'package_version': '1.0.0'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Failed to import invalid.package#1.0.0', response.data)
with app.test_request_context():
# Generate a CSRF token
from flask_wtf.csrf import generate_csrf
csrf_token = generate_csrf()
response = self.client.post('/import-ig', data={
'package_name': 'invalid.package',
'package_version': '1.0.0',
'dependency_mode': 'recursive',
'csrf_token': csrf_token
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
print(f"Response data: {response.data.decode('utf-8')}")
self.assertIn(b'Failed to import invalid.package#1.0.0: Package not found: 404 Client Error: Not Found', response.data)
# Test Case 5: Test Import IG Form Submission (Invalid Input)
# Test Case 7: Test Import IG Form Submission (Invalid Input)
def test_import_ig_invalid_input(self):
response = self.client.post('/import-ig', data={
'package_name': 'invalid@package', # Invalid format
'package_version': '1.0.0'
'package_version': '1.0.0',
'dependency_mode': 'recursive'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Invalid package name format', response.data)
# Test Case 6: Test View IGs Page Rendering (No Packages)
# Test Case 8: Test View IGs Page Rendering (No Packages)
def test_view_igs_no_packages(self):
# Ensure the packages directory is empty
for filename in os.listdir(app.config['FHIR_PACKAGES_DIR']):
os.remove(os.path.join(app.config['FHIR_PACKAGES_DIR'], filename))
response = self.client.get('/view-igs')
self.assertEqual(response.status_code, 200)
print(f"Response data: {response.data.decode('utf-8')}")
self.assertIn(b'No packages downloaded yet', response.data)
# Test Case 7: Test View IGs Page Rendering (With Packages)
# Test Case 9: Test View IGs Page Rendering (With Packages)
def test_view_igs_with_packages(self):
# Create a mock .tgz file
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', {
@ -127,7 +202,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertIn(b'hl7.fhir.us.core', response.data)
self.assertIn(b'3.1.1', response.data)
# Test Case 8: Test Process IG (Success)
# Test Case 10: Test Process IG (Success)
@patch('services.process_package_file')
def test_process_ig_success(self, mock_process):
mock_process.return_value = {
@ -142,7 +217,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'Successfully processed hl7.fhir.us.core#3.1.1', response.data)
# Test Case 9: Test Process IG (Invalid File)
# Test Case 11: Test Process IG (Invalid File)
def test_process_ig_invalid_file(self):
response = self.client.post('/process-igs', data={
'filename': 'invalid-file.txt'
@ -150,17 +225,27 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'Invalid package file', response.data)
# Test Case 10: Test Delete IG (Success)
# Test Case 12: Test Delete IG (Success)
def test_delete_ig_success(self):
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz')
# Create a mock metadata file
metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.metadata.json')
with open(metadata_path, 'w') as f:
json.dump({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'dependency_mode': 'recursive',
'imported_dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
}, f)
response = self.client.post('/delete-ig', data={
'filename': 'hl7.fhir.us.core-3.1.1.tgz'
}, follow_redirects=True)
self.assertEqual(response.status_code, 200)
self.assertIn(b'Deleted hl7.fhir.us.core-3.1.1.tgz', response.data)
self.assertFalse(os.path.exists(os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.tgz')))
self.assertFalse(os.path.exists(metadata_path)) # Verify metadata file is deleted
# Test Case 11: Test Delete IG (File Not Found)
# Test Case 13: Test Delete IG (File Not Found)
def test_delete_ig_file_not_found(self):
response = self.client.post('/delete-ig', data={
'filename': 'nonexistent.tgz'
@ -168,7 +253,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'File not found: nonexistent.tgz', response.data)
# Test Case 12: Test Unload IG (Success)
# Test Case 14: Test Unload IG (Success)
def test_unload_ig_success(self):
with app.app_context():
processed_ig = ProcessedIg(
@ -187,7 +272,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'Unloaded hl7.fhir.us.core#3.1.1', response.data)
# Test Case 13: Test Unload IG (Invalid ID)
# Test Case 15: Test Unload IG (Invalid ID)
def test_unload_ig_invalid_id(self):
response = self.client.post('/unload-ig', data={
'ig_id': '999'
@ -195,7 +280,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'Package not found with ID: 999', response.data)
# Test Case 14: Test View Processed IG Page
# Test Case 16: Test View Processed IG Page
def test_view_processed_ig(self):
with app.app_context():
processed_ig = ProcessedIg(
@ -212,26 +297,30 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'View hl7.fhir.us.core#3.1.1', response.data)
# Test Case 15: Test Push IGs Page Rendering
# Test Case 17: Test Push IGs Page Rendering
def test_push_igs_page(self):
response = self.client.get('/push-igs')
self.assertEqual(response.status_code, 200)
self.assertIn(b'Push IGs to FHIR Server', response.data)
self.assertIn(b'Live Console', response.data)
# Test Case 16: Test API - Import IG (Success)
# Test Case 18: Test API - Import IG Recursive Mode (Success)
@patch('services.import_package_and_dependencies')
def test_api_import_ig_success(self, mock_import):
def test_api_import_ig_recursive_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'},
{'name': 'hl7.fhir.extensions', 'version': '1.0.0'}
]
}
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'test-api-key'
'api_key': 'test-api-key',
'dependency_mode': 'recursive'
}),
content_type='application/json'
)
@ -239,14 +328,84 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['status'], 'success')
self.assertEqual(data['package_name'], 'hl7.fhir.us.core')
self.assertEqual(data['dependency_mode'], 'recursive')
self.assertEqual(len(data['dependencies']), 2) # Expect multiple dependencies
# Test Case 17: Test API - Import IG (Invalid API Key)
# Test Case 19: Test API - Import IG Patch Canonical Mode (Success)
@patch('services.import_package_and_dependencies')
def test_api_import_ig_patch_canonical_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}
]
}
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'test-api-key',
'dependency_mode': 'patch-canonical'
}),
content_type='application/json'
)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['status'], 'success')
self.assertEqual(data['package_name'], 'hl7.fhir.us.core')
self.assertEqual(data['dependency_mode'], 'patch-canonical')
self.assertEqual(len(data['dependencies']), 1) # Expect only canonical dependency
# Test Case 20: Test API - Import IG Tree Shaking Mode (Success)
@patch('services.import_package_and_dependencies')
def test_api_import_ig_tree_shaking_success(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}
]
}
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'test-api-key',
'dependency_mode': 'tree-shaking'
}),
content_type='application/json'
)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['status'], 'success')
self.assertEqual(data['package_name'], 'hl7.fhir.us.core')
self.assertEqual(data['dependency_mode'], 'tree-shaking')
self.assertEqual(len(data['dependencies']), 1) # Expect reduced dependencies
# Test Case 21: Test API - Import IG Invalid Dependency Mode
def test_api_import_ig_invalid_dependency_mode(self):
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'test-api-key',
'dependency_mode': 'invalid-mode'
}),
content_type='application/json'
)
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertEqual(data['message'], "Invalid dependency mode: invalid-mode. Must be one of ['recursive', 'patch-canonical', 'tree-shaking']")
# Test Case 22: Test API - Import IG (Invalid API Key)
def test_api_import_ig_invalid_api_key(self):
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'wrong-api-key'
'api_key': 'wrong-api-key',
'dependency_mode': 'recursive'
}),
content_type='application/json'
)
@ -254,12 +413,13 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['message'], 'Invalid API key')
# Test Case 18: Test API - Import IG (Missing Parameters)
# Test Case 23: Test API - Import IG (Missing Parameters)
def test_api_import_ig_missing_params(self):
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'api_key': 'test-api-key'
'api_key': 'test-api-key',
'dependency_mode': 'recursive'
}),
content_type='application/json'
)
@ -267,9 +427,9 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['message'], 'Missing package_name or version')
# Test Case 19: Test API - Push IG (Success)
# Test Case 24: Test API - Push IG (Success, No Dependencies)
@patch('requests.put')
def test_api_push_ig_success(self, mock_put):
def test_api_push_ig_success_no_dependencies(self, mock_put):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status.return_value = None
@ -279,17 +439,22 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
'name': 'hl7.fhir.us.core',
'version': '3.1.1'
})
# Add a mock resource file
with tarfile.open(os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.tgz'), "a:gz") as tar:
resource_data = json.dumps({
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', append_files={
"package/Patient-example.json": {
'resourceType': 'Patient',
'id': 'example'
}).encode('utf-8')
import io
resource_file = io.BytesIO(resource_data)
tarinfo = tarfile.TarInfo(name="package/Patient-example.json")
tarinfo.size = len(resource_data)
tar.addfile(tarinfo, resource_file)
}
})
# Create a mock metadata file
metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.metadata.json')
with open(metadata_path, 'w') as f:
json.dump({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'dependency_mode': 'recursive',
'imported_dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
}, f)
response = self.client.post('/api/push-ig',
data=json.dumps({
@ -307,8 +472,9 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertIn('"type": "start"', response_text)
self.assertIn('"type": "success"', response_text)
self.assertIn('"status": "success"', response_text)
self.assertIn('"pushed_packages": ["hl7.fhir.us.core#3.1.1"]', response_text)
# Test Case 20: Test API - Push IG (Invalid API Key)
# Test Case 25: Test API - Push IG (Invalid API Key)
def test_api_push_ig_invalid_api_key(self):
response = self.client.post('/api/push-ig',
data=json.dumps({
@ -325,7 +491,7 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['message'], 'Invalid API key')
# Test Case 21: Test API - Push IG (Package Not Found)
# Test Case 26: Test API - Push IG (Package Not Found)
def test_api_push_ig_package_not_found(self):
response = self.client.post('/api/push-ig',
data=json.dumps({
@ -342,17 +508,33 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['message'], 'Package not found: hl7.fhir.us.core#3.1.1')
# Test Case 22: Test Secret Key - CSRF Protection
# Test Case 27: Test Secret Key - CSRF Protection
def test_secret_key_csrf(self):
# Re-enable CSRF for this test
app.config['WTF_CSRF_ENABLED'] = True
from flask_wtf import FlaskForm
from wtforms import StringField, SelectField, SubmitField
class ImportIGForm(FlaskForm):
package_name = StringField('Package Name')
package_version = StringField('Package Version')
dependency_mode = SelectField('Dependency Pulling Mode', choices=[
('recursive', 'Current Recursive'),
('patch-canonical', 'Patch Canonical Versions'),
('tree-shaking', 'Tree Shaking for Only Used Dependencies')
])
submit = SubmitField('Fetch & Download IG')
# Do not include the CSRF token to trigger the error
response = self.client.post('/import-ig', data={
'package_name': 'hl7.fhir.us.core',
'package_version': '3.1.1'
'package_version': '3.1.1',
'dependency_mode': 'recursive'
}, follow_redirects=True)
print(f"Response status: {response.status_code}")
print(f"Response data: {response.data.decode('utf-8')}")
self.assertEqual(response.status_code, 400) # CSRF token missing
# Test Case 23: Test Secret Key - Flash Messages
# Test Case 28: Test Secret Key - Flash Messages
def test_secret_key_flash_messages(self):
# Set a flash message
with self.client as client:
@ -362,53 +544,52 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn(b'Test message', response.data)
# Test Case 24: Test Get Structure Definition (Success)
# Test Case 29: Test Get Structure Definition (Success)
def test_get_structure_definition_success(self):
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz')
with tarfile.open(os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.tgz'), "a:gz") as tar:
sd_data = json.dumps({
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', append_files={
"package/StructureDefinition-us-core-patient.json": {
'snapshot': {'element': [{'id': 'Patient.name'}]}
}).encode('utf-8')
import io
sd_file = io.BytesIO(sd_data)
tarinfo = tarfile.TarInfo(name="package/StructureDefinition-Patient.json")
tarinfo.size = len(sd_data)
tar.addfile(tarinfo, sd_file)
}
})
tgz_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.tgz')
print(f"Checking contents of {tgz_path}")
with tarfile.open(tgz_path, "r:gz") as tar:
print(f"Tar contents: {[member.name for member in tar.getmembers()]}")
response = self.client.get('/get-structure?package_name=hl7.fhir.us.core&package_version=3.1.1&resource_type=Patient')
print(f"Response status: {response.status_code}")
print(f"Response data: {response.data.decode('utf-8')}")
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertIn('elements', data)
self.assertEqual(data['elements'][0]['id'], 'Patient.name')
# Test Case 25: Test Get Structure Definition (Not Found)
# Test Case 30: Test Get Structure Definition (Not Found)
def test_get_structure_definition_not_found(self):
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz')
response = self.client.get('/get-structure?package_name=hl7.fhir.us.core&package_version=3.1.1&resource_type=Observation')
self.assertEqual(response.status_code, 404)
data = json.loads(response.data)
self.assertEqual(data['error'], "SD for 'Observation' not found.")
self.assertEqual(data['error'], "SD for 'Observation' not found in hl7.fhir.us.core#3.1.1, and core package hl7.fhir.r4.core#4.0.1 could not be located.")
# Test Case 26: Test Get Example Content (Success)
# Test Case 31: Test Get Example Content (Success)
def test_get_example_content_success(self):
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz')
with tarfile.open(os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.tgz'), "a:gz") as tar:
example_data = json.dumps({
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', append_files={
"package/example-Patient.json": {
'resourceType': 'Patient',
'id': 'example'
}).encode('utf-8')
import io
example_file = io.BytesIO(example_data)
tarinfo = tarfile.TarInfo(name="package/example-Patient.json")
tarinfo.size = len(example_data)
tar.addfile(tarinfo, example_file)
}
})
response = self.client.get('/get-example?package_name=hl7.fhir.us.core&package_version=3.1.1&filename=package/example-Patient.json')
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['resourceType'], 'Patient')
# Test Case 27: Test Get Example Content (Invalid Path)
# Test Case 32: Test Get Example Content (Invalid Path)
def test_get_example_content_invalid_path(self):
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz')
response = self.client.get('/get-example?package_name=hl7.fhir.us.core&package_version=3.1.1&filename=invalid/example.json')
@ -416,5 +597,139 @@ class TestFHIRFlareIGToolkit(unittest.TestCase):
data = json.loads(response.data)
self.assertEqual(data['error'], 'Invalid example file path.')
# Test Case 33: Test Metadata Storage After Import
@patch('services.import_package_and_dependencies')
def test_metadata_storage_after_import(self, mock_import):
mock_import.return_value = {
'downloaded': True,
'errors': [],
'dependencies': [
{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'},
{'name': 'hl7.fhir.extensions', 'version': '1.0.0'}
]
}
response = self.client.post('/api/import-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'api_key': 'test-api-key',
'dependency_mode': 'recursive'
}),
content_type='application/json'
)
self.assertEqual(response.status_code, 200)
metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.metadata.json')
print(f"Checking for metadata file at: {metadata_path}")
print(f"File exists: {os.path.exists(metadata_path)}")
self.assertTrue(os.path.exists(metadata_path))
with open(metadata_path, 'r') as f:
metadata = json.load(f)
self.assertEqual(metadata['dependency_mode'], 'recursive')
self.assertEqual(len(metadata['imported_dependencies']), 2)
self.assertEqual(metadata['imported_dependencies'][0]['name'], 'hl7.fhir.r4.core')
# Test Case 34: Test Get Package Metadata (Success)
def test_get_package_metadata_success(self):
print(f"FHIR_PACKAGES_DIR: {app.config['FHIR_PACKAGES_DIR']}")
metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.metadata.json')
print(f"Creating metadata file at: {metadata_path}")
try:
with open(metadata_path, 'w') as f:
json.dump({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'dependency_mode': 'tree-shaking',
'imported_dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
}, f)
print(f"File exists after creation: {os.path.exists(metadata_path)}")
print(f"File permissions: {oct(os.stat(metadata_path).st_mode & 0o777)}")
except Exception as e:
print(f"Error creating metadata file: {e}")
raise
response = self.client.get('/get-package-metadata?package_name=hl7.fhir.us.core&version=3.1.1')
print(f"File exists after request: {os.path.exists(metadata_path)}")
print(f"Response status: {response.status_code}")
print(f"Response data: {response.data.decode('utf-8')}")
self.assertEqual(response.status_code, 200)
data = json.loads(response.data)
self.assertEqual(data['dependency_mode'], 'tree-shaking')
# Test Case 35: Test Get Package Metadata (Not Found)
def test_get_package_metadata_not_found(self):
response = self.client.get('/get-package-metadata?package_name=hl7.fhir.us.core&version=3.1.1')
self.assertEqual(response.status_code, 404)
data = json.loads(response.data)
self.assertEqual(data['error'], 'Metadata not found')
# Test Case 36: Test Get Package Metadata (Missing Parameters)
def test_get_package_metadata_missing_params(self):
response = self.client.get('/get-package-metadata?package_name=hl7.fhir.us.core')
self.assertEqual(response.status_code, 400)
data = json.loads(response.data)
self.assertEqual(data['error'], 'Missing package_name or version')
# Test Case 37: Test API - Push IG with Dependencies (Success)
@patch('requests.put')
def test_api_push_ig_with_dependencies_success(self, mock_put):
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.raise_for_status.return_value = None
mock_put.return_value = mock_response
# Create the main package
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', {
'name': 'hl7.fhir.us.core',
'version': '3.1.1'
})
self.create_mock_tgz('hl7.fhir.us.core-3.1.1.tgz', append_files={
"package/Patient-example.json": {
'resourceType': 'Patient',
'id': 'example'
}
})
# Create a dependency package
self.create_mock_tgz('hl7.fhir.r4.core-4.0.1.tgz', {
'name': 'hl7.fhir.r4.core',
'version': '4.0.1'
})
self.create_mock_tgz('hl7.fhir.r4.core-4.0.1.tgz', append_files={
"package/Observation-example.json": {
'resourceType': 'Observation',
'id': 'example'
}
})
# Create a mock metadata file
metadata_path = os.path.join(app.config['FHIR_PACKAGES_DIR'], 'hl7.fhir.us.core-3.1.1.metadata.json')
with open(metadata_path, 'w') as f:
json.dump({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'dependency_mode': 'patch-canonical',
'imported_dependencies': [{'name': 'hl7.fhir.r4.core', 'version': '4.0.1'}]
}, f)
response = self.client.post('/api/push-ig',
data=json.dumps({
'package_name': 'hl7.fhir.us.core',
'version': '3.1.1',
'fhir_server_url': 'http://test-server/fhir',
'include_dependencies': True,
'api_key': 'test-api-key'
}),
content_type='application/json',
headers={'Accept': 'application/x-ndjson'}
)
self.assertEqual(response.status_code, 200)
response_text = response.data.decode('utf-8')
print(f"Response text: {response_text}")
self.assertIn('"type": "start"', response_text)
self.assertIn('"type": "progress"', response_text)
self.assertIn('"message": "Added dependency hl7.fhir.r4.core#4.0.1"', response_text)
self.assertIn('"status": "success"', response_text)
self.assertIn('"pushed_packages": ["hl7.fhir.us.core#3.1.1", "hl7.fhir.r4.core#4.0.1"]', response_text)
self.assertIn('"success_count": 2', response_text) # One resource from each package
if __name__ == '__main__':
unittest.main()