From: Andrew Grimberg Date: Fri, 3 Nov 2023 15:57:28 +0000 (-0700) Subject: Revert "Refactor: Annotate lftools.deploy" X-Git-Tag: v0.37.8^0 X-Git-Url: https://gerrit.linuxfoundation.org/infra/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F35%2F72335%2F1;p=releng%2Flftools.git Revert "Refactor: Annotate lftools.deploy" This reverts commit 0006fe3df413dcca4d03d26d9c33fdadb3498a26. Signed-off-by: Andrew Grimberg Change-Id: I7d57bc95825769c374333877b27eda628bfd9d6f --- diff --git a/lftools/deploy.py b/lftools/deploy.py index ef3c952d..5583b46c 100755 --- a/lftools/deploy.py +++ b/lftools/deploy.py @@ -9,7 +9,6 @@ # http://www.eclipse.org/legal/epl-v10.html ############################################################################## """Library of functions for deploying artifacts to Nexus.""" -from __future__ import annotations import concurrent.futures import datetime @@ -27,9 +26,6 @@ import subprocess import sys import tempfile import zipfile -from concurrent.futures import Future -from typing import Any, Dict, List, Optional, Tuple -from xml.dom.minidom import Document, Node import boto3 import requests @@ -37,44 +33,44 @@ import six from botocore.exceptions import ClientError from defusedxml.minidom import parseString -log: logging.Logger = logging.getLogger(__name__) +log = logging.getLogger(__name__) logging.getLogger("botocore").setLevel(logging.CRITICAL) -def _compress_text(directory: str) -> None: +def _compress_text(dir): """Compress all text files in directory.""" - save_dir: str = os.getcwd() - os.chdir(directory) + save_dir = os.getcwd() + os.chdir(dir) - compress_types: List[str] = [ + compress_types = [ "**/*.html", "**/*.log", "**/*.txt", "**/*.xml", ] - paths: List[str] = [] + paths = [] for _type in compress_types: - search: str = os.path.join(directory, _type) + search = os.path.join(dir, _type) paths.extend(glob.glob(search, recursive=True)) for _file in paths: # glob may follow symlink paths that open can't find if os.path.exists(_file): - log.debug(f"Compressing file {_file}") - with open(_file, "rb") as src, gzip.open(f"{_file}.gz", "wb") as dest: + log.debug("Compressing file {}".format(_file)) + with open(_file, "rb") as src, gzip.open("{}.gz".format(_file), "wb") as dest: shutil.copyfileobj(src, dest) os.remove(_file) else: - log.info(f"Could not open path from glob {_file}") + log.info("Could not open path from glob {}".format(_file)) os.chdir(save_dir) -def _format_url(url: str) -> str: +def _format_url(url): """Ensure url starts with http and trim trailing '/'s.""" - start_pattern: re.Pattern = re.compile("^(http|https)://") + start_pattern = re.compile("^(http|https)://") if not start_pattern.match(url): - url = f"http://{url}" + url = "http://{}".format(url) if url.endswith("/"): url = url.rstrip("/") @@ -82,40 +78,41 @@ def _format_url(url: str) -> str: return url -def _log_error_and_exit(*msg_list: str) -> None: +def _log_error_and_exit(*msg_list): """Print error message, and exit.""" for msg in msg_list: log.error(msg) sys.exit(1) -def _request_post(url: str, data: Optional[Any], headers: Optional[Dict[str, str]]) -> requests.Response: +def _request_post(url, data, headers): """Execute a request post, return the resp.""" + resp = {} try: - resp: requests.Response = requests.post(url, data=data, headers=headers) + resp = requests.post(url, data=data, headers=headers) except requests.exceptions.MissingSchema: log.debug("in _request_post. MissingSchema") - _log_error_and_exit(f"Not valid URL: {url}") + _log_error_and_exit("Not valid URL: {}".format(url)) except requests.exceptions.ConnectionError: log.debug("in _request_post. ConnectionError") - _log_error_and_exit(f"Could not connect to URL: {url}") + _log_error_and_exit("Could not connect to URL: {}".format(url)) except requests.exceptions.InvalidURL: log.debug("in _request_post. InvalidURL") - _log_error_and_exit(f"Invalid URL: {url}") + _log_error_and_exit("Invalid URL: {}".format(url)) return resp -def _get_filenames_in_zipfile(_zipfile: str) -> List[str]: +def _get_filenames_in_zipfile(_zipfile): """Return a list with file names.""" - files: List[zipfile.ZipInfo] = zipfile.ZipFile(_zipfile).infolist() + files = zipfile.ZipFile(_zipfile).infolist() return [f.filename for f in files] -def _request_post_file(url: str, file_to_upload: str, parameters: Optional[Any] = None) -> requests.Response: +def _request_post_file(url, file_to_upload, parameters=None): """Execute a request post, return the resp.""" - resp: requests.Response = requests.Response() + resp = {} try: - upload_file: io.BufferedReader = open(file_to_upload, "rb") + upload_file = open(file_to_upload, "rb") except FileNotFoundError: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_to_upload) @@ -126,33 +123,32 @@ def _request_post_file(url: str, file_to_upload: str, parameters: Optional[Any] else: resp = requests.post(url, data=upload_file.read()) except requests.exceptions.MissingSchema: - raise requests.HTTPError(f"Not valid URL: {url}", response=resp) + raise requests.HTTPError("Not valid URL: {}".format(url)) except requests.exceptions.ConnectionError: - raise requests.HTTPError(f"Could not connect to URL: {url}", response=resp) + raise requests.HTTPError("Could not connect to URL: {}".format(url)) except requests.exceptions.InvalidURL: - raise requests.HTTPError(f"Invalid URL: {url}", response=resp) + raise requests.HTTPError("Invalid URL: {}".format(url)) if resp.status_code == 400: - raise requests.HTTPError("Repository is read only", response=resp) + raise requests.HTTPError("Repository is read only") elif resp.status_code == 404: - raise requests.HTTPError("Did not find repository.", response=resp) + raise requests.HTTPError("Did not find repository.") if not str(resp.status_code).startswith("20"): raise requests.HTTPError( "Failed to upload to Nexus with status code: {}.\n{}\n{}".format( resp.status_code, resp.text, file_to_upload - ), - response=resp, + ) ) return resp -def _request_put_file(url: str, file_to_upload: str, parameters: Optional[Any] = None) -> bool: +def _request_put_file(url, file_to_upload, parameters=None): """Execute a request put, return the resp.""" - resp: requests.Response = requests.Response() + resp = {} try: - upload_file: io.BufferedReader = open(file_to_upload, "rb") + upload_file = open(file_to_upload, "rb") except FileNotFoundError: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_to_upload) @@ -163,60 +159,53 @@ def _request_put_file(url: str, file_to_upload: str, parameters: Optional[Any] = else: resp = requests.put(url, data=upload_file) except requests.exceptions.MissingSchema: - raise requests.HTTPError(f"Not valid URL format. Check for https:// etc..: {url}", response=resp) + raise requests.HTTPError("Not valid URL format. Check for https:// etc..: {}".format(url)) except requests.exceptions.ConnectTimeout: - raise requests.HTTPError(f"Timed out connecting to {url}", response=resp) + raise requests.HTTPError("Timed out connecting to {}".format(url)) except requests.exceptions.ReadTimeout: - raise requests.HTTPError(f"Timed out waiting for the server to reply ({url})", response=resp) + raise requests.HTTPError("Timed out waiting for the server to reply ({})".format(url)) except requests.exceptions.ConnectionError: - raise requests.HTTPError(f"A connection error occurred ({url})", response=resp) + raise requests.HTTPError("A connection error occurred ({})".format(url)) except requests.exceptions.InvalidURL: - raise requests.HTTPError(f"Invalid URL format: {url}", response=resp) + raise requests.HTTPError("Invalid URL format: {}".format(url)) except requests.RequestException as e: log.error(e) if resp.status_code == 201: return True if resp.status_code == 400: - raise requests.HTTPError("Repository is read only", response=resp) + raise requests.HTTPError("Repository is read only") if resp.status_code == 401: - raise requests.HTTPError("Invalid repository credentials", response=resp) + raise requests.HTTPError("Invalid repository credentials") if resp.status_code == 404: - raise requests.HTTPError("Did not find repository.", response=resp) + raise requests.HTTPError("Did not find repository.") if not str(resp.status_code).startswith("20"): raise requests.HTTPError( "Failed to upload to Nexus with status code: {}.\n{}\n{}".format( resp.status_code, resp.text, file_to_upload - ), - response=resp, + ) ) - return False - -def _get_node_from_xml(xml_data: str, tag_name: str) -> str: +def _get_node_from_xml(xml_data, tag_name): """Extract tag data from xml data.""" log.debug("xml={}".format(xml_data)) try: - dom1: Document = parseString(xml_data) - childnode: Node = dom1.getElementsByTagName(tag_name)[0] + dom1 = parseString(xml_data) + childnode = dom1.getElementsByTagName(tag_name)[0] except Exception: - _log_error_and_exit(f"Received bad XML, can not find tag {tag_name}", xml_data) - if childnode.firstChild: - return str(childnode.firstChild.data) # type: ignore - else: - _log_error_and_exit(f"No data in {tag_name}", xml_data) - return "" + _log_error_and_exit("Received bad XML, can not find tag {}".format(tag_name), xml_data) + return childnode.firstChild.data -def _remove_duplicates_and_sort(lst: List[str]) -> List[str]: +def _remove_duplicates_and_sort(lst): # Remove duplicates from list, and sort it no_dups_lst = list(dict.fromkeys(lst)) no_dups_lst.sort() - duplicated_list: List[str] = [] + duplicated_list = [] for i in range(len(no_dups_lst)): if lst.count(no_dups_lst[i]) > 1: duplicated_list.append(no_dups_lst[i]) @@ -225,7 +214,7 @@ def _remove_duplicates_and_sort(lst: List[str]) -> List[str]: return no_dups_lst -def copy_archives(workspace: str, pattern: Optional[List[str]] = None) -> None: +def copy_archives(workspace, pattern=None): """Copy files matching PATTERN in a WORKSPACE to the current directory. The best way to use this function is to cd into the directory you wish to @@ -241,51 +230,51 @@ def copy_archives(workspace: str, pattern: Optional[List[str]] = None) -> None: :arg str pattern: Space-separated list of Unix style glob patterns. (default: None) """ - archives_dir: str = os.path.join(workspace, "archives") - dest_dir: str = os.getcwd() + archives_dir = os.path.join(workspace, "archives") + dest_dir = os.getcwd() - log.debug(f"Copying files from {workspace} with pattern '{pattern}' to {dest_dir}.") - log.debug(f"archives_dir = {archives_dir}") + log.debug("Copying files from {} with pattern '{}' to {}.".format(workspace, pattern, dest_dir)) + log.debug("archives_dir = {}".format(archives_dir)) if os.path.exists(archives_dir): if os.path.isfile(archives_dir): - log.error(f"Archives {archives_dir} is a file, not a directory.") + log.error("Archives {} is a file, not a directory.".format(archives_dir)) raise OSError(errno.ENOENT, "Not a directory", archives_dir) else: - log.debug(f"Archives dir {archives_dir} does exist.") + log.debug("Archives dir {} does exist.".format(archives_dir)) for file_or_dir in os.listdir(archives_dir): f = os.path.join(archives_dir, file_or_dir) try: - log.debug(f"Moving {f}") + log.debug("Moving {}".format(f)) shutil.move(f, dest_dir) except shutil.Error as e: log.error(e) raise OSError(errno.EPERM, "Could not move to", archives_dir) else: - log.error(f"Archives dir {archives_dir} does not exist.") + log.error("Archives dir {} does not exist.".format(archives_dir)) raise OSError(errno.ENOENT, "Missing directory", archives_dir) if pattern is None: return - no_dups_pattern: List[str] = _remove_duplicates_and_sort(pattern) + no_dups_pattern = _remove_duplicates_and_sort(pattern) - paths: List[str] = [] + paths = [] for p in no_dups_pattern: if p == "": # Skip empty patterns as they are invalid continue - search: str = os.path.join(workspace, p) + search = os.path.join(workspace, p) paths.extend(glob.glob(search, recursive=True)) - log.debug(f"Files found: {paths}") + log.debug("Files found: {}".format(paths)) - no_dups_paths: List[str] = _remove_duplicates_and_sort(paths) + no_dups_paths = _remove_duplicates_and_sort(paths) for src in no_dups_paths: if len(os.path.basename(src)) > 255: log.warn("Filename {} is over 255 characters. Skipping...".format(os.path.basename(src))) - dest: str = os.path.join(dest_dir, src[len(workspace) + 1 :]) - log.debug(f"{src} -> {dest}") + dest = os.path.join(dest_dir, src[len(workspace) + 1 :]) + log.debug("{} -> {}".format(src, dest)) if os.path.isfile(src): try: @@ -295,20 +284,20 @@ def copy_archives(workspace: str, pattern: Optional[List[str]] = None) -> None: os.makedirs(os.path.dirname(dest)) shutil.move(src, dest) else: - log.info(f"Not copying directories: {src}.") + log.info("Not copying directories: {}.".format(src)) # Create a temp file to handle empty dirs in AWS S3 buckets. if os.environ.get("S3_BUCKET") is not None: - now: datetime.datetime = datetime.datetime.now() - prefix: str = now.strftime("_%d%m%Y_%H%M%S_") + now = datetime.datetime.now() + p = now.strftime("_%d%m%Y_%H%M%S_") for dirpath, dirnames, files in os.walk(dest_dir): if not files: - fd, tmp = tempfile.mkstemp(prefix=prefix, dir=dirpath) + fd, tmp = tempfile.mkstemp(prefix=p, dir=dirpath) os.close(fd) log.debug("temp file created in dir: {}.".format(dirpath)) -def deploy_archives(nexus_url: str, nexus_path: str, workspace: str, pattern: Optional[List[str]] = None) -> None: +def deploy_archives(nexus_url, nexus_path, workspace, pattern=None): """Archive files to a Nexus site repository named logs. Provides 2 ways to archive files: @@ -331,15 +320,15 @@ def deploy_archives(nexus_url: str, nexus_path: str, workspace: str, pattern: Op archive. (optional) """ nexus_url = _format_url(nexus_url) - previous_dir: str = os.getcwd() - work_dir: str = tempfile.mkdtemp(prefix="lftools-da.") + previous_dir = os.getcwd() + work_dir = tempfile.mkdtemp(prefix="lftools-da.") os.chdir(work_dir) - log.debug(f"workspace: {workspace}, work_dir: {work_dir}") + log.debug("workspace: {}, work_dir: {}".format(workspace, work_dir)) copy_archives(workspace, pattern) _compress_text(work_dir) - archives_zip: str = shutil.make_archive(f"{workspace}/archives", "zip") + archives_zip = shutil.make_archive("{}/archives".format(workspace), "zip") log.debug("archives zip: {}".format(archives_zip)) deploy_nexus_zip(nexus_url, "logs", nexus_path, archives_zip) @@ -347,7 +336,7 @@ def deploy_archives(nexus_url: str, nexus_path: str, workspace: str, pattern: Op shutil.rmtree(work_dir) -def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: +def deploy_logs(nexus_url, nexus_path, build_url): """Deploy logs to a Nexus site repository named logs. Fetches logs and system information and pushes them to Nexus @@ -366,16 +355,16 @@ def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: via the $BUILD_URL environment variable. """ nexus_url = _format_url(nexus_url) - previous_dir: str = os.getcwd() - work_dir: str = tempfile.mkdtemp(prefix="lftools-dl.") + previous_dir = os.getcwd() + work_dir = tempfile.mkdtemp(prefix="lftools-dl.") os.chdir(work_dir) - log.debug(f"work_dir: {work_dir}") + log.debug("work_dir: {}".format(work_dir)) build_details = open("_build-details.log", "w+") build_details.write("build-url: {}".format(build_url)) with open("_sys-info.log", "w+") as sysinfo_log: - sys_cmds: List[List[str]] = [] + sys_cmds = [] log.debug("Platform: {}".format(sys.platform)) if sys.platform == "linux" or sys.platform == "linux2": @@ -392,7 +381,7 @@ def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: for c in sys_cmds: try: - output: str = subprocess.check_output(c).decode("utf-8") + output = subprocess.check_output(c).decode("utf-8") except OSError: # TODO: Switch to FileNotFoundError when Python < 3.5 support is dropped. log.debug("Command not found: {}".format(c)) continue @@ -404,7 +393,7 @@ def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: build_details.close() # Magic string used to trim console logs at the appropriate level during wget - MAGIC_STRING: str = "-----END_OF_BUILD-----" + MAGIC_STRING = "-----END_OF_BUILD-----" log.info(MAGIC_STRING) resp = requests.get("{}/consoleText".format(_format_url(build_url))) @@ -417,7 +406,7 @@ def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: _compress_text(work_dir) - console_zip: tempfile._TemporaryFileWrapper[bytes] = tempfile.NamedTemporaryFile(prefix="lftools-dl", delete=True) + console_zip = tempfile.NamedTemporaryFile(prefix="lftools-dl", delete=True) log.debug("console-zip: {}".format(console_zip.name)) shutil.make_archive(console_zip.name, "zip", work_dir) deploy_nexus_zip(nexus_url, "logs", nexus_path, "{}.zip".format(console_zip.name)) @@ -427,9 +416,7 @@ def deploy_logs(nexus_url: str, nexus_path: str, build_url: str) -> None: shutil.rmtree(work_dir) -def deploy_s3( - s3_bucket: str, s3_path: str, build_url: str, workspace: str, pattern: Optional[List[str]] = None -) -> None: +def deploy_s3(s3_bucket, s3_path, build_url, workspace, pattern=None): """Add logs and archives to temp directory to be shipped to S3 bucket. Fetches logs and system information and pushes them and archives to S3 @@ -450,45 +437,68 @@ def deploy_s3( archive. (optional) """ - def _upload_to_s3(file: str) -> bool: - guess: Tuple[Optional[str], Optional[str]] = mimetypes.guess_type(file) - - extra_args: Dict[str, str] = {"Content-Type": "text/plain"} - - if guess[0]: - extra_args["Content-Type"] = guess[0] - - if guess[1]: - extra_args["Content-Encoding"] = guess[1] - + def _upload_to_s3(file): + extra_args = {"ContentType": "text/plain"} + text_html_extra_args = {"ContentType": "text/html", "ContentEncoding": mimetypes.guess_type(file)[1]} + text_plain_extra_args = {"ContentType": "text/plain", "ContentEncoding": mimetypes.guess_type(file)[1]} + app_xml_extra_args = {"ContentType": "application/xml", "ContentEncoding": mimetypes.guess_type(file)[1]} if file == "_tmpfile": - for directory in (logs_dir, silo_dir, jenkins_node_dir): + for dir in (logs_dir, silo_dir, jenkins_node_dir): try: - s3.Bucket(s3_bucket).upload_file(file, f"{directory}{file}") + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(dir, file)) except ClientError as e: log.error(e) return False - return True + if mimetypes.guess_type(file)[0] is None and mimetypes.guess_type(file)[1] is None: + try: + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args) + except ClientError as e: + log.error(e) + return False + return True + elif mimetypes.guess_type(file)[0] is None or mimetypes.guess_type(file)[0] in "text/plain": + extra_args = text_plain_extra_args + try: + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args) + except ClientError as e: + log.error(e) + return False + return True + elif mimetypes.guess_type(file)[0] in "text/html": + extra_args = text_html_extra_args + try: + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args) + except ClientError as e: + log.error(e) + return False + return True + elif mimetypes.guess_type(file)[0] in "application/xml": + extra_args = app_xml_extra_args + try: + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args) + except ClientError as e: + log.error(e) + return False + return True + else: + try: + s3.Bucket(s3_bucket).upload_file(file, "{}{}".format(s3_path, file), ExtraArgs=extra_args) + except ClientError as e: + log.error(e) + return False + return True - try: - s3.Bucket(s3_bucket).upload_file(file, f"{s3_path}{file}", ExtraArgs=extra_args) - except ClientError as e: - log.error(e) - return False - - return True - - previous_dir: str = os.getcwd() - work_dir: str = tempfile.mkdtemp(prefix="lftools-dl.") + previous_dir = os.getcwd() + work_dir = tempfile.mkdtemp(prefix="lftools-dl.") os.chdir(work_dir) s3_bucket = s3_bucket.lower() s3 = boto3.resource("s3") - logs_dir: str = s3_path.split("/")[0] + "/" - silo_dir: str = s3_path.split("/")[1] + "/" - jenkins_node_dir: str = logs_dir + silo_dir + s3_path.split("/")[2] + "/" + logs_dir = s3_path.split("/")[0] + "/" + silo_dir = s3_path.split("/")[1] + "/" + jenkins_node_dir = logs_dir + silo_dir + s3_path.split("/")[2] + "/" - log.debug(f"work_dir: {work_dir}") + log.debug("work_dir: {}".format(work_dir)) # Copy archive files to tmp dir copy_archives(workspace, pattern) @@ -528,7 +538,7 @@ def deploy_s3( sysinfo_log.close() # Magic string used to trim console logs at the appropriate level during wget - MAGIC_STRING: str = "-----END_OF_BUILD-----" + MAGIC_STRING = "-----END_OF_BUILD-----" log.info(MAGIC_STRING) resp = requests.get("{}/consoleText".format(_format_url(build_url))) @@ -550,8 +560,8 @@ def deploy_s3( _compress_text(work_dir) # Create file list to upload - file_list: List[str] = [] - files: List[str] = glob.glob("**/*", recursive=True) + file_list = [] + files = glob.glob("**/*", recursive=True) for file in files: if os.path.isfile(file): file_list.append(file) @@ -561,13 +571,13 @@ def deploy_s3( # Perform s3 upload for file in file_list: - log.info(f"Attempting to upload file {file}") + log.info("Attempting to upload file {}".format(file)) if _upload_to_s3(file): - log.info(f"Successfully uploaded {file}") + log.info("Successfully uploaded {}".format(file)) else: - log.error(f"FAILURE: Uploading {file} failed") + log.error("FAILURE: Uploading {} failed".format(file)) - log.info(f"Finished deploying from {work_dir} to {s3_bucket}/{s3_path}") + log.info("Finished deploying from {} to {}/{}".format(work_dir, s3_bucket, s3_path)) log.info("#######################################################") # Cleanup @@ -578,7 +588,7 @@ def deploy_s3( # shutil.rmtree(work_dir) -def deploy_nexus_zip(nexus_url: str, nexus_repo: str, nexus_path: str, zip_file: str) -> None: +def deploy_nexus_zip(nexus_url, nexus_repo, nexus_path, zip_file): """"Deploy zip file containing artifacts to Nexus using requests. This function simply takes a zip file preformatted in the correct @@ -605,23 +615,23 @@ def deploy_nexus_zip(nexus_url: str, nexus_repo: str, nexus_path: str, zip_file: tst_path \ tests/fixtures/deploy/zip-test-files/test.zip """ - url: str = "{}/service/local/repositories/{}/content-compressed/{}".format( + url = "{}/service/local/repositories/{}/content-compressed/{}".format( _format_url(nexus_url), nexus_repo, nexus_path ) - log.debug(f"Uploading {zip_file} to {url}") + log.debug("Uploading {} to {}".format(zip_file, url)) try: - resp: requests.Response = _request_post_file(url, zip_file) + resp = _request_post_file(url, zip_file) except requests.HTTPError as e: - files: List[str] = _get_filenames_in_zipfile(zip_file) - log.info(f"Uploading {zip_file} failed. It contained the following files") + files = _get_filenames_in_zipfile(zip_file) + log.info("Uploading {} failed. It contained the following files".format(zip_file)) for f in files: - log.info(f" {f}") - raise requests.HTTPError(e, response=resp) - log.debug(f"{resp.status_code}: {resp.text}") + log.info(" {}".format(f)) + raise requests.HTTPError(e) + log.debug("{}: {}".format(resp.status_code, resp.text)) -def nexus_stage_repo_create(nexus_url: str, staging_profile_id: str) -> str: +def nexus_stage_repo_create(nexus_url, staging_profile_id): """Create a Nexus staging repo. Parameters: @@ -636,7 +646,7 @@ def nexus_stage_repo_create(nexus_url: str, staging_profile_id: str) -> str: """ nexus_url = "{0}/service/local/staging/profiles/{1}/start".format(_format_url(nexus_url), staging_profile_id) - log.debug(f"Nexus URL = {nexus_url}") + log.debug("Nexus URL = {}".format(nexus_url)) xml = """ @@ -646,32 +656,32 @@ def nexus_stage_repo_create(nexus_url: str, staging_profile_id: str) -> str: """ - headers: Dict[str, str] = {"Content-Type": "application/xml"} - resp: requests.Response = _request_post(nexus_url, xml, headers) + headers = {"Content-Type": "application/xml"} + resp = _request_post(nexus_url, xml, headers) - log.debug(f"resp.status_code = {resp.status_code}") - log.debug(f"resp.text = {resp.text}") + log.debug("resp.status_code = {}".format(resp.status_code)) + log.debug("resp.text = {}".format(resp.text)) if re.search("nexus-error", resp.text): - error_msg: str = _get_node_from_xml(resp.text, "msg") + error_msg = _get_node_from_xml(resp.text, "msg") if re.search(".*profile with id:.*does not exist.", error_msg): - _log_error_and_exit(f"Staging profile id {staging_profile_id} not found.") + _log_error_and_exit("Staging profile id {} not found.".format(staging_profile_id)) _log_error_and_exit(error_msg) if resp.status_code == 405: _log_error_and_exit("HTTP method POST is not supported by this URL", nexus_url) if resp.status_code == 404: - _log_error_and_exit(f"Did not find nexus site: {nexus_url}") + _log_error_and_exit("Did not find nexus site: {}".format(nexus_url)) if not resp.status_code == 201: - _log_error_and_exit(f"Failed with status code {resp.status_code}", resp.text) + _log_error_and_exit("Failed with status code {}".format(resp.status_code), resp.text) staging_repo_id = _get_node_from_xml(resp.text, "stagedRepositoryId") - log.debug(f"staging_repo_id = {staging_repo_id}") + log.debug("staging_repo_id = {}".format(staging_repo_id)) return staging_repo_id -def nexus_stage_repo_close(nexus_url: str, staging_profile_id: str, staging_repo_id: str) -> None: +def nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id): """Close a Nexus staging repo. Parameters: @@ -685,8 +695,8 @@ def nexus_stage_repo_close(nexus_url: str, staging_profile_id: str, staging_repo """ nexus_url = "{0}/service/local/staging/profiles/{1}/finish".format(_format_url(nexus_url), staging_profile_id) - log.debug(f"Nexus URL = {nexus_url}") - log.debug(f"staging_repo_id = {staging_repo_id}") + log.debug("Nexus URL = {}".format(nexus_url)) + log.debug("staging_repo_id = {}".format(staging_repo_id)) xml = """ @@ -699,19 +709,19 @@ def nexus_stage_repo_close(nexus_url: str, staging_profile_id: str, staging_repo staging_repo_id ) - headers: Dict[str, str] = {"Content-Type": "application/xml"} - resp: requests.Response = _request_post(nexus_url, xml, headers) + headers = {"Content-Type": "application/xml"} + resp = _request_post(nexus_url, xml, headers) - log.debug(f"resp.status_code = {resp.status_code}") - log.debug(f"resp.text = {resp.text}") + log.debug("resp.status_code = {}".format(resp.status_code)) + log.debug("resp.text = {}".format(resp.text)) if re.search("nexus-error", resp.text): - error_msg: str = _get_node_from_xml(resp.text, "msg") + error_msg = _get_node_from_xml(resp.text, "msg") else: error_msg = resp.text if resp.status_code == 404: - _log_error_and_exit(f"Did not find nexus site: {nexus_url}") + _log_error_and_exit("Did not find nexus site: {}".format(nexus_url)) if re.search("invalid state: closed", error_msg): _log_error_and_exit("Staging repository is already closed.") @@ -719,19 +729,12 @@ def nexus_stage_repo_close(nexus_url: str, staging_profile_id: str, staging_repo _log_error_and_exit("Staging repository do not exist.") if not resp.status_code == 201: - _log_error_and_exit(f"Failed with status code {resp.status_code}", resp.text) + _log_error_and_exit("Failed with status code {}".format(resp.status_code), resp.text) def upload_maven_file_to_nexus( - nexus_url: str, - nexus_repo_id: str, - group_id: str, - artifact_id: str, - version: str, - packaging: str, - file: str, - classifier: Optional[str] = None, -) -> None: + nexus_url, nexus_repo_id, group_id, artifact_id, version, packaging, file, classifier=None +): """Upload file to Nexus as a Maven artifact. This function will upload an artifact to Nexus while providing all of @@ -756,26 +759,26 @@ def upload_maven_file_to_nexus( """ url = "{}/service/local/artifact/maven/content".format(_format_url(nexus_url)) - log.info(f"Uploading URL: {url}") + log.info("Uploading URL: {}".format(url)) params = {} - params.update({"r": (None, f"{nexus_repo_id}")}) - params.update({"g": (None, f"{group_id}")}) - params.update({"a": (None, f"{artifact_id}")}) - params.update({"v": (None, f"{version}")}) - params.update({"p": (None, f"{packaging}")}) + params.update({"r": (None, "{}".format(nexus_repo_id))}) + params.update({"g": (None, "{}".format(group_id))}) + params.update({"a": (None, "{}".format(artifact_id))}) + params.update({"v": (None, "{}".format(version))}) + params.update({"p": (None, "{}".format(packaging))}) if classifier: - params.update({"c": (None, f"{classifier}")}) + params.update({"c": (None, "{}".format(classifier))}) - log.debug(f"Maven Parameters: {params}") + log.debug("Maven Parameters: {}".format(params)) - resp: requests.Response = _request_post_file(url, file, params) + resp = _request_post_file(url, file, params) if re.search("nexus-error", resp.text): - error_msg: str = _get_node_from_xml(resp.text, "msg") - raise requests.HTTPError(f"Nexus Error: {error_msg}", response=resp) + error_msg = _get_node_from_xml(resp.text, "msg") + raise requests.HTTPError("Nexus Error: {}".format(error_msg)) -def deploy_nexus(nexus_repo_url: str, deploy_dir: str, snapshot: bool = False, workers: int = 2) -> None: +def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False, workers=2): """Deploy a local directory of files to a Nexus repository. One purpose of this is so that we can get around the problematic @@ -799,17 +802,17 @@ def deploy_nexus(nexus_repo_url: str, deploy_dir: str, snapshot: bool = False, w tests/fixtures/deploy/zip-test-files """ - def _get_filesize(file: str) -> str: + def _get_filesize(file): bytesize = os.path.getsize(file) if bytesize == 0: return "0B" - suffix: List[str] = ["b", "kb", "mb", "gb"] - i: int = int(math.floor(math.log(bytesize, 1024))) - p: int = int(math.pow(1024, i)) - s: int = int(round(bytesize / p, 2)) - return f"{s} {suffix[i]}" + suffix = ("b", "kb", "mb", "gb") + i = int(math.floor(math.log(bytesize, 1024))) + p = math.pow(1024, i) + s = round(bytesize / p, 2) + return "{} {}".format(s, suffix[i]) - def _deploy_nexus_upload(file: str) -> bool: + def _deploy_nexus_upload(file): # Fix file path, and call _request_put_file. nexus_url_with_file = "{}/{}".format(_format_url(nexus_repo_url), file) log.info("Attempting to upload {} ({})".format(file, _get_filesize(file))) @@ -818,13 +821,13 @@ def deploy_nexus(nexus_repo_url: str, deploy_dir: str, snapshot: bool = False, w else: return False - file_list: List[str] = [] - previous_dir: str = os.getcwd() + file_list = [] + previous_dir = os.getcwd() os.chdir(deploy_dir) - files: List[str] = glob.glob("**/*", recursive=True) + files = glob.glob("**/*", recursive=True) for file in files: if os.path.isfile(file): - base_name: str = os.path.basename(file) + base_name = os.path.basename(file) # Skip blacklisted files if base_name == "_remote.repositories" or base_name == "resolver-status.properties": @@ -837,37 +840,38 @@ def deploy_nexus(nexus_repo_url: str, deploy_dir: str, snapshot: bool = False, w file_list.append(file) log.info("#######################################################") - log.info(f"Deploying directory {deploy_dir} to {nexus_repo_url}") + log.info("Deploying directory {} to {}".format(deploy_dir, nexus_repo_url)) with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: # this creates a dict where the key is the Future object, and the value is the file name # see concurrent.futures.Future for more info - futures: Dict[Future[Any], str] = { - executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list - } + futures = {executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list} for future in concurrent.futures.as_completed(futures): - filename: str = futures[future] + filename = futures[future] try: - future.result() + data = future.result() + # remove pyflake warning + if data == data: + pass except Exception as e: - log.error(f"Uploading {filename}: {e}") + log.error("Uploading {}: {}".format(filename, e)) # wait until all threads complete (successfully or not) # then log the results of the upload threads concurrent.futures.wait(futures) for k, v in futures.items(): if k.result(): - log.info(f"Successfully uploaded {v}") + log.info("Successfully uploaded {}".format(v)) else: - log.error(f"FAILURE: Uploading {v} failed") + log.error("FAILURE: Uploading {} failed".format(v)) - log.info(f"Finished deploying {deploy_dir} to {nexus_repo_url}") + log.info("Finished deploying {} to {}".format(deploy_dir, nexus_repo_url)) log.info("#######################################################") os.chdir(previous_dir) -def deploy_nexus_stage(nexus_url: str, staging_profile_id: str, deploy_dir: str) -> None: +def deploy_nexus_stage(nexus_url, staging_profile_id, deploy_dir): """Deploy Maven artifacts to Nexus staging repo. Parameters: @@ -886,18 +890,18 @@ def deploy_nexus_stage(nexus_url: str, staging_profile_id: str, deploy_dir: str) ~/LF/work/lftools-dev/lftools/shell Completed uploading files to aaf-1005. """ - staging_repo_id: str = nexus_stage_repo_create(nexus_url, staging_profile_id) - log.info(f"Staging repository {staging_repo_id} created.") + staging_repo_id = nexus_stage_repo_create(nexus_url, staging_profile_id) + log.info("Staging repository {} created.".format(staging_repo_id)) deploy_nexus_url = "{0}/service/local/staging/deployByRepositoryId/{1}".format( _format_url(nexus_url), staging_repo_id ) - sz_m2repo: int = sum(os.path.getsize(f) for f in os.listdir(deploy_dir) if os.path.isfile(f)) - log.debug(f"Staging repository upload size: {sz_m2repo} bytes") + sz_m2repo = sum(os.path.getsize(f) for f in os.listdir(deploy_dir) if os.path.isfile(f)) + log.debug("Staging repository upload size: {} bytes".format(sz_m2repo)) log.debug("Nexus Staging URL: {}".format(_format_url(deploy_nexus_url))) deploy_nexus(deploy_nexus_url, deploy_dir) nexus_stage_repo_close(nexus_url, staging_profile_id, staging_repo_id) - log.info(f"Completed uploading files to {staging_repo_id}.") + log.info("Completed uploading files to {}.".format(staging_repo_id))