From d38e29160c09312bdcf0c7498c81f91a8eb21145 Mon Sep 17 00:00:00 2001 From: Sudo-JHare Date: Sun, 22 Jun 2025 22:36:28 +1000 Subject: [PATCH] update New Feature - Manual Upload IG file / or from URL. this allows users to use IG's tat are not yet Published to registries --- app.py | 73 ++++++++++- forms.py | 58 ++++++++- services.py | 219 ++++++++++++++++++++++++++++++++ templates/base.html | 3 + templates/manual_import_ig.html | 194 ++++++++++++++++++++++++++++ 5 files changed, 544 insertions(+), 3 deletions(-) create mode 100644 templates/manual_import_ig.html diff --git a/app.py b/app.py index c69f1b5..e2a9a90 100644 --- a/app.py +++ b/app.py @@ -38,9 +38,10 @@ from services import ( HAS_PACKAGING_LIB, pkg_version, get_package_description, - safe_parse_version + safe_parse_version, + import_manual_package_and_dependencies ) -from forms import IgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm, RetrieveSplitDataForm +from forms import IgImportForm, ManualIgImportForm, ValidationForm, FSHConverterForm, TestDataUploadForm, RetrieveSplitDataForm from wtforms import SubmitField from package import package_bp from flasgger import Swagger, swag_from # Import Flasgger @@ -518,6 +519,74 @@ def restart_tomcat(): def config_hapi(): return render_template('config_hapi.html', site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now()) +@app.route('/manual-import-ig', methods=['GET', 'POST']) +def manual_import_ig(): + """ + Handle manual import of FHIR Implementation Guides using file or URL uploads. + Uses ManualIgImportForm to support file and URL inputs without registry option. + """ + form = ManualIgImportForm() + is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest' or request.headers.get('HX-Request') == 'true' + + if form.validate_on_submit(): + import_mode = form.import_mode.data + dependency_mode = form.dependency_mode.data + resolve_dependencies = form.resolve_dependencies.data + while not log_queue.empty(): + log_queue.get() + + try: + if import_mode == 'file': + tgz_file = form.tgz_file.data + temp_dir = tempfile.mkdtemp() + temp_path = os.path.join(temp_dir, secure_filename(tgz_file.filename)) + tgz_file.save(temp_path) + result = import_manual_package_and_dependencies(temp_path, dependency_mode=dependency_mode, is_file=True, resolve_dependencies=resolve_dependencies) + identifier = result.get('requested', tgz_file.filename) + shutil.rmtree(temp_dir, ignore_errors=True) + elif import_mode == 'url': + tgz_url = form.tgz_url.data + result = import_manual_package_and_dependencies(tgz_url, dependency_mode=dependency_mode, is_url=True, resolve_dependencies=resolve_dependencies) + identifier = result.get('requested', tgz_url) + + if result['errors'] and not result['downloaded']: + error_msg = result['errors'][0] + simplified_msg = error_msg + if "HTTP error" in error_msg and "404" in error_msg: + simplified_msg = "Package not found (404). Check input." + elif "HTTP error" in error_msg: + simplified_msg = f"Error: {error_msg.split(': ', 1)[-1]}" + elif "Connection error" in error_msg: + simplified_msg = "Could not connect to source." + flash(f"Failed to import {identifier}: {simplified_msg}", "error") + logger.error(f"Manual import failed for {identifier}: {error_msg}") + if is_ajax: + return jsonify({"status": "error", "message": simplified_msg}), 400 + return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now()) + else: + if result['errors']: + flash(f"Partially imported {identifier} with errors. Check logs.", "warning") + for err in result['errors']: + logger.warning(f"Manual import warning for {identifier}: {err}") + else: + flash(f"Successfully imported {identifier}! Mode: {dependency_mode}", "success") + if is_ajax: + return jsonify({"status": "success", "message": f"Imported {identifier}", "redirect": url_for('view_igs')}), 200 + return redirect(url_for('view_igs')) + except Exception as e: + logger.error(f"Unexpected error during manual IG import: {str(e)}", exc_info=True) + flash(f"An unexpected error occurred: {str(e)}", "error") + if is_ajax: + return jsonify({"status": "error", "message": str(e)}), 500 + return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now()) + else: + for field, errors in form.errors.items(): + for error in errors: + flash(f"Error in {getattr(form, field).label.text}: {error}", "danger") + if is_ajax: + return jsonify({"status": "error", "message": "Form validation failed", "errors": form.errors}), 400 + return render_template('manual_import_ig.html', form=form, site_name='FHIRFLARE IG Toolkit', now=datetime.datetime.now()) + @app.route('/import-ig', methods=['GET', 'POST']) def import_ig(): form = IgImportForm() diff --git a/forms.py b/forms.py index 976521e..71b26e8 100644 --- a/forms.py +++ b/forms.py @@ -75,6 +75,62 @@ class IgImportForm(FlaskForm): ], default='recursive') submit = SubmitField('Import') +class ManualIgImportForm(FlaskForm): + """Form for manual importing Implementation Guides via file or URL.""" + import_mode = SelectField('Import Mode', choices=[ + ('file', 'Upload File'), + ('url', 'From URL') + ], default='file', validators=[DataRequired()]) + tgz_file = FileField('IG Package File (.tgz)', validators=[Optional()], + render_kw={'accept': '.tgz'}) + tgz_url = StringField('IG Package URL', validators=[Optional(), URL()], + render_kw={'placeholder': 'e.g., https://example.com/hl7.fhir.au.core-1.1.0-preview.tgz'}) + dependency_mode = SelectField('Dependency Mode', choices=[ + ('recursive', 'Current Recursive'), + ('patch-canonical', 'Patch Canonical Versions'), + ('tree-shaking', 'Tree Shaking (Only Used Dependencies)') + ], default='recursive') + resolve_dependencies = BooleanField('Resolve Dependencies', default=True, + render_kw={'class': 'form-check-input'}) + submit = SubmitField('Import') + + def validate(self, extra_validators=None): + if not super().validate(extra_validators): + return False + mode = self.import_mode.data + has_file = request and request.files and self.tgz_file.name in request.files and request.files[self.tgz_file.name].filename != '' + has_url = bool(self.tgz_url.data) # Convert to boolean: True if non-empty string + + # Ensure exactly one input method is used + inputs_provided = sum([has_file, has_url]) + if inputs_provided != 1: + if inputs_provided == 0: + self.import_mode.errors.append('Please provide input for one import method (File or URL).') + else: + self.import_mode.errors.append('Please use only one import method at a time.') + return False + + # Validate based on import mode + if mode == 'file': + if not has_file: + self.tgz_file.errors.append('A .tgz file is required for File import.') + return False + if not self.tgz_file.data.filename.lower().endswith('.tgz'): + self.tgz_file.errors.append('File must be a .tgz file.') + return False + elif mode == 'url': + if not has_url: + self.tgz_url.errors.append('A valid URL is required for URL import.') + return False + if not self.tgz_url.data.lower().endswith('.tgz'): + self.tgz_url.errors.append('URL must point to a .tgz file.') + return False + else: + self.import_mode.errors.append('Invalid import mode selected.') + return False + + return True + class ValidationForm(FlaskForm): """Form for validating FHIR samples.""" package_name = StringField('Package Name', validators=[DataRequired()]) @@ -246,7 +302,7 @@ class FhirRequestForm(FlaskForm): if self.auth_type.data == 'basicAuth': if not self.basic_auth_username.data: self.basic_auth_username.errors.append('Username is required for Basic Authentication with a custom URL.') - return False + return False if not self.basic_auth_password.data: self.basic_auth_password.errors.append('Password is required for Basic Authentication with a custom URL.') return False diff --git a/services.py b/services.py index 6fe8267..926e454 100644 --- a/services.py +++ b/services.py @@ -241,6 +241,225 @@ def get_additional_registries(): return feeds # --- END MODIFIED FUNCTION --- +def import_manual_package_and_dependencies(input_source, version=None, dependency_mode='recursive', is_file=False, is_url=False, resolve_dependencies=True): + """ + Import a FHIR Implementation Guide package manually, cloning import_package_and_dependencies. + Supports registry, file, or URL inputs with dependency handling. + + Args: + input_source (str): Package name (for registry), file path (for file), or URL (for URL). + version (str, optional): Package version for registry imports. + dependency_mode (str): Dependency import mode ('recursive', 'patch-canonical', 'tree-shaking'). + is_file (bool): True if input_source is a file path. + is_url (bool): True if input_source is a URL. + resolve_dependencies (bool): Whether to resolve and import dependencies. + + Returns: + dict: Import results with 'requested', 'downloaded', 'dependencies', and 'errors'. + """ + logger.info(f"Starting manual import for {input_source} (mode={dependency_mode}, resolve_deps={resolve_dependencies})") + download_dir = _get_download_dir() + if not download_dir: + return { + "requested": input_source, + "downloaded": {}, + "dependencies": [], + "errors": ["Failed to get download directory."] + } + + results = { + "requested": input_source, + "downloaded": {}, + "dependencies": [], + "errors": [] + } + + try: + if is_file: + tgz_path = input_source + if not os.path.exists(tgz_path): + results['errors'].append(f"File not found: {tgz_path}") + return results + name, version = parse_package_filename(os.path.basename(tgz_path)) + if not name: + name = os.path.splitext(os.path.basename(tgz_path))[0] + version = "unknown" + target_filename = construct_tgz_filename(name, version) + target_path = os.path.join(download_dir, target_filename) + shutil.copy(tgz_path, target_path) + results['downloaded'][name, version] = target_path + elif is_url: + tgz_path = download_manual_package_from_url(input_source, download_dir) + if not tgz_path: + results['errors'].append(f"Failed to download package from URL: {input_source}") + return results + name, version = parse_package_filename(os.path.basename(tgz_path)) + if not name: + name = os.path.splitext(os.path.basename(tgz_path))[0] + version = "unknown" + results['downloaded'][name, version] = tgz_path + else: + tgz_path = download_manual_package(input_source, version, download_dir) + if not tgz_path: + results['errors'].append(f"Failed to download {input_source}#{version}") + return results + results['downloaded'][input_source, version] = tgz_path + name = input_source + + if resolve_dependencies: + pkg_info = process_manual_package_file(tgz_path) + if pkg_info.get('errors'): + results['errors'].extend(pkg_info['errors']) + dependencies = pkg_info.get('dependencies', []) + results['dependencies'] = dependencies + + if dependencies and dependency_mode != 'tree-shaking': + for dep in dependencies: + dep_name = dep.get('name') + dep_version = dep.get('version', 'latest') + if not dep_name: + continue + logger.info(f"Processing dependency {dep_name}#{dep_version}") + dep_result = import_manual_package_and_dependencies( + dep_name, + dep_version, + dependency_mode=dependency_mode, + resolve_dependencies=True + ) + results['downloaded'].update(dep_result['downloaded']) + results['dependencies'].extend(dep_result['dependencies']) + results['errors'].extend(dep_result['errors']) + + save_package_metadata(name, version, dependency_mode, results['dependencies']) + return results + except Exception as e: + logger.error(f"Error during manual import of {input_source}: {str(e)}", exc_info=True) + results['errors'].append(f"Unexpected error: {str(e)}") + return results + +def download_manual_package(package_name, version, download_dir): + """ + Download a FHIR package from the registry, cloning download_package. + + Args: + package_name (str): Package name. + version (str): Package version. + download_dir (str): Directory to save the package. + + Returns: + str: Path to the downloaded file, or None if failed. + """ + logger.info(f"Attempting manual download of {package_name}#{version}") + tgz_filename = construct_tgz_filename(package_name, version) + if not tgz_filename: + logger.error(f"Invalid filename constructed for {package_name}#{version}") + return None + target_path = os.path.join(download_dir, tgz_filename) + if os.path.exists(target_path): + logger.info(f"Manual package {package_name}#{version} already exists at {target_path}") + return target_path + + url = f"{FHIR_REGISTRY_BASE_URL}/{package_name}/{version}" + try: + response = requests.get(url, stream=True, timeout=30) + response.raise_for_status() + with open(target_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + logger.info(f"Manually downloaded {package_name}#{version} to {target_path}") + return target_path + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error downloading {package_name}#{version}: {e}") + return None + except requests.exceptions.RequestException as e: + logger.error(f"Request error downloading {package_name}#{version}: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error downloading {package_name}#{version}: {e}", exc_info=True) + return None + +def download_manual_package_from_url(url, download_dir): + """ + Download a FHIR package from a URL, cloning download_package logic. + + Args: + url (str): URL to the .tgz file. + download_dir (str): Directory to save the package. + + Returns: + str: Path to the downloaded file, or None if failed. + """ + logger.info(f"Attempting manual download from URL: {url}") + parsed_url = urlparse(url) + filename = os.path.basename(parsed_url.path) + if not filename.endswith('.tgz'): + logger.error(f"URL does not point to a .tgz file: {filename}") + return None + target_path = os.path.join(download_dir, filename) + if os.path.exists(target_path): + logger.info(f"Package from {url} already exists at {target_path}") + return target_path + + try: + response = requests.get(url, stream=True, timeout=30) + response.raise_for_status() + with open(target_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + logger.info(f"Manually downloaded package from {url} to {target_path}") + return target_path + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error downloading from {url}: {e}") + return None + except requests.exceptions.RequestException as e: + logger.error(f"Request error downloading from {url}: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error downloading from {url}: {e}", exc_info=True) + return None + +def process_manual_package_file(tgz_path): + """ + Process a .tgz package file to extract metadata, cloning process_package_file. + + Args: + tgz_path (str): Path to the .tgz file. + + Returns: + dict: Package metadata including dependencies and errors. + """ + if not tgz_path or not os.path.exists(tgz_path): + logger.error(f"Package file not found for manual processing: {tgz_path}") + return {'errors': [f"Package file not found: {tgz_path}"], 'dependencies': []} + + pkg_basename = os.path.basename(tgz_path) + name, version = parse_package_filename(tgz_path) + logger.info(f"Manually processing package: {pkg_basename} ({name}#{version})") + + results = { + 'dependencies': [], + 'errors': [] + } + + try: + with tarfile.open(tgz_path, "r:gz") as tar: + pkg_json_member = next((m for m in tar if m.name == 'package/package.json'), None) + if pkg_json_member: + with tar.extractfile(pkg_json_member) as f: + pkg_data = json.load(f) + dependencies = pkg_data.get('dependencies', {}) + results['dependencies'] = [ + {'name': dep_name, 'version': dep_version} + for dep_name, dep_version in dependencies.items() + ] + else: + results['errors'].append("package.json not found in archive") + except Exception as e: + logger.error(f"Error manually processing {tgz_path}: {e}", exc_info=True) + results['errors'].append(f"Error processing package: {str(e)}") + + return results + def fetch_packages_from_registries(search_term=''): logger.debug("Entering fetch_packages_from_registries function with search_term: %s", search_term) packages_dict = defaultdict(list) diff --git a/templates/base.html b/templates/base.html index 1437ba7..4d1d846 100644 --- a/templates/base.html +++ b/templates/base.html @@ -769,6 +769,9 @@ + diff --git a/templates/manual_import_ig.html b/templates/manual_import_ig.html new file mode 100644 index 0000000..466d3b0 --- /dev/null +++ b/templates/manual_import_ig.html @@ -0,0 +1,194 @@ +{% extends "base.html" %} +{% from "_form_helpers.html" import render_field %} + +{% block content %} +
+ {% include "_flash_messages.html" %} + +

Import FHIR Implementation Guides

+
+
+

Import new FHIR Implementation Guides to the system via file or URL.

+
+
+ +
+
+

Import a New IG

+
+ {{ form.hidden_tag() }} +
+ {{ render_field(form.import_mode, class="form-select") }} +
+
+
+ +
+ + No file selected +
+ {% if form.tgz_file.errors %} +
+ {% for error in form.tgz_file.errors %} +

{{ error }}

+ {% endfor %} +
+ {% endif %} +
+
+
+
+ {{ render_field(form.tgz_url, class="form-control url-input") }} +
+
+
+ {{ render_field(form.dependency_mode, class="form-select") }} +
+
+ {{ form.resolve_dependencies(type="checkbox", class="form-check-input") }} + + {% if form.resolve_dependencies.errors %} +
+ {% for error in form.resolve_dependencies.errors %} +

{{ error }}

+ {% endfor %} +
+ {% endif %} +
+
+ {{ form.submit(class="btn btn-success", id="submit-btn") }} + Back +
+
+
+
+
+ + + + +{% endblock %} \ No newline at end of file