Refactored multithreaded nexus upload 25/16625/6
authorDW Talton <dtalton@contractor.linuxfoundation.org>
Tue, 20 Aug 2019 19:24:48 +0000 (12:24 -0700)
committerDW Talton <dtalton@contractor.linuxfoundation.org>
Thu, 3 Oct 2019 21:41:19 +0000 (14:41 -0700)
Refactored multithreaded nexus upload to use concurrent.futures
which allows for easy information gathering. Added additional
error code trapping to various requests methods, and overall
improved message output of the entire upload process from end
to end.

ISSUE: RELENG-2264

Signed-off-by: DW Talton <dtalton@contractor.linuxfoundation.org>
Change-Id: I0707342b2017cdfe0139738ddff5123b110ddec2

lftools/deploy.py [changed mode: 0644->0755]
releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml [new file with mode: 0644]

old mode 100644 (file)
new mode 100755 (executable)
index 08b6784..fc3d349
 """Library of functions for deploying artifacts to Nexus."""
 
 from datetime import timedelta
+from defusedxml.minidom import parseString
+from multiprocessing import cpu_count
+import concurrent.futures
 import errno
+import glob2  # Switch to glob when Python < 3.5 support is dropped
 import gzip
 import io
 import logging
-import multiprocessing
-from multiprocessing.dummy import Pool as ThreadPool
+import math
 import os
 import re
+import requests
 import shutil
+import six
 import subprocess
 import sys
 import tempfile
 import time
 import zipfile
 
-from defusedxml.minidom import parseString
-import glob2  # Switch to glob when Python < 3.5 support is dropped
-import requests
-import six
 
 log = logging.getLogger(__name__)
 
@@ -150,23 +151,31 @@ def _request_put_file(url, file_to_upload, parameters=None):
         else:
             resp = requests.put(url, data=upload_file.read())
     except requests.exceptions.MissingSchema:
-        raise requests.HTTPError("Not valid URL: {}".format(url))
+        raise requests.HTTPError("Not valid URL format. Check for https:// etc..: {}".format(url))
+    except requests.exceptions.ConnectTimeout:
+        raise requests.HTTPError("Timed out connecting to {}".format(url))
+    except requests.exceptions.ReadTimeout:
+        raise requests.HTTPError("Timed out waiting for the server to reply ({})".format(url))
     except requests.exceptions.ConnectionError:
-        raise requests.HTTPError("Could not connect to URL: {}".format(url))
+        raise requests.HTTPError("A connection error occurred ({})".format(url))
     except requests.exceptions.InvalidURL:
-        raise requests.HTTPError("Invalid URL: {}".format(url))
+        raise requests.HTTPError("Invalid URL format: {}".format(url))
+    except requests.RequestException as e:
+        log.error(e)
 
+    if resp.status_code == 201:
+        return True
     if resp.status_code == 400:
         raise requests.HTTPError("Repository is read only")
-    elif resp.status_code == 404:
+    if resp.status_code == 401:
+        raise requests.HTTPError("Invalid repository credentials")
+    if resp.status_code == 404:
         raise requests.HTTPError("Did not find repository.")
 
     if not str(resp.status_code).startswith('20'):
         raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
                 resp.status_code, resp.text, file_to_upload))
 
-    return resp
-
 
 def _get_node_from_xml(xml_data, tag_name):
     """Extract tag data from xml data."""
@@ -583,10 +592,17 @@ def upload_maven_file_to_nexus(nexus_url, nexus_repo_id,
         raise requests.HTTPError("Nexus Error: {}".format(error_msg))
 
 
+
+
+
 def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False):
-    """Deploy a local directory to a Nexus repository.
+    """Deploy a local directory of files to a Nexus repository.
+
+    One purpose of this is so that we can get around the problematic
+    deploy-at-end configuration with upstream Maven.
+    https://issues.apache.org/jira/browse/MDEPLOY-193
 
-    This function intentially strips out the following files:
+    This function ignores these files:
 
         - _remote.repositories
         - resolver-status.properties
