.. _nexus-reorder-staged-repos:
-reorder_staged_repos
+reorder-staged-repos
--------------------
.. program-output:: lftools nexus reorder-staged-repos --help
+
+.. _nexus-docker:
+
+docker
+------
+
+.. program-output:: lftools nexus docker --help
+
+While a settings.yaml file is still supported for ``nexus docker`` commands,
+the preferred way to login is to use an lftools.ini file, and provide the
+server address using the ``--server`` option. The config file should be at
+$HOME/.config/lftools/lftools.ini.
+
+.. _nexus-docker-delete:
+
+delete
+^^^^^^
+
+.. program-output:: lftools nexus docker delete --help
+
+.. _nexus-docker-list:
+
+list
+^^^^
+
+.. program-output:: lftools nexus docker list --help
# http://www.eclipse.org/legal/epl-v10.html
##############################################################################
"""CLI entry point for nexus commands."""
+from os import environ
+
import click
from lftools.nexus import cmd as nexuscmd
+NEXUS_URL_ENV = 'NEXUS_URL'
+
@click.group()
@click.pass_context
def repo(ctx, config, settings):
"""Create a Nexus repository as defined by a repo-config.yaml file."""
nexuscmd.create_repos(config, settings)
+
+
+@nexus.group()
+@click.pass_context
+def docker(ctx):
+ """Docker repos in Nexus."""
+ pass
+
+
+def docker_params(command):
+ """Common options and arguments for all docker subcommands."""
+ command = click.option(
+ '--settings', type=str,
+ help=('Yaml file containing "nexus" (url), "user", and "password" '
+ 'definitions.'))(command)
+ command = click.option(
+ '-s', '--server', type=str,
+ help=('Nexus server URL. Can also be set as {} in the environment. '
+ 'This will override any URL set in settings.yaml.').format(
+ NEXUS_URL_ENV))(command)
+ command = click.argument('REPO', type=str)(command)
+ command = click.argument('PATTERN', type=str, default="*")(command)
+ return command
+
+
+@docker.command(name="list")
+@docker_params
+@click.option(
+ '--csv', type=click.Path(dir_okay=False, writable=True),
+ help='Write a csv file of the search results to PATH.')
+@click.pass_context
+def list_images(ctx, settings, server, repo, pattern, csv):
+ """List images matching the PATTERN.
+
+ Use '*' for wildcard, or begin with '!' to search for images NOT matching
+ the string.
+ """
+ if not server and 'NEXUS_URL_ENV' in environ:
+ server = environ['NEXUS_URL_ENV']
+ images = nexuscmd.search(settings, server, repo, pattern)
+ nexuscmd.output_images(images, csv)
+
+
+@docker.command(name="delete")
+@docker_params
+@click.option(
+ '-y', '--yes', is_flag=True, help="Answer yes to all prompts")
+@click.pass_context
+def delete_images(ctx, settings, server, repo, pattern, yes):
+ """Delete all images matching the PATTERN.
+
+ By default, prints to console only. Use '*' for wildcard, or begin with '!'
+ to delete images NOT matching the string.
+ """
+ images = nexuscmd.search(settings, server, repo, pattern)
+ if yes or click.confirm("Would you like to delete all {} images?".format(
+ str(len(images)))):
+ nexuscmd.delete_images(settings, server, images)
__copyright__ = 'Copyright 2017 Andrew Grimberg'
import json
+import logging
+import os
+import sys
import requests
from requests.auth import HTTPBasicAuth
+log = logging.getLogger(__name__)
+
class Nexus:
"""Nexus class to handle communicating with Nexus over a rest api."""
def __init__(self, baseurl=None, username=None, password=None):
"""Initialize Nexus instance."""
self.baseurl = baseurl
+ self.set_full_baseurl()
+ if self.baseurl.find("local") < 0:
+ self.version = 2
+ else:
+ self.version = 3
if username and password:
self.add_credentials(username, password)
'Content-Type': 'application/json',
}
+ def set_full_baseurl(self):
+ """Find the correct REST API endpoint for this version of Nexus."""
+ endpoints = [
+ "service/local/repo_targets",
+ "service/siesta/rest/beta/read-only",
+ "service/rest/beta/read-only",
+ "service/rest/v1/read-only"
+ ]
+ for endpoint in endpoints:
+ url = os.path.join(self.baseurl, endpoint)
+ response = requests.get(url)
+ if response.status_code != 404:
+ self.baseurl = os.path.dirname(url)
+ return
+ raise LookupError("Could not determine Nexus version")
+
def add_credentials(self, username, password):
"""Create an authentication object to be used."""
self.auth = HTTPBasicAuth(username, password)
def get_target(self, name):
"""Get the ID of a given target name."""
- url = '/'.join([self.baseurl, 'service/local/repo_targets'])
+ url = os.path.join(self.baseurl, 'repo_targets')
targets = requests.get(url, auth=self.auth, headers=self.headers).json()
for priv in targets['data']:
def create_target(self, name, patterns):
"""Create a target with the given patterns."""
- url = '/'.join([self.baseurl, 'service/local/repo_targets'])
+ url = os.path.join(self.baseurl, 'repo_targets')
target = {
'data': {
def get_priv(self, name, priv):
"""Get the ID for the privilege with the given name."""
- url = '/'.join([self.baseurl, 'service/local/privileges'])
+ url = os.path.join(self.baseurl, 'privileges')
search_name = "{} - ({})".format(name, priv)
privileges = requests.get(url, auth=self.auth, headers=self.headers).json()
delete
update
"""
- url = '/'.join([self.baseurl, 'service/local/privileges_target'])
+ url = os.path.join(self.baseurl, 'privileges_target')
privileges = {
'data': {
'name': name,
'description': name,
'method': [
- priv,
- ],
+ priv,
+ ],
'repositoryGroupId': '',
'repositoryId': '',
'repositoryTargetId': target_id,
def get_role(self, name):
"""Get the id of a role with a given name."""
- url = '/'.join([self.baseurl, 'service/local/roles'])
+ url = os.path.join(self.baseurl, 'roles')
roles = requests.get(url, auth=self.auth, headers=self.headers).json()
for role in roles['data']:
def create_role(self, name, privs):
"""Create a role with the given privileges."""
- url = '/'.join([self.baseurl, 'service/local/roles'])
+ url = os.path.join(self.baseurl, 'roles')
role = {
'data': {
'description': name,
'privileges': privs,
'roles': [
- 'repository-any-read',
- ],
+ 'repository-any-read',
+ ],
'sessionTimeout': 60,
}
}
def get_user(self, user_id):
"""Determine if a user with a given userId exists."""
- url = '/'.join([self.baseurl, 'service/local/users'])
+ url = os.path.join(self.baseurl, 'users')
users = requests.get(url, auth=self.auth, headers=self.headers).json()
for user in users['data']:
User is created with the nx-deployment role attached
"""
- url = '/'.join([self.baseurl, 'service/local/users'])
+ url = os.path.join(self.baseurl, 'users')
user = {
'data': {
'firstName': name,
'lastName': 'Deployment',
'roles': [
- role_id,
- 'nx-deployment',
- ],
+ role_id,
+ 'nx-deployment',
+ ],
'password': password,
'status': 'active',
}
def get_repo_group(self, name):
"""Get the repository ID for a repo group that has a specific name."""
- url = '/'.join([self.baseurl, 'service/local/repo_groups'])
+ url = os.path.join(self.baseurl, 'repo_groups')
repos = requests.get(url, auth=self.auth, headers=self.headers).json()
def get_repo_group_details(self, repoId):
"""Get the current configuration of a given repo group with a specific ID."""
- url = '/'.join([self.baseurl, 'service/local/repo_groups', repoId])
+ url = os.path.join(self.baseurl, 'repo_groups', repoId)
return requests.get(url, auth=self.auth, headers=self.headers).json()['data']
def update_repo_group_details(self, repoId, data):
"""Update the given repo group with new configuration."""
- url = '/'.join([self.baseurl, 'service/local/repo_groups', repoId])
+ url = os.path.join(self.baseurl, 'repo_groups', repoId)
repo = {
'data': data
json_data = json.dumps(repo).encode(encoding='latin-1')
requests.put(url, auth=self.auth, headers=self.headers, data=json_data)
+
+ def get_all_images(self, repo):
+ """Get a list of all images in the given repository."""
+ url = "%s/search?repository=%s" % (self.baseurl, repo)
+ url_attr = requests.get(url)
+ if url_attr:
+ result = url_attr.json()
+ items = result["items"]
+ cont_token = result["continuationToken"]
+ else:
+ log.error("{} returned {}".format(url, str(url_attr)))
+ sys.exit(1)
+
+ # Check if there are multiple pages of data
+ while cont_token:
+ continue_url = "%s&continuationToken=%s" % (url, cont_token)
+ url_attr = requests.get(continue_url)
+ result = url_attr.json()
+ items += result["items"]
+ cont_token = result["continuationToken"]
+
+ return items
+
+ def search_images(self, repo, pattern):
+ """Find all images in the given repository matching the pattern."""
+ url = "{}/search?q={}&repository={}".format(self.baseurl, pattern, repo)
+ url_attr = requests.get(url)
+ if url_attr:
+ result = url_attr.json()
+ items = result["items"]
+ cont_token = result["continuationToken"]
+ else:
+ log.error("{} returned {}".format(url, str(url_attr)))
+ sys.exit(1)
+
+ # Check if there are multiple pages of data
+ while cont_token:
+ continue_url = "%s&continuationToken=%s" % (url, cont_token)
+ url_attr = requests.get(continue_url)
+ result = url_attr.json()
+ items += result["items"]
+ cont_token = result["continuationToken"]
+
+ return items
+
+ def delete_image(self, image):
+ """Delete an image from the repo, using the id field."""
+ url = os.path.join(self.baseurl, "components", image["id"])
+ log.info("Deleting {}:{}".format(image["name"], image["version"]))
+ url_attr = requests.delete(url, auth=self.auth)
+ if url_attr.status_code != 204:
+ log.error("{} returned {}".format(url, str(url_attr)))
+ sys.exit(1)
##############################################################################
"""Contains functions for various Nexus tasks."""
+import csv
import logging
import sys
import yaml
+from lftools import config
from lftools.nexus import Nexus
from lftools.nexus import util
log = logging.getLogger(__name__)
+def get_credentials(settings_file, url=None):
+ """Return credentials for Nexus instantiation."""
+ if settings_file:
+ try:
+ with open(settings_file, 'r') as f:
+ settings = yaml.safe_load(f)
+ except IOError:
+ log.error('Error reading settings file "{}"'.format(settings_file))
+ sys.exit(1)
+
+ if url and set(['user', 'password']).issubset(settings):
+ settings['nexus'] = url
+ return settings
+ elif set(['nexus', 'user', 'password']).issubset(settings):
+ return settings
+ elif url:
+ user = config.get_setting("global", "username")
+ password = config.get_setting("global", "password")
+ return {"nexus": url, "user": user, "password": password}
+ log.error('Please define a settings.yaml file, or include a url if using '
+ + 'lftools.ini')
+ sys.exit(1)
+
+
+def get_url(settings_file):
+ """Return URL from settings file, if it exists."""
+ if settings_file:
+ try:
+ with open(settings_file, 'r') as f:
+ settings = yaml.safe_load(f)
+ except IOError:
+ log.error('Error reading settings file "{}"'.format(settings_file))
+ sys.exit(1)
+
+ if "nexus" in settings:
+ return settings["nexus"]
+
+ return ""
+
+
def reorder_staged_repos(settings_file):
"""Reorder staging repositories in Nexus.
for setting in ['nexus', 'user', 'password']:
if not setting in settings:
- sys.exit('{} needs to be defined'.format(setting))
+ log.error('{} needs to be defined'.format(setting))
+ sys.exit(1)
_nexus = Nexus(settings['nexus'], settings['user'], settings['password'])
try:
repo_id = _nexus.get_repo_group('Staging Repositories')
except LookupError as e:
- sys.exit("Staging repository 'Staging Repositories' cannot be found")
+ log.error("Staging repository 'Staging Repositories' cannot be found")
+ sys.exit(1)
repo_details = _nexus.get_repo_group_details(repo_id)
for setting in ['nexus', 'user', 'password', 'email_domain']:
if not setting in settings:
- sys.exit('{} needs to be defined'.format(setting))
+ log.error('{} needs to be defined'.format(setting))
+ sys.exit(1)
_nexus = Nexus(settings['nexus'], settings['user'], settings['password'])
log.warning('Nexus repo creation started. Aborting now could leave tasks undone!')
for repo in config['repositories']:
build_repo(repo, repo, config['repositories'][repo], config['base_groupId'])
+
+
+def search(settings_file, url, repo, pattern):
+ """Return of list of images in the repo matching the pattern.
+
+ :arg str settings_file: Path to yaml file with Nexus settings.
+ :arg str url: Nexus URL. Overrides settings.yaml.
+ :arg str repo: The Nexus repository to audit.
+ :arg str pattern: The pattern to search for in repo.
+ """
+ if not url and settings_file:
+ url = get_url(settings_file)
+ if not url:
+ log.error("ERROR: No Nexus URL provided. Please provide Nexus URL in "
+ + "settings file or with the --server parameter.")
+ sys.exit(1)
+
+ _nexus = Nexus(url)
+
+ # Check for NoneType, remove CLI escape characters from pattern
+ if not pattern:
+ pattern = ""
+ pattern = pattern.replace("\\", "")
+
+ all_images = _nexus.search_images(repo, pattern)
+
+ # Ensure all of our images has a value for each of the keys we will use
+ included_keys = ["name", "version", "id"]
+ images = []
+ for image in all_images:
+ if set(included_keys).issubset(image):
+ # Keep only the keys we're using
+ restricted_image = {}
+ for key in included_keys:
+ restricted_image[key] = image[key]
+ images.append(restricted_image)
+ return images
+
+
+def output_images(images, csv_path=None):
+ """Output a list of images to stdout, or a provided file path.
+
+ :arg list images: Images to output.
+ :arg str csv_path: Path to write out csv file of matching images.
+ """
+ count = len(images)
+ if not count:
+ log.warning("{}.{} called with empty images list".format(
+ __name__, sys._getframe().f_code.co_name))
+ return
+ included_keys = images[0].keys()
+
+ if csv_path:
+ with open(csv_path, 'wb') as out_file:
+ dw = csv.DictWriter(out_file, fieldnames=included_keys,
+ quoting=csv.QUOTE_ALL)
+ dw.writeheader()
+ for image in images:
+ dw.writerow({k: v for k, v in image.items() if
+ k in included_keys})
+
+ for image in images:
+ log.info("Name: {}\nVersion: {}\nID: {}\n\n".format(
+ image["name"], image["version"], image["id"]))
+ log.info("Found {} images matching the query".format(count))
+
+
+def delete_images(settings_file, url, images):
+ """Delete all images in a list.
+
+ :arg str settings_file: Path to yaml file with Nexus settings.
+ :arg list images: List of images to delete.
+ """
+ credentials = get_credentials(settings_file, url)
+
+ _nexus = Nexus(credentials['nexus'], credentials['user'],
+ credentials['password'])
+
+ for image in images:
+ _nexus.delete_image(image)
--- /dev/null
+---
+features:
+ - |
+ Docker list and delete commands for Nexus docker repos.
+
+ Usage: lftools nexus docker [OPTIONS] COMMAND [ARGS]...
+
+ .. code-block:: none
+
+ Commands:
+ delete Delete all images matching the PATTERN.
+ list List images matching the PATTERN.