New Feature - Manual Upload IG file / or from URL.

this allows users to use IG's tat are not yet Published to registries
This commit is contained in:
Joshua Hare 2025-06-22 22:36:28 +10:00
parent 847a5b2a6d
commit d38e29160c
5 changed files with 544 additions and 3 deletions

73
app.py
View File

@ -38,9 +38,10 @@ from services import (
HAS_PACKAGING_LIB,
pkg_version,
get_package_description,
safe_parse_version
safe_parse_version,
import_manual_package_and_dependencies
)
from forms import IgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm, RetrieveSplitDataForm
from forms import IgImportForm, ManualIgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm, RetrieveSplitDataForm
from wtforms import SubmitField
from package import package_bp
from flasgger import Swagger, swag_from # Import Flasgger
@ -518,6 +519,74 @@ def restart_tomcat():
def config_hapi():
return render_template('config_hapi.html', site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
@app.route('/manual-import-ig', methods=['GET', 'POST'])
def manual_import_ig():
"""
Handle manual import of FHIR Implementation Guides using file or URL uploads.
Uses ManualIgImportForm to support file and URL inputs without registry option.
"""
form = ManualIgImportForm()
is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.headers.get('HX-Request') == 'true'
if form.validate_on_submit():
import_mode = form.import_mode.data
dependency_mode = form.dependency_mode.data
resolve_dependencies = form.resolve_dependencies.data
while not log_queue.empty():
log_queue.get()
try:
if import_mode == 'file':
tgz_file = form.tgz_file.data
temp_dir = tempfile.mkdtemp()
temp_path = os.path.join(temp_dir, secure_filename(tgz_file.filename))
tgz_file.save(temp_path)
result = import_manual_package_and_dependencies(temp_path, dependency_mode=dependency_mode, is_file=True, resolve_dependencies=resolve_dependencies)
identifier = result.get('requested', tgz_file.filename)
shutil.rmtree(temp_dir, ignore_errors=True)
elif import_mode == 'url':
tgz_url = form.tgz_url.data
result = import_manual_package_and_dependencies(tgz_url, dependency_mode=dependency_mode, is_url=True, resolve_dependencies=resolve_dependencies)
identifier = result.get('requested', tgz_url)
if result['errors'] and not result['downloaded']:
error_msg = result['errors'][0]
simplified_msg = error_msg
if "HTTP error" in error_msg and "404" in error_msg:
simplified_msg = "Package not found (404). Check input."
elif "HTTP error" in error_msg:
simplified_msg = f"Error: {error_msg.split(': ', 1)[-1]}"
elif "Connection error" in error_msg:
simplified_msg = "Could not connect to source."
flash(f"Failed to import {identifier}: {simplified_msg}", "error")
logger.error(f"Manual import failed for {identifier}: {error_msg}")
if is_ajax:
return jsonify({"status": "error", "message": simplified_msg}), 400
return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
else:
if result['errors']:
flash(f"Partially imported {identifier} with errors. Check logs.", "warning")
for err in result['errors']:
logger.warning(f"Manual import warning for {identifier}: {err}")
else:
flash(f"Successfully imported {identifier}! Mode: {dependency_mode}", "success")
if is_ajax:
return jsonify({"status": "success", "message": f"Imported {identifier}", "redirect": url_for('view_igs')}), 200
return redirect(url_for('view_igs'))
except Exception as e:
logger.error(f"Unexpected error during manual IG import: {str(e)}", exc_info=True)
flash(f"An unexpected error occurred: {str(e)}", "error")
if is_ajax:
return jsonify({"status": "error", "message": str(e)}), 500
return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
else:
for field, errors in form.errors.items():
for error in errors:
flash(f"Error in {getattr(form, field).label.text}: {error}", "danger")
if is_ajax:
return jsonify({"status": "error", "message": "Form validation failed", "errors": form.errors}), 400
return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now())
@app.route('/import-ig', methods=['GET', 'POST'])
def import_ig():
form = IgImportForm()

View File

