--- /dev/null
+# -*- code: utf-8 -*-
+# vim: sw=4 ts=4 sts=4 et :
+
+#
+
+"""
+Library for working with Sonatype Nexus REST API
+"""
+
+__title__ = 'nexus'
+__version__ = '0.1.0'
+__build__ = 0x000101
+__author__ = 'Andrew Grimberg'
+__license__ = 'Apache 2.0'
+__copyright__ = 'Copyright 2017 Andrew Grimberg'
+
+import requests
+import json
+from requests.auth import HTTPBasicAuth
+
+class Nexus:
+ def __init__(self, baseurl=None, username=None, password=None):
+ self.baseurl = baseurl
+
+ if username and password:
+ self.add_credentials(username, password)
+ else:
+ self.auth = None
+
+ self.headers = {
+ 'Accept': 'application/json',
+ 'Content-Type': 'application/json',
+ }
+
+ def add_credentials(self, username, password):
+ """
+ Create an authentication object to be used.
+ """
+ self.auth = HTTPBasicAuth(username, password)
+
+ def add_baseurl(self, url):
+ """
+ Set the base URL for nexus
+ """
+ self.baseurl = url
+
+ def get_target(self, name):
+ """
+ Get the ID of a given target name
+ """
+ url = '/'.join([self.baseurl, 'service/local/repo_targets'])
+ targets = requests.get(url, auth=self.auth, headers=self.headers).json()
+
+ for priv in targets['data']:
+ if priv['name'] == name:
+ return priv['id']
+ raise LookupError("No target found named '%s'" % (name))
+
+ def create_target(self, name, patterns):
+ """
+ Create a target with the given patterns
+ """
+ url = '/'.join([self.baseurl, 'service/local/repo_targets'])
+
+ target = {
+ 'data': {
+ 'contentClass': 'any',
+ 'patterns': patterns,
+ 'name': name,
+ }
+ }
+
+ json_data = json.dumps(target, encoding='latin-1')
+
+ r = requests.post(url, auth=self.auth, headers=self.headers, data=json_data)
+
+ return r.json()['data']['id']
+
+ def get_priv(self, name, priv):
+ """
+ Get the ID for the privilege with the given name
+ """
+ url = '/'.join([self.baseurl, 'service/local/privileges'])
+
+ search_name = '%s - (%s)' % (name, priv)
+ privileges = requests.get(url, auth=self.auth, headers=self.headers).json()
+
+ for priv in privileges['data']:
+ if priv['name'] == search_name:
+ return priv['id']
+
+ raise LookupError("No privilege found named '%s'" % name)
+
+ def create_priv(self, name, target_id, priv):
+ """
+ Create a given privilege
+
+ Privilege must be one of the following:
+
+ create
+ read
+ delete
+ update
+ """
+ url = '/'.join([self.baseurl,'service/local/privileges_target'])
+
+ privileges = {
+ 'data': {
+ 'name': name,
+ 'description': name,
+ 'method': [
+ priv,
+ ],
+ 'repositoryGroupId': '',
+ 'repositoryId': '',
+ 'repositoryTargetId': target_id,
+ 'type': 'target',
+ }
+ }
+
+ json_data = json.dumps(privileges, encoding='latin-1')
+ privileges = requests.post(url, auth=self.auth, headers=self.headers, data=json_data).json()
+
+ return privileges['data'][0]['id']
+
+ def get_role(self, name):
+ """
+ Get the id of a role with a given name
+ """
+ url = '/'.join([self.baseurl, 'service/local/roles'])
+ roles = requests.get(url, auth=self.auth, headers=self.headers).json()
+
+ for role in roles['data']:
+ if role['name'] == name:
+ return role['id']
+
+ raise LookupError("No role with name '%s'" % (name))
+
+ def create_role(self, name, privs):
+ """
+ Create a role with the given privileges
+ """
+ url = '/'.join([self.baseurl, 'service/local/roles'])
+
+ role = {
+ 'data': {
+ 'id': name,
+ 'name': name,
+ 'description': name,
+ 'privileges': privs,
+ 'roles': [
+ 'repository-any-read',
+ ],
+ 'sessionTimeout': 60,
+ }
+ }
+
+ json_data = json.dumps(role, encoding='latin-1')
+
+ r = requests.post(url, auth=self.auth, headers=self.headers, data=json_data)
+
+ return r.json()['data']['id']
+
+ def get_user(self, user_id):
+ """
+ Determine if a user with a given userId exists
+ """
+ url = '/'.join([self.baseurl, 'service/local/users'])
+ users = requests.get(url, auth=self.auth, headers=self.headers).json()
+
+ for user in users['data']:
+ if user['userId'] == user_id:
+ return
+
+ raise LookupError("No user with id '%s'" % (user_id))
+
+ def create_user(self, name, domain, role_id, password, extra_roles=[]):
+ """
+ Create a Deployment user with a specific role_id and potentially extra roles
+
+ User is created with the nx-deployment role attached
+ """
+ url = '/'.join([self.baseurl, 'service/local/users'])
+
+ user = {
+ 'data': {
+ 'userId': name,
+ 'email': '%s-deploy@%s' % (name, domain),
+ 'firstName': name,
+ 'lastName': 'Deployment',
+ 'roles': [
+ role_id,
+ 'nx-deployment',
+ ],
+ 'password': password,
+ 'status': 'active',
+ }
+ }
+
+ for role in extra_roles:
+ user['data']['roles'].append(self.get_role(role))
+
+ json_data = json.dumps(user, encoding='latin-1')
+
+ r = requests.post(url, auth=self.auth, headers=self.headers, data=json_data)
+
+ def get_repo_group(self, name):
+ """
+ Get the repository ID for a repo group that has a specific name
+ """
+ url = '/'.join([self.baseurl, 'service/local/repo_groups'])
+
+ repos = requests.get(url, auth=self.auth, headers=self.headers).json()
+
+ for repo in repos['data']:
+ if repo['name'] == name:
+ return repo['id']
+
+ raise LookupError("No repository group named '%s'" % (name))
+
+ def get_repo_group_details(self, repoId):
+ """
+ Get the current configuration of a given repo group with a specific ID
+ """
+ url = '/'.join([self.baseurl, 'service/local/repo_groups', repoId])
+
+ return requests.get(url, auth=self.auth, headers=self.headers).json()['data']
+
+ def update_repo_group_details(self, repoId, data):
+ """
+ Update the given repo group with new configuration
+ """
+ url = '/'.join([self.baseurl, 'service/local/repo_groups', repoId])
+
+ repo = {
+ 'data': data
+ }
+
+ json_data = json.dumps(repo, encoding='latin-1')
+
+ r = requests.put(url, auth=self.auth, headers=self.headers, data=json_data)
--- /dev/null
+#!/usr/bin/env python
+# -*- code: utf-8 -*-
+# vim: sw=4 ts=4 sts=4 et :
+
+#
+# NOTE: This is a hack for forcing the 'Staging Repositories' repo group
+# to be in the correct reverse sorted order. There is a problem with
+# Nexus where it is not doing this like it should be
+#
+
+import argparse
+import sys
+import nexus
+import yaml
+from operator import itemgetter
+
+parser = argparse.ArgumentParser()
+parser.add_argument('-s', '--settings', type=str,
+ help='security and settings yaml file')
+args = parser.parse_args()
+
+if not args.settings:
+ sys.exit('Settings file is required')
+
+
+# open our settings file
+f = open(args.settings, 'r')
+settings = yaml.load(f)
+f.close()
+
+for setting in ['nexus', 'user', 'password']:
+ if not setting in settings:
+ sys.exit('{} needs to be defined'.format(setting))
+
+n = nexus.Nexus(settings['nexus'], settings['user'], settings['password'])
+
+try:
+ repo_id = n.get_repo_group('Staging Repositories')
+except LookupError as e:
+ sys.exit("Staging repository 'Staging Repositories' cannot be found")
+
+repo_details = n.get_repo_group_details(repo_id)
+
+sorted_repos = sorted(repo_details['repositories'], key=lambda k: k['id'], reverse=True)
+
+for repos in sorted_repos:
+ del repos['resourceURI']
+ del repos['name']
+
+repo_update = repo_details
+repo_update['repositories'] = sorted_repos
+del repo_update['contentResourceURI']
+del repo_update['repoType']
+
+n.update_repo_group_details(repo_id, repo_update)
--- /dev/null
+#!/usr/bin/env python
+# -*- code: utf-8 -*-
+# vim: sw=4 ts=4 sts=4 et :
+
+import argparse
+import sys
+import nexus
+import yaml
+
+parser = argparse.ArgumentParser()
+parser.add_argument('-s', '--settings', type=str,
+ help='security and settings yaml file')
+parser.add_argument('-c', '--config', type=str,
+ help='configuration to be created')
+args = parser.parse_args()
+
+if not args.settings:
+ sys.exit('Settings file is required')
+
+if not args.config:
+ sys.exit('Config file is required')
+
+
+# open our settings file
+f = open(args.settings, 'r')
+settings = yaml.load(f)
+f.close()
+
+for setting in ['nexus', 'user', 'password', 'email_domain']:
+ if not setting in settings:
+ sys.exit('{} needs to be defined'.format(setting))
+
+# open our config file
+f = open(args.config, 'r')
+config = yaml.load(f)
+f.close()
+
+n = nexus.Nexus(settings['nexus'], settings['user'], settings['password'])
+
+def create_nexus_perms(name, targets, email, password, extra_privs=[]):
+ # Create target
+ try:
+ target_id = n.get_target(name)
+ except LookupError as e:
+ target_id = n.create_target(name, targets)
+
+ # Create privileges
+ privs_set = [
+ 'create',
+ 'delete',
+ 'read',
+ 'update',
+ ]
+
+ privs = {}
+ for priv in privs_set:
+ try:
+ privs[priv] = n.get_priv(name, priv)
+ except LookupError as e:
+ privs[priv] = n.create_priv(name, target_id, priv)
+
+ # Create Role
+ try:
+ role_id = n.get_role(name)
+ except LookupError as e:
+ role_id = n.create_role(name, privs)
+
+ # Create user
+ try:
+ n.get_user(name)
+ except LookupError as e:
+ n.create_user(name, email, role_id, password, extra_privs)
+
+def do_build_repo(repo, repoId, config, base_groupId):
+ print('Building for %s.%s' % (base_groupId, repo))
+ groupId = '%s.%s' % (base_groupId, repo)
+ target = '^/%s/.*' % groupId.replace('.', '[/\.]')
+ if 'extra_privs' in config:
+ extra_privs = config['extra_privs']
+ else:
+ extra_privs = []
+ create_nexus_perms(repoId, [target], settings['email_domain'],
+ config['password'], extra_privs)
+ if 'repositories' in config:
+ for sub_repo in config['repositories']:
+ sub_repo_id = '%s-%s' % (repoId, sub_repo)
+ do_build_repo(sub_repo, sub_repo_id, config['repositories'][sub_repo],
+ groupId)
+
+for repo in config['repositories']:
+ do_build_repo(repo, repo, config['repositories'][repo], config['base_groupId'])