Main external function is docker_pull_tag_push
"""
- def __init__(self, nexus_proj):
+ def __init__(self, nexus_proj, docker_client=None):
"""Initialize this class."""
self.org_name = nexus_proj[0]
self.nexus_repo_name = nexus_proj[1]
self.docker_tags = DockerTagClass(self.org_name, self.docker_repo_name, repo_from_file)
self.tags_2_copy = TagClass(self.org_name, self.nexus_repo_name, repo_from_file)
self._populate_tags_to_copy()
- self.docker_client = docker.from_env()
+ self.docker_client = docker_client if docker_client is not None else docker.from_env()
def __lt__(self, other):
"""Implement sort order base on Nexus3 repo name."""
return True
-def fetch_all_tags(progbar=False):
+def fetch_all_tags(progbar=False, docker_client=None):
"""Fetch all tags function.
This function will use multi-threading to fetch all tags for all projects in
proj : Tuple with 'org' and 'repo'
('onap', 'aaf/aaf_service')
"""
- new_proj = ProjectClass(proj)
+ new_proj = ProjectClass(proj, docker_client)
projects.append(new_proj)
if progbar:
pbar.update(1)
progbar=False,
repofile=False,
version_regexp="",
+ docker_client=None,
):
"""Main function."""
# Verify find_pattern and specified_repo are not both used.
log.info("Could not get any catalog from Nexus3 with org = {}".format(org_name))
return
- fetch_all_tags(progbar)
+ fetch_all_tags(progbar, docker_client)
if verbose:
print_nexus_docker_proj_names()
print_nexus_valid_tags()
def test_ProjectClass_2_missing(self, responses, datafiles, mocker):
"""Test ProjectClass"""
+ mocker.patch("docker.from_env")
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_pull", side_effect=self.mocked_docker_pull)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_tag", side_effect=self.mocked_docker_tag)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_push", side_effect=self.mocked_docker_push)
def test_ProjectClass_1_missing(self, responses, datafiles, mocker):
"""Test ProjectClass"""
+ mocker.patch("docker.from_env")
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_pull", side_effect=self.mocked_docker_pull)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_tag", side_effect=self.mocked_docker_tag)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_push", side_effect=self.mocked_docker_push)
def test_ProjectClass_socket_timeout(self, responses, datafiles, mocker):
"""Test ProjectClass"""
+ mocker.patch("docker.from_env")
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_pull", side_effect=self.mocked_docker_pull)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_tag", side_effect=self.mocked_docker_tag)
mocker.patch("lftools.nexus.release_docker_hub.ProjectClass._docker_push", side_effect=self.mocked_docker_push)
def test_fetch_all_tags(self, responses, datafiles, mocker):
self.initiate_test_fetch(responses, datafiles, mocker)
+ mock_docker_client = mocker.MagicMock()
rdh.initialize("onap")
rdh.get_nexus3_catalog("onap")
- rdh.fetch_all_tags()
+ rdh.fetch_all_tags(docker_client=mock_docker_client)
assert len(rdh.NexusCatalog) == 3
assert len(rdh.projects) == 3
assert len(rdh.projects[0].tags_2_copy.valid) == 0
def test_copy(self, responses, datafiles, mocker):
self.initiate_test_fetch(responses, datafiles, mocker)
+ mock_docker_client = mocker.MagicMock()
rdh.initialize("onap")
rdh.get_nexus3_catalog("onap")
- rdh.fetch_all_tags()
+ rdh.fetch_all_tags(docker_client=mock_docker_client)
rdh.copy_from_nexus_to_docker()
assert self.counter.pull == 3
assert self.counter.tag == 3
def test_start_no_copy(self, responses, datafiles, mocker):
self.initiate_test_fetch(responses, datafiles, mocker)
- rdh.start_point("onap", "", False, False)
+ mock_docker_client = mocker.MagicMock()
+ rdh.start_point("onap", "", False, False, docker_client=mock_docker_client)
assert self.counter.pull == 0
assert self.counter.tag == 0
assert self.counter.push == 0
def test_start_copy(self, responses, datafiles, mocker):
self.initiate_test_fetch(responses, datafiles, mocker)
- rdh.start_point("onap", "", False, False, False, True)
+ mock_docker_client = mocker.MagicMock()
+ rdh.start_point("onap", "", False, False, False, True, docker_client=mock_docker_client)
assert len(rdh.NexusCatalog) == 3
assert len(rdh.projects) == 3
assert len(rdh.projects[0].tags_2_copy.valid) == 0
def test_start_copy_repo(self, responses, datafiles, mocker):
self.initiate_test_fetch(responses, datafiles, mocker, "sanity")
- rdh.start_point("onap", "validator", False, False, False, True)
+ mock_docker_client = mocker.MagicMock()
+ rdh.start_point("onap", "validator", False, False, False, True, docker_client=mock_docker_client)
assert len(rdh.NexusCatalog) == 1
assert len(rdh.projects) == 1
assert len(rdh.projects[0].tags_2_copy.valid) == 1
assert len(rdh.projects) == 0
-def test_calculate_docker_project_name():
+def test_calculate_docker_project_name(mocker):
+ mocker.patch("docker.from_env")
project = ["onap", "this/is/a-test_project", ""]
rdh.initialize("onap")
test_proj = rdh.ProjectClass(project)