@ -75,6 +75,62 @@ class IgImportForm(FlaskForm):
], default='recursive')
submit = SubmitField('Import')
class ManualIgImportForm(FlaskForm):
"""Form for manual importing Implementation Guides via file or URL."""
import_mode = SelectField('Import Mode', choices=[
('file', 'Upload File'),
('url', 'From URL')
], default='file', validators=[DataRequired()])
tgz_file = FileField('IG Package File (.tgz)', validators=[Optional()],
render_kw={'accept': '.tgz'})
tgz_url = StringField('IG Package URL', validators=[Optional(), URL()],
render_kw={'placeholder': 'e.g., https://example.com/hl7.fhir.au.core-1.1.0-preview.tgz'})
dependency_mode = SelectField('Dependency Mode', choices=[
('recursive', 'Current Recursive'),
('patch-canonical', 'Patch Canonical Versions'),
('tree-shaking', 'Tree Shaking (Only Used Dependencies)')
], default='recursive')
resolve_dependencies = BooleanField('Resolve Dependencies', default=True,
render_kw={'class': 'form-check-input'})
submit = SubmitField('Import')
def validate(self, extra_validators=None):
if not super().validate(extra_validators):
return False
mode = self.import_mode.data
has_file = request and request.files and self.tgz_file.name in request.files and request.files[self.tgz_file.name].filename != ''
has_url = bool(self.tgz_url.data) # Convert to boolean: True if non-empty string
# Ensure exactly one input method is used
inputs_provided = sum([has_file, has_url])
if inputs_provided != 1:
if inputs_provided == 0:
self.import_mode.errors.append('Please provide input for one import method (File or URL).')
else:
self.import_mode.errors.append('Please use only one import method at a time.')
return False
# Validate based on import mode
if mode == 'file':
if not has_file:
self.tgz_file.errors.append('A .tgz file is required for File import.')
return False
if not self.tgz_file.data.filename.lower().endswith('.tgz'):
self.tgz_file.errors.append('File must be a .tgz file.')
return False
elif mode == 'url':
if not has_url:
self.tgz_url.errors.append('A valid URL is required for URL import.')
return False
if not self.tgz_url.data.lower().endswith('.tgz'):
self.tgz_url.errors.append('URL must point to a .tgz file.')
return False
else:
self.import_mode.errors.append('Invalid import mode selected.')
return False
return True
class ValidationForm(FlaskForm):
"""Form for validating FHIR samples."""
package_name = StringField('Package Name', validators=[DataRequired()])
@ -246,7 +302,7 @@ class FhirRequestForm(FlaskForm):
if self.auth_type.data == 'basicAuth':
if not self.basic_auth_username.data:
self.basic_auth_username.errors.append('Username is required for Basic Authentication with a custom URL.')
return False
return False
if not self.basic_auth_password.data:
self.basic_auth_password.errors.append('Password is required for Basic Authentication with a custom URL.')
return False

View File