@@ -597,26 +613,33 @@ def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False):
                         (Ex: https://nexus.example.org/content/repositories/releases)
         deploy_dir:     The directory to deploy. (Ex: /tmp/m2repo)
 
-    One purpose of this is so that we can get around the problematic
-    deploy-at-end configuration with upstream Maven.
-    https://issues.apache.org/jira/browse/MDEPLOY-193
     Sample:
         lftools deploy nexus \
             http://192.168.1.26:8081/nexus/content/repositories/releases \
             tests/fixtures/deploy/zip-test-files
     """
+    def _get_filesize(file):
+        bytesize = os.path.getsize(file)
+        if bytesize == 0:
+            return "0B"
+        suffix = ("b", "kb", "mb", "gb")
+        i = int(math.floor(math.log(bytesize, 1024)))
+        p = math.pow(1024, i)
+        s = round(bytesize / p, 2)
+        return "{} {}".format(s, suffix[i])
+
     def _deploy_nexus_upload(file):
-        """Fix file path, and call _request_put_file."""
+        # Fix file path, and call _request_put_file.
         nexus_url_with_file = '{}/{}'.format(_format_url(nexus_repo_url), file)
-        log.info('Uploading {}'.format(file))
-        _request_put_file(nexus_url_with_file, file)
-        log.debug('Uploaded {}'.format(file))
+        log.info("Attempting to upload {} ({})".format(file, _get_filesize(file)))
+        if _request_put_file(nexus_url_with_file, file):
+            return True
+        else:
+            return False
 
     file_list = []
-
     previous_dir = os.getcwd()
     os.chdir(deploy_dir)
-    log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url))
     files = glob2.glob('**/*')
     for file in files:
         if os.path.isfile(file):
@@ -633,14 +656,35 @@ def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False):
 
             file_list.append(file)
 
-    # Perform parallel upload
-    upload_start = time.time()
-    pool = ThreadPool(multiprocessing.cpu_count())
-    pool.map(_deploy_nexus_upload, file_list)
-    pool.close()
-    pool.join()
-    upload_time = time.time() - upload_start
-    log.info("Uploaded in {} seconds.".format(timedelta(seconds=round(upload_time))))
+    # Multithreaded upload
+    # default threads to CPU count / 2 so we're a nice neighbor to other builds
+
+    log.info("#######################################################")
+    log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url))
+
+    workers = int(cpu_count() / 2)
+    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+        # this creates a dict where the key is the Future object, and the value is the file name
+        # see concurrent.futures.Future for more info
+        futures = {executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list}
+        for future in concurrent.futures.as_completed(futures):
+            filename = futures[future]
+            try:
+                data = future.result()
+            except Exception as e:
+                log.error('Uploading {}: {}'.format(filename, e))
+
+        # wait until all threads complete (successfully or not)
+        # then log the results of the upload threads
+        concurrent.futures.wait(futures)
+        for k, v in futures.items():
+            if k.result():
+                log.info("Successfully uploaded {}".format(v))
+            else:
+                log.error("FAILURE: Uploading {} failed".format(v))
+
+    log.info("Finished deploying {} to {}".format(deploy_dir, nexus_repo_url))
+    log.info("#######################################################")
 
     os.chdir(previous_dir)
 
@@ -654,7 +698,7 @@ def deploy_nexus_stage(nexus_url, staging_profile_id, deploy_dir):
                         staging repo.
     deploy_dir:         The directory to deploy. (Ex: /tmp/m2repo)
 
-    Sample:
+   # Sample:
         lftools deploy nexus-stage http://192.168.1.26:8081/nexus 4e6f95cd2344 /tmp/slask
             Deploying Maven artifacts to staging repo...
             Staging repository aaf-1005 created.
diff --git a/releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml b/releasenotes/notes/deploy_nexus-6e26fda6a3b0c5b1.yaml
new file mode 100644 (file)
index 0000000..db67638
--- /dev/null
@@ -0,0 +1,10 @@
+---
+fixes:
+  - |
+    Refactored deploy_nexus to use concurrent.futures rather than multiprocessing.
+    This allows for non-blocking I/O, and also allows for easy state tracking.
+    It should also fix any random failures that are hard to troubleshoot.
+features:
+  - |
+    Added a get_filesize method to calculate filesize is an appropriate format.
+    This may be useful in logs if an upload fails.