From aa759ec74910138dec6c305d0ac94cfc15052b07 Mon Sep 17 00:00:00 2001 From: DW Talton Date: Tue, 20 Aug 2019 12:24:48 -0700 Subject: [PATCH] Refactored multithreaded nexus upload Refactored multithreaded nexus upload to use concurrent.futures which allows for easy information gathering. Added additional error code trapping to various requests methods, and overall improved message output of the entire upload process from end to end. ISSUE: RELENG-2264 Signed-off-by: DW Talton Change-Id: I0707342b2017cdfe0139738ddff5123b110ddec2 --- lftools/deploy.py | 108 +++++++++++++++------ .../notes/deploy_nexus-6e26fda6a3b0c5b1.yaml | 10 ++ 2 files changed, 86 insertions(+), 32 deletions(-) mode change 100644 => 100755 lftools/deploy.py create mode 100644 releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml diff --git a/lftools/deploy.py b/lftools/deploy.py old mode 100644 new mode 100755 index 08b6784d..fc3d349f --- a/lftools/deploy.py +++ b/lftools/deploy.py @@ -11,25 +11,26 @@ """Library of functions for deploying artifacts to Nexus.""" from datetime import timedelta +from defusedxml.minidom import parseString +from multiprocessing import cpu_count +import concurrent.futures import errno +import glob2 # Switch to glob when Python < 3.5 support is dropped import gzip import io import logging -import multiprocessing -from multiprocessing.dummy import Pool as ThreadPool +import math import os import re +import requests import shutil +import six import subprocess import sys import tempfile import time import zipfile -from defusedxml.minidom import parseString -import glob2 # Switch to glob when Python < 3.5 support is dropped -import requests -import six log = logging.getLogger(__name__) @@ -150,23 +151,31 @@ def _request_put_file(url, file_to_upload, parameters=None): else: resp = requests.put(url, data=upload_file.read()) except requests.exceptions.MissingSchema: - raise requests.HTTPError("Not valid URL: {}".format(url)) + raise requests.HTTPError("Not valid URL format. Check for https:// etc..: {}".format(url)) + except requests.exceptions.ConnectTimeout: + raise requests.HTTPError("Timed out connecting to {}".format(url)) + except requests.exceptions.ReadTimeout: + raise requests.HTTPError("Timed out waiting for the server to reply ({})".format(url)) except requests.exceptions.ConnectionError: - raise requests.HTTPError("Could not connect to URL: {}".format(url)) + raise requests.HTTPError("A connection error occurred ({})".format(url)) except requests.exceptions.InvalidURL: - raise requests.HTTPError("Invalid URL: {}".format(url)) + raise requests.HTTPError("Invalid URL format: {}".format(url)) + except requests.RequestException as e: + log.error(e) + if resp.status_code == 201: + return True if resp.status_code == 400: raise requests.HTTPError("Repository is read only") - elif resp.status_code == 404: + if resp.status_code == 401: + raise requests.HTTPError("Invalid repository credentials") + if resp.status_code == 404: raise requests.HTTPError("Did not find repository.") if not str(resp.status_code).startswith('20'): raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}".format( resp.status_code, resp.text, file_to_upload)) - return resp - def _get_node_from_xml(xml_data, tag_name): """Extract tag data from xml data.""" @@ -583,10 +592,17 @@ def upload_maven_file_to_nexus(nexus_url, nexus_repo_id, raise requests.HTTPError("Nexus Error: {}".format(error_msg)) + + + def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False): - """Deploy a local directory to a Nexus repository. + """Deploy a local directory of files to a Nexus repository. + + One purpose of this is so that we can get around the problematic + deploy-at-end configuration with upstream Maven. + https://issues.apache.org/jira/browse/MDEPLOY-193 - This function intentially strips out the following files: + This function ignores these files: - _remote.repositories - resolver-status.properties @@ -597,26 +613,33 @@ def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False): (Ex: https://nexus.example.org/content/repositories/releases) deploy_dir: The directory to deploy. (Ex: /tmp/m2repo) - One purpose of this is so that we can get around the problematic - deploy-at-end configuration with upstream Maven. - https://issues.apache.org/jira/browse/MDEPLOY-193 Sample: lftools deploy nexus \ http://192.168.1.26:8081/nexus/content/repositories/releases \ tests/fixtures/deploy/zip-test-files """ + def _get_filesize(file): + bytesize = os.path.getsize(file) + if bytesize == 0: + return "0B" + suffix = ("b", "kb", "mb", "gb") + i = int(math.floor(math.log(bytesize, 1024))) + p = math.pow(1024, i) + s = round(bytesize / p, 2) + return "{} {}".format(s, suffix[i]) + def _deploy_nexus_upload(file): - """Fix file path, and call _request_put_file.""" + # Fix file path, and call _request_put_file. nexus_url_with_file = '{}/{}'.format(_format_url(nexus_repo_url), file) - log.info('Uploading {}'.format(file)) - _request_put_file(nexus_url_with_file, file) - log.debug('Uploaded {}'.format(file)) + log.info("Attempting to upload {} ({})".format(file, _get_filesize(file))) + if _request_put_file(nexus_url_with_file, file): + return True + else: + return False file_list = [] - previous_dir = os.getcwd() os.chdir(deploy_dir) - log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url)) files = glob2.glob('**/*') for file in files: if os.path.isfile(file): @@ -633,14 +656,35 @@ def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False): file_list.append(file) - # Perform parallel upload - upload_start = time.time() - pool = ThreadPool(multiprocessing.cpu_count()) - pool.map(_deploy_nexus_upload, file_list) - pool.close() - pool.join() - upload_time = time.time() - upload_start - log.info("Uploaded in {} seconds.".format(timedelta(seconds=round(upload_time)))) + # Multithreaded upload + # default threads to CPU count / 2 so we're a nice neighbor to other builds + + log.info("#######################################################") + log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url)) + + workers = int(cpu_count() / 2) + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + # this creates a dict where the key is the Future object, and the value is the file name + # see concurrent.futures.Future for more info + futures = {executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list} + for future in concurrent.futures.as_completed(futures): + filename = futures[future] + try: + data = future.result() + except Exception as e: + log.error('Uploading {}: {}'.format(filename, e)) + + # wait until all threads complete (successfully or not) + # then log the results of the upload threads + concurrent.futures.wait(futures) + for k, v in futures.items(): + if k.result(): + log.info("Successfully uploaded {}".format(v)) + else: + log.error("FAILURE: Uploading {} failed".format(v)) + + log.info("Finished deploying {} to {}".format(deploy_dir, nexus_repo_url)) + log.info("#######################################################") os.chdir(previous_dir) @@ -654,7 +698,7 @@ def deploy_nexus_stage(nexus_url, staging_profile_id, deploy_dir): staging repo. deploy_dir: The directory to deploy. (Ex: /tmp/m2repo) - Sample: + # Sample: lftools deploy nexus-stage http://192.168.1.26:8081/nexus 4e6f95cd2344 /tmp/slask Deploying Maven artifacts to staging repo... Staging repository aaf-1005 created. diff --git a/releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml b/releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml new file mode 100644 index 00000000..db67638c --- /dev/null +++ b/releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml @@ -0,0 +1,10 @@ +--- +fixes: + - | + Refactored deploy_nexus to use concurrent.futures rather than multiprocessing. + This allows for non-blocking I/O, and also allows for easy state tracking. + It should also fix any random failures that are hard to troubleshoot. +features: + - | + Added a get_filesize method to calculate filesize is an appropriate format. + This may be useful in logs if an upload fails. -- 2.16.6