@ -241,6 +241,225 @@ def get_additional_registries():
return feeds
# --- END MODIFIED FUNCTION ---
def import_manual_package_and_dependencies(input_source, version=None, dependency_mode='recursive', is_file=False, is_url=False, resolve_dependencies=True):
"""
Import a FHIR Implementation Guide package manually, cloning import_package_and_dependencies.
Supports registry, file, or URL inputs with dependency handling.
Args:
input_source (str): Package name (for registry), file path (for file), or URL (for URL).
version (str, optional): Package version for registry imports.
dependency_mode (str): Dependency import mode ('recursive', 'patch-canonical', 'tree-shaking').
is_file (bool): True if input_source is a file path.
is_url (bool): True if input_source is a URL.
resolve_dependencies (bool): Whether to resolve and import dependencies.
Returns:
dict: Import results with 'requested', 'downloaded', 'dependencies', and 'errors'.
"""
logger.info(f"Starting manual import for {input_source} (mode={dependency_mode}, resolve_deps={resolve_dependencies})")
download_dir = _get_download_dir()
if not download_dir:
return {
"requested": input_source,
"downloaded": {},
"dependencies": [],
"errors": ["Failed to get download directory."]
}
results = {
"requested": input_source,
"downloaded": {},
"dependencies": [],
"errors": []
}
try:
if is_file:
tgz_path = input_source
if not os.path.exists(tgz_path):
results['errors'].append(f"File not found: {tgz_path}")
return results
name, version = parse_package_filename(os.path.basename(tgz_path))
if not name:
name = os.path.splitext(os.path.basename(tgz_path))[0]
version = "unknown"
target_filename = construct_tgz_filename(name, version)
target_path = os.path.join(download_dir, target_filename)
shutil.copy(tgz_path, target_path)
results['downloaded'][name, version] = target_path
elif is_url:
tgz_path = download_manual_package_from_url(input_source, download_dir)
if not tgz_path:
results['errors'].append(f"Failed to download package from URL: {input_source}")
return results
name, version = parse_package_filename(os.path.basename(tgz_path))
if not name:
name = os.path.splitext(os.path.basename(tgz_path))[0]
version = "unknown"
results['downloaded'][name, version] = tgz_path
else:
tgz_path = download_manual_package(input_source, version, download_dir)
if not tgz_path:
results['errors'].append(f"Failed to download {input_source}#{version}")
return results
results['downloaded'][input_source, version] = tgz_path
name = input_source
if resolve_dependencies:
pkg_info = process_manual_package_file(tgz_path)
if pkg_info.get('errors'):
results['errors'].extend(pkg_info['errors'])
dependencies = pkg_info.get('dependencies', [])
results['dependencies'] = dependencies
if dependencies and dependency_mode != 'tree-shaking':
for dep in dependencies:
dep_name = dep.get('name')
dep_version = dep.get('version', 'latest')
if not dep_name:
continue
logger.info(f"Processing dependency {dep_name}#{dep_version}")
dep_result = import_manual_package_and_dependencies(
dep_name,
dep_version,
dependency_mode=dependency_mode,
resolve_dependencies=True
)
results['downloaded'].update(dep_result['downloaded'])
results['dependencies'].extend(dep_result['dependencies'])
results['errors'].extend(dep_result['errors'])
save_package_metadata(name, version, dependency_mode, results['dependencies'])
return results
except Exception as e:
logger.error(f"Error during manual import of {input_source}: {str(e)}", exc_info=True)
results['errors'].append(f"Unexpected error: {str(e)}")
return results
def download_manual_package(package_name, version, download_dir):
"""
Download a FHIR package from the registry, cloning download_package.
Args:
package_name (str): Package name.
version (str): Package version.
download_dir (str): Directory to save the package.
Returns:
str: Path to the downloaded file, or None if failed.
"""
logger.info(f"Attempting manual download of {package_name}#{version}")
tgz_filename = construct_tgz_filename(package_name, version)
if not tgz_filename:
logger.error(f"Invalid filename constructed for {package_name}#{version}")
return None
target_path = os.path.join(download_dir, tgz_filename)
if os.path.exists(target_path):
logger.info(f"Manual package {package_name}#{version} already exists at {target_path}")
return target_path
url = f"{FHIR_REGISTRY_BASE_URL}/{package_name}/{version}"
try:
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(target_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Manually downloaded {package_name}#{version} to {target_path}")
return target_path
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error downloading {package_name}#{version}: {e}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Request error downloading {package_name}#{version}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error downloading {package_name}#{version}: {e}", exc_info=True)
return None
def download_manual_package_from_url(url, download_dir):
"""
Download a FHIR package from a URL, cloning download_package logic.
Args:
url (str): URL to the .tgz file.
download_dir (str): Directory to save the package.
Returns:
str: Path to the downloaded file, or None if failed.
"""
logger.info(f"Attempting manual download from URL: {url}")
parsed_url = urlparse(url)
filename = os.path.basename(parsed_url.path)
if not filename.endswith('.tgz'):
logger.error(f"URL does not point to a .tgz file: {filename}")
return None
target_path = os.path.join(download_dir, filename)
if os.path.exists(target_path):
logger.info(f"Package from {url} already exists at {target_path}")
return target_path
try:
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(target_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"Manually downloaded package from {url} to {target_path}")
return target_path
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error downloading from {url}: {e}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Request error downloading from {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error downloading from {url}: {e}", exc_info=True)
return None
def process_manual_package_file(tgz_path):
"""
Process a .tgz package file to extract metadata, cloning process_package_file.
Args:
tgz_path (str): Path to the .tgz file.
Returns:
dict: Package metadata including dependencies and errors.
"""
if not tgz_path or not os.path.exists(tgz_path):
logger.error(f"Package file not found for manual processing: {tgz_path}")
return {'errors': [f"Package file not found: {tgz_path}"], 'dependencies': []}
pkg_basename = os.path.basename(tgz_path)
name, version = parse_package_filename(tgz_path)
logger.info(f"Manually processing package: {pkg_basename} ({name}#{version})")
results = {
'dependencies': [],
'errors': []
}
try:
with tarfile.open(tgz_path, "r:gz") as tar:
pkg_json_member = next((m for m in tar if m.name == 'package/package.json'), None)
if pkg_json_member:
with tar.extractfile(pkg_json_member) as f:
pkg_data = json.load(f)
dependencies = pkg_data.get('dependencies', {})
results['dependencies'] = [
{'name': dep_name, 'version': dep_version}
for dep_name, dep_version in dependencies.items()
]
else:
results['errors'].append("package.json not found in archive")
except Exception as e:
logger.error(f"Error manually processing {tgz_path}: {e}", exc_info=True)
results['errors'].append(f"Error processing package: {str(e)}")
return results
def fetch_packages_from_registries(search_term=''):
logger.debug("Entering fetch_packages_from_registries function with search_term: %s", search_term)
packages_dict = defaultdict(list)

View File

@ -769,6 +769,9 @@
<li class="nav-item">
<a class="nav-link {% if request.path == '/search-and-import' %}active{% endif %}" href="{{ url_for('search_and_import') }}"><i class="fas fa-download me-1"></i> Search and Import</a>
</li>
<li class="nav-item">
<a class="nav-link {{ 'active' if request.endpoint == 'manual_import_ig' else '' }}" href="{{ url_for('manual_import_ig') }}"><i class="fas fa-folder-open me-1"></i> Manual Import</a>
</li>
<li class="nav-item">
<a class="nav-link {{ 'active' if request.endpoint == 'view_igs' else '' }}" href="{{ url_for('view_igs') }}"><i class="fas fa-folder-open me-1"></i> Manage FHIR Packages</a>
</li>

View File

@ -0,0 +1,194 @@
{% extends "base.html" %}
{% from "_form_helpers.html" import render_field %}
{% block content %}
<div class="container">
{% include "_flash_messages.html" %}
<h1 class="mt-4 mb-3">Import FHIR Implementation Guides</h1>
<div class="row">
<div class="col-md-12">
<p class="lead">Import new FHIR Implementation Guides to the system via file or URL.</p>
</div>
</div>
<div class="row">
<div class="col-md-6">
<h2>Import a New IG</h2>
<form id="manual-import-ig-form" method="POST" action="{{ url_for('manual_import_ig') }}" enctype="multipart/form-data">
{{ form.hidden_tag() }}
<div class="mb-3">
{{ render_field(form.import_mode, class="form-select") }}
</div>
<div id="file-field" class="{% if form.import_mode.data != 'file' %}d-none{% endif %}">
<div class="mb-3">
<label for="tgz_file" class="form-label">IG Package File (.tgz)</label>
<div class="input-group">
<input type="file" name="tgz_file" id="tgz_file" accept=".tgz" class="form-control custom-file-input">
<span id="file-label" class="custom-file-label">No file selected</span>
</div>
{% if form.tgz_file.errors %}
<div class="form-error">
{% for error in form.tgz_file.errors %}
<p>{{ error }}</p>
{% endfor %}
</div>
{% endif %}
</div>
</div>
<div id="url-field" class="{% if form.import_mode.data != 'url' %}d-none{% endif %}">
<div class="mb-3">
{{ render_field(form.tgz_url, class="form-control url-input") }}
</div>
</div>
<div class="mb-3">
{{ render_field(form.dependency_mode, class="form-select") }}
</div>
<div class="form-check mb-3">
{{ form.resolve_dependencies(type="checkbox", class="form-check-input") }}
<label class="form-check-label" for="resolve_dependencies">Resolve Dependencies</label>
{% if form.resolve_dependencies.errors %}
<div class="form-error">
{% for error in form.resolve_dependencies.errors %}
<p>{{ error }}</p>
{% endfor %}
</div>
{% endif %}
</div>
<div class="d-grid gap-2 d-sm-flex mt-3">
{{ form.submit(class="btn btn-success", id="submit-btn") }}
<a href="{{ url_for('index') }}" class="btn btn-secondary">Back</a>
</div>
</form>
</div>
</div>
</div>
<style>
.form-error {
color: #dc3545;
font-size: 0.875rem;
margin-top: 0.25rem;
}
.custom-file-input {
width: 100%;
padding: 0.375rem 0.75rem;
font-size: 1rem;
line-height: 1.5;
color: #495057;
background-color: #fff;
border: 1px solid #ced4da;
border-radius: 0.25rem;
cursor: pointer;
}
.custom-file-input::-webkit-file-upload-button {
display: none;
}
.custom-file-input::before {
content: 'Choose File';
display: inline-block;
background: #007bff;
color: #fff;
padding: 0.375rem 0.75rem;
border-radius: 0.25rem;
margin-right: 0.5rem;
}
.custom-file-input:hover::before {
background: #0056b3;
}
.custom-file-label {
display: inline-block;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
vertical-align: middle;
max-width: 60%;
}
#file-field, #url-field {
display: none;
}
#file-field:not(.d-none), #url-field:not(.d-none) {
display: block;
}
</style>
<script>
document.addEventListener('DOMContentLoaded', function() {
console.debug('DOMContentLoaded fired for manual_import_ig.html');
const importForm = document.getElementById('manual-import-ig-form');
const importModeSelect = document.querySelector('#import_mode');
const fileField = document.querySelector('#file-field');
const urlField = document.querySelector('#url-field');
const fileInput = document.querySelector('#tgz_file');
const fileLabel = document.querySelector('#file-label');
const urlInput = document.querySelector('#tgz_url');
// Debug DOM elements
console.debug('importForm:', !!importForm);
console.debug('importModeSelect:', !!importModeSelect);
console.debug('fileField:', !!fileField);
console.debug('urlField:', !!urlField);
console.debug('fileInput:', !!fileInput);
console.debug('fileLabel:', !!fileLabel);
console.debug('urlInput:', !!urlInput);
function toggleFields() {
if (!importModeSelect || !fileField || !urlField) {
console.error('Required elements for toggling not found');
return;
}
const mode = importModeSelect.value;
console.debug(`Toggling fields for mode: ${mode}`);
fileField.classList.toggle('d-none', mode !== 'file');
urlField.classList.toggle('d-none', mode !== 'url');
console.debug(`file-field d-none: ${fileField.classList.contains('d-none')}, url-field d-none: ${urlField.classList.contains('d-none')}`);
}
function updateFileLabel() {
if (fileInput && fileLabel) {
const fileName = fileInput.files.length > 0 ? fileInput.files[0].name : 'No file selected';
fileLabel.textContent = fileName;
console.debug(`Updated file label to: ${fileName}`);
} else {
console.warn('File input or label not found');
}
}
if (importModeSelect) {
importModeSelect.addEventListener('change', function() {
console.debug('Import mode changed to:', importModeSelect.value);
toggleFields();
});
toggleFields();
} else {
console.error('Import mode select not found');
}
if (fileInput) {
fileInput.addEventListener('change', function() {
console.debug('File input changed');
updateFileLabel();
});
updateFileLabel();
}
if (urlInput) {
urlInput.addEventListener('input', function() {
console.debug(`URL input value: ${urlInput.value}`);
});
}
if (importForm) {
importForm.addEventListener('submit', function(evt) {
console.debug('Form submission triggered');
if (!importForm.checkValidity()) {
evt.preventDefault();
console.error('Form validation failed');
importForm.reportValidity();
}
});
}
});
</script>
{% endblock %}