"""Library of functions for deploying artifacts to Nexus."""
from datetime import timedelta
+from defusedxml.minidom import parseString
+from multiprocessing import cpu_count
+import concurrent.futures
import errno
+import glob2 # Switch to glob when Python < 3.5 support is dropped
import gzip
import io
import logging
-import multiprocessing
-from multiprocessing.dummy import Pool as ThreadPool
+import math
import os
import re
+import requests
import shutil
+import six
import subprocess
import sys
import tempfile
import time
import zipfile
-from defusedxml.minidom import parseString
-import glob2 # Switch to glob when Python < 3.5 support is dropped
-import requests
-import six
log = logging.getLogger(__name__)
resp = requests.put(url, data=upload_file.read())
except requests.exceptions.MissingSchema:
- raise requests.HTTPError("Not valid URL: {}".format(url))
+ raise requests.HTTPError("Not valid URL format. Check for https:// etc..: {}".format(url))
+ except requests.exceptions.ConnectTimeout:
+ raise requests.HTTPError("Timed out connecting to {}".format(url))
+ except requests.exceptions.ReadTimeout:
+ raise requests.HTTPError("Timed out waiting for the server to reply ({})".format(url))
except requests.exceptions.ConnectionError:
- raise requests.HTTPError("Could not connect to URL: {}".format(url))
+ raise requests.HTTPError("A connection error occurred ({})".format(url))
except requests.exceptions.InvalidURL:
- raise requests.HTTPError("Invalid URL: {}".format(url))
+ raise requests.HTTPError("Invalid URL format: {}".format(url))
+ except requests.RequestException as e:
+ log.error(e)
+ if resp.status_code == 201:
+ return True
if resp.status_code == 400:
raise requests.HTTPError("Repository is read only")
- elif resp.status_code == 404:
+ if resp.status_code == 401:
+ raise requests.HTTPError("Invalid repository credentials")
+ if resp.status_code == 404:
raise requests.HTTPError("Did not find repository.")
if not str(resp.status_code).startswith('20'):
raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
resp.status_code, resp.text, file_to_upload))
- return resp
def _get_node_from_xml(xml_data, tag_name):
"""Extract tag data from xml data."""
raise requests.HTTPError("Nexus Error: {}".format(error_msg))
def deploy_nexus(nexus_repo_url, deploy_dir, snapshot=False):
- """Deploy a local directory to a Nexus repository.
+ """Deploy a local directory of files to a Nexus repository.
+ One purpose of this is so that we can get around the problematic
+ deploy-at-end configuration with upstream Maven.
+ https://issues.apache.org/jira/browse/MDEPLOY-193
- This function intentially strips out the following files:
+ This function ignores these files:
- _remote.repositories
- resolver-status.properties
(Ex: https://nexus.example.org/content/repositories/releases)
deploy_dir: The directory to deploy. (Ex: /tmp/m2repo)
- One purpose of this is so that we can get around the problematic
- deploy-at-end configuration with upstream Maven.
- https://issues.apache.org/jira/browse/MDEPLOY-193
lftools deploy nexus \ \
+ def _get_filesize(file):
+ bytesize = os.path.getsize(file)
+ if bytesize == 0:
+ return "0B"
+ suffix = ("b", "kb", "mb", "gb")
+ i = int(math.floor(math.log(bytesize, 1024)))
+ p = math.pow(1024, i)
+ s = round(bytesize / p, 2)
+ return "{} {}".format(s, suffix[i])
def _deploy_nexus_upload(file):
- """Fix file path, and call _request_put_file."""
+ # Fix file path, and call _request_put_file.
nexus_url_with_file = '{}/{}'.format(_format_url(nexus_repo_url), file)
- log.info('Uploading {}'.format(file))
- _request_put_file(nexus_url_with_file, file)
- log.debug('Uploaded {}'.format(file))
+ log.info("Attempting to upload {} ({})".format(file, _get_filesize(file)))
+ if _request_put_file(nexus_url_with_file, file):
+ return True
+ else:
+ return False
file_list = []
previous_dir = os.getcwd()
- log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url))
files = glob2.glob('**/*')
for file in files:
if os.path.isfile(file):
- # Perform parallel upload
- upload_start = time.time()
- pool = ThreadPool(multiprocessing.cpu_count())
- pool.map(_deploy_nexus_upload, file_list)
- pool.close()
- pool.join()
- upload_time = time.time() - upload_start
- log.info("Uploaded in {} seconds.".format(timedelta(seconds=round(upload_time))))
+ # Multithreaded upload
+ # default threads to CPU count / 2 so we're a nice neighbor to other builds
+ log.info("#######################################################")
+ log.info('Deploying directory {} to {}'.format(deploy_dir, nexus_repo_url))
+ workers = int(cpu_count() / 2)
+ with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+ # this creates a dict where the key is the Future object, and the value is the file name
+ # see concurrent.futures.Future for more info
+ futures = {executor.submit(_deploy_nexus_upload, file_name): file_name for file_name in file_list}
+ for future in concurrent.futures.as_completed(futures):
+ filename = futures[future]
+ try:
+ data = future.result()
+ except Exception as e:
+ log.error('Uploading {}: {}'.format(filename, e))
+ # wait until all threads complete (successfully or not)
+ # then log the results of the upload threads
+ concurrent.futures.wait(futures)
+ for k, v in futures.items():
+ if k.result():
+ log.info("Successfully uploaded {}".format(v))
+ else:
+ log.error("FAILURE: Uploading {} failed".format(v))
+ log.info("Finished deploying {} to {}".format(deploy_dir, nexus_repo_url))
+ log.info("#######################################################")
staging repo.
deploy_dir: The directory to deploy. (Ex: /tmp/m2repo)
- Sample:
+ # Sample:
lftools deploy nexus-stage 4e6f95cd2344 /tmp/slask
Deploying Maven artifacts to staging repo...
Staging repository aaf-1005 created.