return resp
-def _request_post_file(url, file_to_upload):
+def _request_post_file(url, file_to_upload, parameters=None):
"""Execute a request post, return the resp."""
resp = {}
try:
files = {'file': upload_file}
try:
- resp = requests.post(url, files=files)
+ if parameters:
+ resp = requests.post(url, data=parameters, files=files)
+ else:
+ resp = requests.post(url, files=files)
except requests.exceptions.MissingSchema:
raise requests.HTTPError("Not valid URL: {}".format(url))
except requests.exceptions.ConnectionError:
raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
resp.status_code, resp.text, zipfile.ZipFile(file_to_upload).infolist()))
else:
- raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}\{}".format(
+ raise requests.HTTPError("Failed to upload to Nexus with status code: {}.\n{}\n{}".format(
resp.status_code, resp.text, file_to_upload))
return resp
if not resp.status_code == 201:
_log_error_and_exit("Failed with status code {}".format(resp.status_code), resp.text)
+
+
+def upload_maven_file_to_nexus(nexus_url, nexus_repo_id,
+ group_id, artifact_id, version,
+ packaging, file, classifier=None):
+ """Upload file to Nexus as a Maven artifact.
+
+ This function will upload an artifact to Nexus while providing all of
+ the usual Maven pom.xml information so that it conforms to Maven 2 repo
+ specs.
+
+ Parameters:
+ nexus_url: The URL to the Nexus repo.
+ (Ex: https://nexus.example.org)
+ nexus_repo_id: Repo ID of repo to push artifact to.
+ group_id: Maven style Group ID to upload artifact as.
+ artifact_id: Maven style Artifact ID to upload artifact as.
+ version: Maven style Version to upload artifact as.
+ packaging: Packaging type to upload as (Eg. tar.xz)
+ file: File to upload.
+ classifier: Maven classifier. (optional)
+
+ Sample:
+ lftools deploy nexus \
+ http://192.168.1.26:8081/nexus/content/repositories/releases \
+ tests/fixtures/deploy/zip-test-files
+ """
+ url = '{}/service/local/artifact/maven/content'.format(_format_url(nexus_url))
+
+ log.info('Uploading URL: {}'.format(url))
+ params = {}
+ params.update({'r': (None, '{}'.format(nexus_repo_id))})
+ params.update({'g': (None, '{}'.format(group_id))})
+ params.update({'a': (None, '{}'.format(artifact_id))})
+ params.update({'v': (None, '{}'.format(version))})
+ params.update({'p': (None, '{}'.format(packaging))})
+ if classifier:
+ params.update({'c': (None, '{}'.format(classifier))})
+
+ log.debug('Maven Parameters: {}'.format(params))
+
+ resp = _request_post_file(url, file, params)
+
+ if re.search('nexus-error', resp.text):
+ error_msg = _get_node_from_xml(resp.text, 'msg')
+ raise requests.HTTPError("Nexus Error: {}".format(error_msg))
assert 'Failed to upload to Nexus with status code' in str(excinfo.value)
+def test__request_post_file_data(responses, mocker):
+ """Test _request_post_file."""
+
+ param={'r':(None, 'testing')}
+ zip_file='zip-test-files/test.zip'
+ resp = {}
+ test_url='http://connection.error.test'
+ exception = requests.exceptions.ConnectionError(test_url)
+ responses.add(responses.POST, test_url, body=exception)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Could not connect to URL' in str(excinfo.value)
+
+ test_url='http://invalid.url.test:8081'
+ exception = requests.exceptions.InvalidURL(test_url)
+ responses.add(responses.POST, test_url, body=exception)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Invalid URL' in str(excinfo.value)
+
+ test_url='http://missing.schema.test:8081'
+ exception = requests.exceptions.MissingSchema(test_url)
+ responses.add(responses.POST, test_url, body=exception)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Not valid URL' in str(excinfo.value)
+
+ test_url='http://repository.read.only:8081'
+ responses.add(responses.POST, test_url, body=None, status=400)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Repository is read only' in str(excinfo.value)
+
+ test_url='http://repository.not.found:8081'
+ responses.add(responses.POST, test_url, body=None, status=404)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Did not find repository' in str(excinfo.value)
+
+ test_url='http://other.upload.error:8081'
+ responses.add(responses.POST, test_url, body=None, status=500)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys._request_post_file(test_url, zip_file, param)
+ assert 'Failed to upload to Nexus with status code' in str(excinfo.value)
+
+
def test_nexus_stage_repo_close(responses, mocker):
"""Test nexus_stage_repo_close."""
mocker.patch('lftools.deploy._log_error_and_exit', side_effect=mocked_log_error)
with pytest.raises(ValueError) as excinfo:
res = deploy_sys.nexus_stage_repo_create('site.not.found', 'INVALID')
assert 'site.not.found' in str(excinfo.value)
+
+
+def test__upload_maven_file_to_nexus(responses, mocker):
+ """Test upload_to_nexus."""
+
+ zip_file='zip-test-files/test.tar.xz'
+ common_urlpart='service/local/artifact/maven/content'
+
+ nexus_repo_id='testing'
+ group_id='com5.test'
+ artifact_id='ArtId2'
+ version='1.2.7'
+ packaging='tar.xz'
+ classified=None
+
+ resp = {}
+
+ test_url='http://all.ok.upload:8081'
+ responses.add(responses.POST, '{}/{}'.format(test_url, common_urlpart), body=None, status=201)
+ resp = deploy_sys.upload_maven_file_to_nexus(test_url, nexus_repo_id, group_id, artifact_id, version, packaging, zip_file)
+
+ xml_other_error = """
+ <nexus-error><errors><error>
+ <id>*</id>
+ <msg>Something went wrong.</msg>
+ </error></errors></nexus-error>
+ """
+ test_url='http://something.went.wrong:8081'
+ responses.add(responses.POST, '{}/{}'.format(test_url, common_urlpart), body=xml_other_error, status=405)
+ with pytest.raises(requests.HTTPError) as excinfo:
+ resp = deploy_sys.upload_maven_file_to_nexus(test_url, nexus_repo_id, group_id, artifact_id, version, packaging, zip_file)
+ assert 'Something went wrong' in str(excinfo.value)