Chore: Initial code drop of core Python modules 34/71234/9
authorMatthew Watkins <mwatkins@linuxfoundation.org>
Tue, 14 Feb 2023 22:19:24 +0000 (22:19 +0000)
committerMatthew Watkins <mwatkins@linuxfoundation.org>
Fri, 21 Apr 2023 13:18:24 +0000 (14:18 +0100)
Change-Id: Iecd5e9dc1cffe38fa5c7a8cbe7fbfda521bbc904
Signed-off-by: Matthew Watkins <mwatkins@linuxfoundation.org>
12 files changed:
.gitignore
.pre-commit-config.yaml
.readthedocs.yml
CHANGELOG.rst
README.rst
setup.cfg
src/python_one_password/__init__.py
src/python_one_password/caching.py [new file with mode: 0755]
src/python_one_password/cli.py [new file with mode: 0755]
src/python_one_password/credentials.py [new file with mode: 0755]
src/python_one_password/functions.py [new file with mode: 0755]
src/python_one_password/tags.py [new file with mode: 0755]

index e9e1e9b..eae96cf 100644 (file)
@@ -1,4 +1,5 @@
 # Temporary and binary files
+install.sh
 *~
 *.py[cod]
 *.so
@@ -52,3 +53,6 @@ MANIFEST
 .venv*/
 .conda*/
 .python-version
+
+# Data/output files
+*.metadata.json
index 2c368e7..27369dc 100644 (file)
@@ -16,21 +16,21 @@ repos:
       - id: end-of-file-fixer
       - id: requirements-txt-fixer
       - id: mixed-line-ending
-        args: ['--fix=lf']  # replace 'auto' with 'lf' to enforce Linux/Mac line endings or 'crlf' for Windows
+        args: ['--fix=lf']
 
   - repo: https://github.com/jorisroovers/gitlint
     rev: v0.17.0
     hooks:
       - id: gitlint
 
-## If you want to automatically "modernize" your Python code:
+# If you want to automatically "modernize" your Python code:
 # - repo: https://github.com/asottile/pyupgrade
 #   rev: v3.3.1
 #   hooks:
 #   - id: pyupgrade
 #     args: ['--py37-plus']
 
-## If you want to avoid flake8 errors due to unused vars or imports:
+# If you want to avoid flake8 errors due to unused vars or imports:
 # - repo: https://github.com/PyCQA/autoflake
 #   rev: v2.0.0
 #   hooks:
@@ -52,7 +52,7 @@ repos:
       - id: black
         language_version: python3
 
-## If like to embrace black styles even in the docs:
+# If like to embrace black styles even in the docs:
 # - repo: https://github.com/asottile/blacken-docs
 #   rev: v1.13.0
 #   hooks:
@@ -67,8 +67,19 @@ repos:
       ## You can add flake8 plugins via `additional_dependencies`:
       #  additional_dependencies: [flake8-bugbear]
 
-## Check for misspells in documentation files:
+# Check for misspells in documentation files:
 # - repo: https://github.com/codespell-project/codespell
 #   rev: v2.2.2
 #   hooks:
 #   - id: codespell
+
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v1.0.0
+    hooks:
+      - id: mypy
+
+  - repo: https://github.com/adrienverge/yamllint.git
+    rev: v1.29.0
+    hooks:
+      - id: yamllint
+        types: [yaml]
index a2bcab3..2b29b56 100644 (file)
@@ -1,3 +1,4 @@
+---
 # Read the Docs configuration file
 # See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
 
@@ -9,7 +10,7 @@ sphinx:
   configuration: docs/conf.py
 
 # Build documentation with MkDocs
-#mkdocs:
+# mkdocs:
 #  configuration: mkdocs.yml
 
 # Optionally build your docs in additional formats such as PDF
index 226e6f5..9d93b7b 100644 (file)
@@ -5,6 +5,5 @@ Changelog
 Version 0.1
 ===========
 
-- Feature A added
-- FIX: nasty bug #1729 fixed
-- add your changes here!
+- Initial implementation of vault data import and credential tag operations
+- Correctly set package dependencies
index e3f4343..fe78c50 100644 (file)
@@ -37,7 +37,177 @@ python-one-password
     Tag editor for 1Password
 
 
-A longer description of your project goes here...
+Gathers vault and credential JSON metadata from 1Password databases; enables bulk manipulation of tags
+
+
+Getting Started
+===============
+
+Before running these tools, you will need to install the 1Password CLI for your operating system.
+
+1Password CLI <https://developer.1password.com/docs/cli/get-started/>
+
+
+Installation
+============
+
+The python-one-password tool and its dependencies can be installed from PyPI
+using the standard Python PIP command:
+
+``
+% python3 -m pip install python_one_password
+``
+
+Interactive Help
+================
+
+The primary command and sub-commands have embedded help, which can be accessed
+as shown below:
+
+``
+% python-one-password --help
+% python-one-password credentials --help
+% python-one-password tags --help
+``
+
+
+Importing Credentials
+=====================
+
+The first step required to begin working with the 1Password database is to
+import credentials from one or more vault(s).
+
+The first command that creates an interaction with the 1Password CLI tools is
+likely to generate an authentication prompt.
+
+Note: supply your password and/or biometric data to unlock the 1Password database if/when prompted
+
+You can then import credentials from one or more vaults, as shown below for a vault called "Testing":
+
+``
+ % python-one-password credentials fetch -n -i Testing
+Importing data from 1Password database...
+Total number of vaults: 20
+Vaults imported into cache: 1
+
+########## Vault Summary ##########
+
+ID                             Name
+cnx76s6avkg3xikw6u5bf7jdki     Testing
+
+Importing credential metadata from 1Password database...
+Credential data gathered for: 1 vault(s)
+Credential metadata records loaded: 5
+Loaded cached JSON metadata: [5] records
+
+Review current credential state? (y/n): y
+
+### Credentials: Current State ###
+
+yczzflaacyziwew2ahy24kqdxi     Test4
+gbikz2upboavuksupx65xb5fie     Test5
+fkn3cp42ouua47rqtnergchm6q     Test3
+rfoxd64sumvzbk2m7nkruyvr5e     Test1
+xiu64ukcwxtxfco7j2wjcf36eq     Test2
+
+``
+
+Once a set of credentials have been loaded, you can review them with:
+
+``
+% python-one-password credentials show
+Loaded cached JSON metadata: [5] records
+
+### Credentials: Current State ###
+
+gbikz2upboavuksupx65xb5fie     []      Test5
+fkn3cp42ouua47rqtnergchm6q     ['c3po', 'luke', 'r2d2', 'chewbacca']   Test3
+yczzflaacyziwew2ahy24kqdxi     []      Test4
+rfoxd64sumvzbk2m7nkruyvr5e     ['c3po', 'luke', 'chewbacca']   Test1
+xiu64ukcwxtxfco7j2wjcf36eq     ['c3po', 'luke', 'r2d2', 'chewbacca']   Test2
+``
+
+You can subsequently refine your selection using match/reject search patterns:
+
+``
+ % python-one-password credentials refine --reject chewbacca
+Loaded cached JSON metadata: [5] records
+Matching query:        [3] chewbacca
+Subsequently rejected: 3/5
+
+Credentials now selected: 2
+
+Review current credential state? (y/n): y
+
+### Credentials: Current State ###
+
+yczzflaacyziwew2ahy24kqdxi     []      Test4
+gbikz2upboavuksupx65xb5fie     []      Test5
+
+Update working credential set to selection? (y/n): y
+``
+
+When you have obtained a suitable set of credentials to work with, you can move
+on to tag manipulation.
+
+
+Working with tags
+=================
+
+The help for tag manipulation can be called up as follows:
+
+``
+% python-one-password tags --help
+``
+
+The basic tag operations are:
+
+* add         Adds tags (to the selected credentials)                                                                                                                                 │
+* allocate    Adds tags from a list round-robin (to the selected credentials)                                                                                                         │
+* replace     Replaces a given tag with another (from the selected credentials)                                                                                                       │
+* strip       Strips tags (from the selected credentials)
+
+These cover a broad range of use cases for working with metadata tags.
+
+Most have an option to either append to the existing tags, or overwrite the
+existing tags and discard them.
+
+If appending would create duplicates, the list is deduplicated before
+application to prevent unintended replication during changes.
+
+It is worth discussing briefly the operation of the "allocate" option. This is
+useful where you have a list of team members (staff) who might be assigned a
+set of credential as part of a rotation task/project. You can specify a list of
+team members on the command-line and the list will be iterated over, allocating
+credentials in a round-robin fashion.
+
+``
+ % python-one-password tags allocate -o bob sarah steve
+Loaded cached JSON metadata: [5] records
+
+Review current credential state? (y/n): y
+
+### Credentials: Current State ###
+
+gbikz2upboavuksupx65xb5fie     []      Test5
+yczzflaacyziwew2ahy24kqdxi     []      Test4
+fkn3cp42ouua47rqtnergchm6q     ['c3po', 'luke', 'r2d2', 'chewbacca']   Test3
+rfoxd64sumvzbk2m7nkruyvr5e     ['c3po', 'luke', 'chewbacca']   Test1
+xiu64ukcwxtxfco7j2wjcf36eq     ['c3po', 'luke', 'r2d2', 'chewbacca']   Test2
+
+Tags to allocate: ['bob', 'sarah', 'steve']
+
+### Credentials: Future State ###
+
+gbikz2upboavuksupx65xb5fie     ['bob'] Test5
+yczzflaacyziwew2ahy24kqdxi     ['sarah']       Test4
+fkn3cp42ouua47rqtnergchm6q     ['steve']       Test3
+rfoxd64sumvzbk2m7nkruyvr5e     ['bob'] Test1
+xiu64ukcwxtxfco7j2wjcf36eq     ['sarah']       Test2
+
+Commit these updates to the 1Password database? (y/n): y
+[5] Credentials updated
+``
 
 
 .. _pyscaffold-notes:
@@ -45,16 +215,20 @@ A longer description of your project goes here...
 Making Changes & Contributing
 =============================
 
-This project uses `pre-commit`_, please make sure to install it before making any
-changes::
+This project uses `pre-commit`_, please make sure to install it before making
+any changes:
 
-    pip install pre-commit
-    cd python-one-password
-    pre-commit install
+``
+% pip install pre-commit
+% cd python-one-password
+% pre-commit install
+``
 
 It is a good idea to update the hooks to the latest version::
 
-    pre-commit autoupdate
+``
+% pre-commit autoupdate
+``
 
 Don't forget to tell your contributors to also install and use pre-commit.
 
index c7cf2ed..f0d00fb 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -4,8 +4,8 @@
 # https://setuptools.pypa.io/en/latest/references/keywords.html
 
 [metadata]
-name = python-one-password
-description = Tag editor for 1Password
+name = python_one_password
+description = Imports metadata from 1Password vaults and allows for bulk manipulation of tags
 author = Matthew Watkins
 author_email = mwatkins@linuxfoundation.org
 license = Apache-2.0
@@ -49,7 +49,7 @@ package_dir =
 # For more information, check out https://semver.org/.
 install_requires =
     importlib-metadata; python_version<"3.8"
-
+    typer[all]~=0.7.0
 
 [options.packages.find]
 where = src
@@ -68,15 +68,8 @@ testing =
     pytest-cov
 
 [options.entry_points]
-# Add here console scripts like:
-# console_scripts =
-#     script_name = python_one_password.module:function
-# For example:
-# console_scripts =
-#     fibonacci = python_one_password.skeleton:run
-# And any other entry points, for example:
-# pyscaffold.cli =
-#     awesome = pyscaffoldext.awesome.extension:AwesomeExtension
+console_scripts =
+     python-one-password = python_one_password.cli:run
 
 [tool:pytest]
 # Specify command line options as you would do when invoking pytest directly.
index c5c960d..5f13da6 100644 (file)
@@ -8,7 +8,7 @@ else:
 
 try:
     # Change here if project is renamed and does not equal the package name
-    dist_name = "python-one-password"
+    dist_name = "python_one_password"
     __version__ = version(dist_name)
 except PackageNotFoundError:  # pragma: no cover
     __version__ = "unknown"
diff --git a/src/python_one_password/caching.py b/src/python_one_password/caching.py
new file mode 100755 (executable)
index 0000000..170a1a2
--- /dev/null
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2023 The Linux Foundation and others.
+#
+# All rights reserved. This program and the accompanying materials are made
+# available under the terms of the Apache-2.0 license which accompanies this
+# distribution, and is available at
+# https://opensource.org/licenses/Apache-2.0
+##############################################################################
+
+"""Functions to load/save cached JSON metadata"""
+
+__author__ = "Matthew Watkins"
+
+# Standard imports, alphabetical order ###
+import json
+import logging
+import os
+import os.path
+import sys
+import time
+from typing import Any, List
+
+# Setup logging
+log = logging.getLogger(__name__)
+
+
+# Define the filesystem names of various JSON metadata cache files
+datastores = {
+    "vault_summary": "vault.summary.metadata.json",
+    "vaults_detail": "vault.detailed.metadata.json",
+    "vault_credentials": "vault.credentials.metadata.json",
+    "credentials": "credentials.metadata.json",
+}
+
+
+# File/caching operations
+
+
+def load_json_file(filename: str) -> str:
+    """Returns JSON data object from a given file"""
+    try:
+        with open(filename, encoding="utf-8") as open_file:
+            data = json.loads(open_file.read())
+            log.debug("JSON read from file: %s", filename)
+    except IOError as error:
+        log.error("Error reading JSON from file: %s", filename)
+        log.error(error)
+        sys.exit(1)
+    return data
+
+
+def save_json_file(json_data: List[str], filename: str):
+    """Saves a JSON data object to disk"""
+    try:
+        with open(filename, "w", encoding="utf-8") as write_file:
+            json.dump(json_data, write_file)
+            log.debug("JSON written to file: %s", filename)
+    except IOError as error:
+        log.error("Error writing JSON to file: %s", filename)
+        log.error(error)
+        sys.exit(1)
+
+
+def validate_cache(filename: str):
+    """Checks local metadata for freshness"""
+    if not check_cache_age(filename, 3600):
+        refresh_cache()
+
+
+def get_file_age(filepath: str) -> float:
+    """Returns the age of a file on disk in seconds"""
+    return time.time() - os.path.getmtime(filepath)
+
+
+def load_cache(data_store: str):
+    """Loads data from the filesystem and returns the JSON"""
+    log.debug("Request to load records from cache: %s", data_store)
+    if filename := datastores.get(data_store, None):
+        # Check cache validity
+        validate_cache(filename)
+        data = load_json_file(filename)
+        log.info("Loaded cached JSON metadata: [%s] records", len(data))
+        log.debug("\n%s", data)
+        return data
+    log.error("The requested cache does not exist: %s", data_store)
+    sys.exit(1)
+
+
+def save_cache(json_data: Any, data_store: str):
+    """Saves JSON data to the filesystem"""
+    log.debug("Saving [%s] records to cache: %s", len(json_data), data_store)
+    if filename := datastores.get(data_store, None):
+        save_json_file(json_data, filename)
+        return
+    log.error("The requested cache does not exist: %s", data_store)
+    sys.exit(1)
+
+
+def check_cache_age(filename: str, max_age: int) -> bool:
+    """Returns cache validity when given a maximum age (in seconds)"""
+    log.debug("Cache lifetime/expiry set to: %s seconds", max_age)
+    if os.path.isfile(filename):
+        age = get_file_age(filename)
+        if age < max_age:
+            return True
+        return False
+    log.error("The requested file could not be opened: %s", filename)
+    sys.exit(1)
+
+
+# TODO: check metadata modification times and version numbers; selectively reload data
+def refresh_cache():
+    """Currently throws an error; could eventually invoke cache validation"""
+    log.error("The local cache file(s) are invalid or have expired")
+    log.error("Please reload credentials from the required vault(s)")
+    sys.exit(1)
diff --git a/src/python_one_password/cli.py b/src/python_one_password/cli.py
new file mode 100755 (executable)
index 0000000..0e02288
--- /dev/null
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2023 The Linux Foundation and others.
+#
+# All rights reserved. This program and the accompanying materials are made
+# available under the terms of the Apache-2.0 license which accompanies this
+# distribution, and is available at
+# https://opensource.org/licenses/Apache-2.0
+##############################################################################
+
+"""Python wrapper for manipulating 1Password credential metadata/tags"""
+
+__author__ = "Matthew Watkins"
+
+# External modules
+import typer
+
+# Bundled modules
+from python_one_password.credentials import app as credentials
+from python_one_password.tags import app as tags
+
+# Define command structure with typer module
+app = typer.Typer()
+
+
+# Additional sub-commands
+
+
+app.add_typer(
+    credentials,
+    name="credentials",
+    help="Imports and filters credentials from 1Password",
+)
+
+app.add_typer(
+    tags,
+    name="tags",
+    help="Manipulates metadata tags of the current credentials",
+)
+
+
+def run():
+    app()
diff --git a/src/python_one_password/credentials.py b/src/python_one_password/credentials.py
new file mode 100755 (executable)
index 0000000..503a9d1
--- /dev/null
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2023 The Linux Foundation and others.
+#
+# All rights reserved. This program and the accompanying materials are made
+# available under the terms of the Apache-2.0 license which accompanies this
+# distribution, and is available at
+# https://opensource.org/licenses/Apache-2.0
+##############################################################################
+
+"""Python wrapper for manipulating 1Password credential metadata/tags"""
+
+__author__ = "Matthew Watkins"
+
+# Standard imports, alphabetical order
+import logging
+import logging.handlers
+from typing import List, Optional
+
+# External modules
+import typer
+
+# Bundled modules
+import python_one_password.caching as caching
+import python_one_password.functions as f
+
+# Setup logging
+log = logging.getLogger(__name__)
+
+
+# Define command structure with typer module
+app = typer.Typer()
+
+
+# Credential operations
+
+
+@app.command()
+def fetch(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    include: Optional[List[str]] = typer.Option(
+        ["All"],
+        "--include",
+        "-i",
+        envvar="OP_VAULT_INC",
+        help="Includes the specified vault(s) from processing",
+    ),
+    exclude: Optional[List[str]] = typer.Option(
+        None,
+        "--exclude",
+        "-e",
+        envvar="OP_VAULT_EXC",
+        help="Excludes the specified vault(s) from processing",
+    ),
+    no_tags: bool = typer.Option(
+        False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output"
+    ),
+):
+    """Import vaults and credentials from the 1Password database"""
+    f.startup_tasks(debug)
+    f.validate_import_data_opts(include, exclude)
+    vaults_dictionary = f.populate_vault_json(include, exclude)
+    f.show_vault_summary(vaults_dictionary)
+    f.import_credentials(vaults_dictionary)
+    credential_data = f.caching.load_cache("credentials")
+    f.credential_summary(credential_data, no_tags, True)
+
+
+@app.command()
+def vaults(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    )
+):
+    """Show credentials in current/filtered working set"""
+    f.startup_tasks(debug)
+    vault_summ_cmd = "op vaults list"
+    raw_data = f.run_cmd_mp(vault_summ_cmd)
+    log.info(raw_data)
+
+
+@app.command()
+def show(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    no_tags: bool = typer.Option(
+        False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output"
+    ),
+):
+    """Show credentials in current/filtered working set"""
+    f.startup_tasks(debug)
+    credential_data = caching.load_cache("credentials")
+    f.credential_summary(credential_data, no_tags, False)
+
+
+@app.command()
+def refine(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    no_tags: bool = typer.Option(
+        False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output"
+    ),
+    match: Optional[List[str]] = typer.Option(
+        None,
+        "--match",
+        "-m",
+        help="Match/select credentials containing the specified text",
+    ),
+    reject: Optional[List[str]] = typer.Option(
+        None,
+        "--reject",
+        "-r",
+        help="Reject/exclude credentials containing the specified text",
+    ),
+    ignore_case: bool = typer.Option(
+        False,
+        "--ignore-case",
+        "-i",
+        show_default=True,
+        help="Ignore case when matching strings in credentials",
+    ),
+):
+    """Refine credential selection using match/reject (string) operations"""
+    f.startup_tasks(debug)
+    f.validate_filter_items_opts(match, reject)
+    credential_data = f.filter_credentials(match, reject, ignore_case)
+    f.credential_summary(credential_data, no_tags, True)
+    if f.prompt("Update working credential set to selection?"):
+        caching.save_cache(credential_data, "credentials")
+    else:
+        log.info("Select/reject did not modify credential database")
diff --git a/src/python_one_password/functions.py b/src/python_one_password/functions.py
new file mode 100755 (executable)
index 0000000..f00d820
--- /dev/null
@@ -0,0 +1,654 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2023 The Linux Foundation and others.
+#
+# All rights reserved. This program and the accompanying materials are made
+# available under the terms of the Apache-2.0 license which accompanies this
+# distribution, and is available at
+# https://opensource.org/licenses/Apache-2.0
+##############################################################################
+
+"""Shared functions for use when interacting with 1Password"""
+
+__author__ = "Matthew Watkins"
+
+import json
+import logging
+import logging.handlers
+import multiprocessing
+import os
+import os.path
+import platform
+import sys
+import time
+
+# Standard imports, alphabetical order
+from itertools import cycle
+from subprocess import PIPE, Popen
+from typing import Any, Dict, List, Optional, Set
+
+# Bundled modules
+import python_one_password.caching as caching
+
+# Setup logging
+log = logging.getLogger(__name__)
+
+# Define variables
+# Get the home and present working directories
+home_dir = os.path.expanduser("~")
+pwd = os.getcwd()
+
+# Used to source passwords from the shared password store
+pass_mapping_file = home_dir + "/.password-store/.shared-configs/cloud-mappings.txt"
+
+
+# General/shared functions
+
+
+def contains_duplicates(this_list: List[str]) -> bool:
+    """Determines if a list contains duplicate elements"""
+    return this_list != list(dict.fromkeys(this_list))
+
+
+def deduplicate_list(this_list: List[str]) -> List[str]:
+    """De-duplicates a given list, preserving original order"""
+    return list(dict.fromkeys(this_list))
+
+
+def list_to_csv(this_list: List[str]) -> str:
+    """Returns a comma separated string from a list object"""
+    delim = ","
+    return delim.join(list(map(str, this_list)))
+
+
+def cred_update_refresh(
+    update_commands: List[str], refresh_commands: List[str]
+) -> None:
+    """Modifies credential data and refreshes local metadata"""
+    if not prompt("Commit these updates to the 1Password database?"):
+        sys.exit(0)
+
+    # Use timers to profile performance
+    timer = start_timer()
+    # Code must be single-threaded when making edits
+    # TODO: implement a proper counter here to better progress
+    log.info("Updating [%s] records:", len(update_commands))
+    log_level = logging.getLogger().getEffectiveLevel()
+    for command in update_commands:
+        cmd_output = run_cmd_mp(command)
+        if log_level == 10:
+            log.debug(cmd_output)
+        else:
+            # Display progress on console when not debugging
+            print(".", end="")
+    stop_timer(timer)
+    if log_level != 10:
+        # Terminate progress line on console
+        print("")
+
+    # Refresh the local credential cache after updates
+
+    # Use timers to profile performance
+    timer = start_timer()
+    updated_credential_db = []
+    log.info("Multiprocessing call refreshing credential metadata")
+    with multiprocessing.Pool() as pool:
+        # Call the function for each item in parallel
+        for raw_data in pool.map(run_cmd_mp, refresh_commands):
+            log.debug(raw_data)
+            updated_credential = json.loads(raw_data)
+            updated_credential_db.append(updated_credential)
+    stop_timer(timer)
+
+    # Save vault detail dictionary to cache file
+    caching.save_cache(updated_credential_db, "credentials")
+    log.info("Credential metadata tags updated")
+
+
+def tag_search_replace(
+    credentials: List[Dict], search_string: str, replace_string: str
+) -> None:
+    """Searches credentials for a tag and replaces it with another"""
+    log.debug(
+        "\nSearching for: [%s] Replacing with: [%s]\n", search_string, replace_string
+    )
+
+    update_commands = []
+    refresh_commands = []
+    matches = 0
+
+    for credential in credentials:
+        cred_id: str = credential["id"]
+        name: str = credential["title"]
+        try:
+            tags: str = credential["tags"]
+        except KeyError:
+            # This credential has no tags
+            # current_tags = []
+            continue
+
+        if search_string in tags:
+            log.debug("Matched:        %s      %s      %s", cred_id, tags, name)
+            matches += 1
+
+            new_tags = []
+            for tag in tags:
+                new_tag = tag.replace(search_string, replace_string)
+                new_tags.append(new_tag)
+
+            # At this point duplicate tags are a concern; deduplicate them
+            new_tags = deduplicate_list(new_tags)
+
+            log.info("Updating:        %s      %s      %s", cred_id, new_tags, name)
+
+            tags_string = list_to_csv(new_tags)
+
+            update_commands.append("op item edit " + cred_id + " --tags " + tags_string)
+            refresh_commands.append(
+                "op item get " + cred_id + " --format=json --no-color"
+            )
+
+    log.info("Total matches: %s", matches)
+    if matches > 0:
+        cred_update_refresh(update_commands, refresh_commands)
+
+
+def tags_update(
+    credentials: List[Dict], tags: List[str], overwrite: bool, round_robin: bool
+) -> None:
+    """Performs tag operations on all currently selected credentials"""
+    update_commands = []
+    refresh_commands = []
+
+    # Used only for round-robin allocations
+    tags_cycle = cycle(tags)
+
+    def next_tag():
+        return next(tags_cycle)
+
+    log.info("\n### Credentials: Future State ###\n")
+    for credential in credentials:
+        cred_id = credential["id"]
+        name = credential["title"]
+        current_tags: List[str] = []
+        try:
+            current_tags = credential["tags"]
+        except KeyError:
+            # Empty list already defined; no need to handle the error
+            pass
+
+        # When allocating tags, extract the next one from the list
+        if round_robin:
+            tags = [next_tag()]
+
+        if not overwrite:
+            # When NOT overwriting, add new tags to existing tags
+            tags = tags + current_tags
+
+        # De-duplicate list of tags; important when NOT overwriting
+        # As we may be adding same tag may already exist...
+        tags = deduplicate_list(tags)
+
+        log.info("%s   %s      %s", cred_id, tags, name)
+
+        if not tags:
+            tags_string = '""'
+        else:
+            tags_string = list_to_csv(tags)
+
+        update_commands.append("op item edit " + cred_id + " --tags " + tags_string)
+        refresh_commands.append("op item get " + cred_id + " --format=json --no-color")
+
+    cred_update_refresh(update_commands, refresh_commands)
+
+
+def match_strings(ignore_case: bool, search_pattern: str, string: str) -> bool:
+    """Returns true/false on string matching; optionally case-insensitive"""
+    if ignore_case is True:
+        if search_pattern.lower() in string.lower():
+            return True
+    else:
+        if search_pattern in string:
+            return True
+    # Sub-string was not found in string
+    return False
+
+
+def credential_summary(
+    credential_list: List[Dict], no_tags: bool, prompt_flag: bool
+) -> None:
+    """Displays a list/summary of the current working set of credentials"""
+
+    # Prompt the user to review/display credentials
+    # Note: can optionally be bypassed by setting prompt_flag to False
+    if prompt_flag and not prompt("Review current credential state?"):
+        return
+
+    log.info("\n### Credentials: Current State ###\n")
+    # Note: column output is tab separated
+    for credential in credential_list:
+        cred_id = credential["id"]
+        name = credential["title"]
+        # Tag display might make for messy output and take up excessive screen space
+        # We therefore provide an option to suppress them in summary/output
+        if no_tags:
+            log.info("%s       %s", cred_id, name)
+            continue
+        # Not all credentials have tags, catch the exception
+        try:
+            tags = credential["tags"]
+            log.info("%s       %s      %s", cred_id, tags, name)
+        except KeyError:
+            # Note: credentials without tags are padded with empty square brackets
+            log.info("%s       []      %s", cred_id, name)
+
+
+def match_elements(
+    ignore_case: bool, search_patterns: List[str], list_of_dictionaries: List[Dict]
+) -> List[Dict]:
+    """Returns a subset of elements matching a query from a list of dictionaries"""
+    matched = []
+    # Search individual term from a list of terms
+    for search_pattern in search_patterns:
+        # Create a counter to store searches matched for this specific query
+        pattern_matches = 0
+        for element in list_of_dictionaries:
+            match = match_strings(ignore_case, search_pattern, str(element))
+            # Need to prevent duplicates
+            if match and element not in matched:
+                pattern_matches += 1
+                matched.append(element)
+        log.info("Matching query:        [%s] %s", pattern_matches, search_pattern)
+    return matched
+
+
+def subtract_lists(
+    primary_list: List[Dict], list_to_subtract: List[Dict]
+) -> List[Dict]:
+    """Subtracts one list of dictionaries from another based on id values"""
+    primary_ids = []
+    for dictionary in primary_list:
+        primary_ids.append(dictionary["id"])
+    log.debug("Identities: %s", primary_ids)
+    subtract_ids = []
+    for dictionary in list_to_subtract:
+        subtract_ids.append(dictionary["id"])
+    log.debug("Subtracting: %s", subtract_ids)
+    remaining_ids = subtract_common_elements(primary_ids, subtract_ids)
+    log.debug("Number of results: %s", len(remaining_ids))
+    result = []
+    for dictionary in primary_list:
+        if dictionary["id"] in remaining_ids:
+            result.append(dictionary)
+    return result
+
+
+def prompt(question: str) -> bool:
+    """Displays a question; prompts for yes/no and returns a boolean"""
+    while "Invalid selection":
+        reply = str(input("\n" + question + " (y/n): ")).lower().strip()
+        if reply[:1] == "y":
+            return True
+        if reply[:1] == "n":
+            break
+    return False
+
+
+def filter_credentials(
+    match: Optional[List[str]], reject: Optional[List[str]], ignore_case: bool
+) -> List[Dict[str, Any]]:
+    """Refines the current credential set through select/reject criteria"""
+    if match == [] and reject == []:
+        log.error("Error: provide at least one filter operation")
+        log.error("Choose match, reject, or use both together")
+        sys.exit(1)
+
+    # List to hold filtered credentials, initially the complete database
+    credentials = caching.load_cache("credentials")
+    starting_number = len(credentials)
+
+    if match:
+        credentials = match_elements(ignore_case, match, credentials)
+        log.info("Selected:              %s/%s", len(credentials), starting_number)
+        # Print summary
+        log.debug("\n### Selected Credentials ###\n")
+        for credential in credentials:
+            log.debug("%s      %s", credential["id"], credential["title"])
+
+    if reject:
+        rejected = match_elements(ignore_case, reject, credentials)
+        log.info("Subsequently rejected: %s/%s", len(rejected), len(credentials))
+        log.debug("\n### Rejected Credentials ###\n")
+        for credential in rejected:
+            log.debug("%s      %s", credential["id"], credential["title"])
+        if len(rejected) != 0:
+            credentials = subtract_lists(credentials, rejected)
+
+    if credentials is None or len(credentials) == 0:
+        log.info("\nNo results were returned for your filter(s)")
+        sys.exit(1)
+    else:
+        log.info("\nCredentials now selected: %s", len(credentials))
+    return credentials
+
+
+def validate_filter_items_opts(
+    match: Optional[List[str]], reject: Optional[List[str]]
+) -> None:
+    """Handles the options provided to the filter_items sub-command"""
+    if match and reject:
+        log.info("Both match and reject operations were requested...")
+        log.warning("Note: match operations will run first, then reject")
+
+
+def get_credentials_mp(vault: str) -> tuple[str, List[str]]:
+    """Retrieves credential metadata JSON and returns as dictionary"""
+    # Multiprocessing functions need logging setup
+    log = logging.getLogger(__name__)
+    cred_summ_cmd = "op item list --format=json --no-color --vault " + vault
+    raw_data = run_cmd_mp(cred_summ_cmd)
+    vault_credentials = json.loads(raw_data)
+    log.debug("Credentials list:")
+    log.debug(vault_credentials)
+    return (vault, vault_credentials)
+
+
+def import_credentials(vaults: Dict[str, str]) -> None:
+    """Given a dictionary of vaults, populates the credential database(s)"""
+    log.info("Importing credential metadata from 1Password database...")
+
+    vault_credentials = []
+
+    # Use timers to profile performance
+    timer = start_timer()
+
+    with multiprocessing.Pool() as pool:
+        # Call the function for each item in parallel
+        for vault, credentials in pool.map(get_credentials_mp, vaults):
+            vault_creds_dictionary = {vault: credentials}
+            vault_credentials.append(vault_creds_dictionary)
+
+    stop_timer(timer)
+
+    vaults_enumerated = len(vault_credentials)
+    log.info("Credential data gathered for: %s vault(s)", vaults_enumerated)
+    # Save vault detail dictionary to cache file
+    caching.save_cache(vault_credentials, "vault_credentials")
+
+    credentials = []
+    # Copy individual credentials out into an abstract list
+    for dictionary in vault_credentials:
+        dict_credentials = list(dictionary.values())
+        for credential_list in dict_credentials:
+            for credential in credential_list:
+                credentials.append(credential)
+
+    # Print out total number of credentials in database
+    log.info("Credential metadata records loaded: %s", len(credentials))
+    # Save vault detail dictionary to cache file
+    caching.save_cache(credentials, "credentials")
+
+
+def validate_import_data_opts(
+    match: Optional[List[str]], reject: Optional[List[str]]
+) -> None:
+    """Handles the options provided to the import_data sub-command"""
+    log.debug("Validating command-line options/arguments")
+    if reject and match != ["All"]:
+        log.error("Match/reject options are mutually exclusive")
+        sys.exit(1)
+
+
+def get_vault_detail_mp(vault_dictionary: Dict) -> Dict[str, Any]:
+    """Retrieves detailed vault metadata JSON and returns as dictionary"""
+    # Multiprocessing functions need logging setup
+    log = logging.getLogger(__name__)
+    # Enumerate the details of each vault
+    vault_id = vault_dictionary["id"]
+    log.debug("Gathering data for vault: %s", vault_id)
+    vault_detail_cmd = "op vault get " + vault_id + " --format=json --no-color"
+    raw_data = run_cmd_mp(vault_detail_cmd)
+    vault_detail = json.loads(raw_data)
+    log.debug("get_vault_detail_mp returned data for: %s\n%s", vault_id, vault_detail)
+    return vault_detail
+
+
+# Functions to track elapsed time when performing bulk operations
+def start_timer() -> float:
+    """Starts a timer to track functions expected to do bulk work"""
+    log.debug("Timer function started")
+    return time.perf_counter()
+
+
+def stop_timer(started: float) -> None:
+    """Takes the time started and prints the elapsed time"""
+    finished = time.perf_counter()
+    elapsed = finished - started
+    log.debug("Time taken in seconds: %s", elapsed)
+
+
+def populate_vault_json(
+    include: Optional[List[str]], exclude: Optional[List[str]]
+) -> Dict[str, Any]:
+    """Fetches additional vault metadata the summary doesn't provide"""
+
+    # Handle optionals gracefully by defining empty lists
+    if not include:
+        include = []
+    if not exclude:
+        exclude = []
+
+    log.info("Importing data from 1Password database...")
+    vault_list_cmd = "op vault list --format=json --no-color"
+    raw_data = run_cmd_mp(vault_list_cmd)
+    vault_summary = json.loads(raw_data)
+    log.debug("\nVaults summary:\n")
+    log.debug("%s\n", vault_summary)
+    # Save vault summary list to cache file
+    caching.save_cache(vault_summary, "vault_summary")
+
+    # Use timers to profile the performance of this code
+    timer = start_timer()
+    # TODO: Implement progress bar (more complex when multiprocessing)
+
+    vaults_detail = {}
+    with multiprocessing.Pool() as pool:
+        # Call the function for each item in parallel
+        for data in pool.map(get_vault_detail_mp, vault_summary):
+            vault_id: str = data["id"]
+            vault_name: str = data["name"]
+            # Add items conditionally, based on include/exclude options
+            if include == ["All"] and exclude == []:
+                vaults_detail[vault_id] = data
+            if include == ["All"] and exclude != []:
+                # For convenience, include/exclude can match both name/id
+                if vault_id not in exclude and vault_name not in exclude:
+                    vaults_detail[vault_id] = data
+            else:
+                # Include was specified on the command-line
+                if vault_id in include or vault_name in include:
+                    vaults_detail[vault_id] = data
+    stop_timer(timer)
+
+    details_retrieved = len(vaults_detail)
+    if details_retrieved == 0:
+        log.warning("No vaults matched your request; no data imported")
+        sys.exit(1)
+
+    total_vaults = len(vault_summary)
+    log.info("Total number of vaults: %s", total_vaults)
+
+    # Save vault detail dictionary to cache file
+    caching.save_cache(vaults_detail, "vaults_detail")
+
+    # Integrity check; verify we collected detail records for all vaults
+    if details_retrieved != total_vaults:
+        log.info("Vaults imported into cache: %s", details_retrieved)
+
+    return vaults_detail
+
+
+def show_vault_summary(vaults_dictionary: Dict) -> None:
+    """Prints a summary of vaults from a vaults dictionary"""
+    log.info("\n########## Vault Summary ##########\n")
+    log.info("ID                               Name")
+    for vault in vaults_dictionary.values():
+        cred_id = vault["id"]
+        name = vault["name"]
+        log_string = cred_id + "       " + name
+        log.info(log_string)
+    log.info("")
+
+
+def startup_tasks(debug: bool) -> None:
+    """Invokes some common initialisation operations"""
+    setup_logging(debug)
+    op_login(debug)
+
+
+def op_login(debug: bool) -> None:
+    """Makes a connection to the 1Password database/servers"""
+    # Documentation says the default timeout is 30 minutes
+    run_cmd_mp("eval $(op signin)")
+
+    if debug:
+        # Gather some useful debugging information
+        user_info = run_cmd_mp("op whoami")
+        log.debug("\n%s", user_info)
+
+
+def setup_logging(debug: bool) -> None:
+    """Logging setup common to all sub-commands"""
+    console_format = logging.Formatter("%(message)s")
+    file_format = logging.Formatter("%(asctime)s %(levelname)s - %(message)s")
+
+    # Change root logger level from WARNING (default) to NOTSET
+    # (makes sure all messages are delegated)
+    logging.getLogger().setLevel(logging.NOTSET)
+
+    console = logging.StreamHandler(sys.stdout)
+    console.setFormatter(console_format)
+    console.setLevel(logging.INFO)
+
+    if debug:
+        logging.getLogger().setLevel(logging.DEBUG)
+        print("Logging level: " + str(logging.getLogger().getEffectiveLevel()))
+        console.setLevel(logging.DEBUG)
+        # Add second file handler, with level set to DEBUG
+        debug_file = logging.handlers.RotatingFileHandler(filename="debug.log")
+        debug_file.setFormatter(file_format)
+        debug_file.setLevel(logging.DEBUG)
+        logging.getLogger().addHandler(debug_file)
+
+    logging.getLogger().addHandler(console)
+
+    # Default file log output with standard level INFO
+    standard_file = logging.handlers.RotatingFileHandler(filename="standard.log")
+    standard_file.setLevel(logging.INFO)
+    standard_file.setFormatter(file_format)
+    logging.getLogger().addHandler(standard_file)
+
+    # Capture additional user/system information
+    # Report CPUs available to Python multiprocessing
+    log.debug("Processor cores available: %s", multiprocessing.cpu_count())
+    log.debug("Python version: %s", platform.python_version())
+
+
+def common_elements(first: List[str], second: List[str]) -> Set[str]:
+    """Check for common elements in two lists"""
+    return set(first) & set(second)
+
+
+def subtract_common_elements(first: List[str], second: List[str]) -> Set[str]:
+    """Remove common elements from two lists"""
+    return set(first) - set(second)
+
+
+# pylint: disable-next=R1710
+def id_to_label(target: str, element: Dict[str, str]) -> str:
+    """Returns the object name when given an id string"""
+    if element["id"] == target:
+        # TODO: return either element["name"] or element["id"]
+        if element["name"]:
+            return element["name"]
+        if element["title"]:
+            return element["title"]
+    # If internal data structures are correct, the error below should never be thrown
+    log.error("Function id_to_label failed to match requested string: %s", target)
+    sys.exit(1)
+
+
+# pylint: disable-next=R1710
+def label_to_id(target: str, element: Dict[str, str]) -> str:
+    """Returns an id string when given an object name/title"""
+    # Vaults are named
+    if element["name"] == target:
+        return element["id"]
+    # Credentials have titles
+    if element["title"] == target:
+        return element["id"]
+    # If internal data structures are correct, the error below should never be thrown
+    log.error("Function label_to_id failed to match requested string: %s", target)
+    sys.exit(1)
+
+
+def lookup_target(target: str, data: List[Dict]) -> tuple[str, str]:
+    """Returns a tuple of element id/title from a single parameter"""
+    # Vaults can either be specified as a key (id) or a value (name)
+    # Credentials can either be specified as a key (id) or a value (title)
+    # This function returns both properties when provided just one element
+    for dictionary in data:
+        if dictionary[target]:
+            object_id = target
+            label = id_to_label(object_id, dictionary)
+            return (object_id, label)
+        if target in dictionary.values():
+            label = target
+            object_id = label_to_id(label, dictionary)
+            return (object_id, label)
+    log.error("The specified vault id/label was not valid: %s", target)
+    sys.exit(1)
+
+
+def run_cmd_mp(shell_command: str) -> str:
+    """Runs shell commands, returns stdout as text, handles errors"""
+    # Multiprocessing functions need logging setup
+    log = logging.getLogger(__name__)
+    log.debug("Running shell command: %s", shell_command)
+    with Popen(shell_command, stdout=PIPE, stderr=PIPE, shell=True) as command:
+        # Capture the command exit status for error handling
+        output, error = command.communicate(timeout=60)
+        # Convert all command output into text
+        command_output = output.decode("utf-8")
+        command_error = error.decode("utf-8")
+
+        # Create flag to capture error conditions
+        errors = False
+
+        if command.returncode == 127:
+            separate_command_args = shell_command.split(" ", 1)
+            root_command = separate_command_args.pop(0)
+            log.info("A shell command was not found: %s", root_command)
+            errors = True
+        if command.returncode != 0:
+            log.error("Error running command: %s", shell_command)
+
+            log.info("Exit status: %s", command.returncode)
+            # Provide some helpful hints based on the return code
+            if command.returncode == (1, 6):
+                log.info("Unlock/authenticate access to your password vault?")
+                log.info("Try this command from a shell prompt: op signin")
+            errors = True
+        if command_output is None:
+            log.warning("Shell command returned no text output: %s", shell_command)
+            errors = True
+        if errors:
+            # We should always exit with error status if *any* shell commands fail
+            log.error(command_error)
+            log.error("Shell commands resulted in errors; exiting")
+            sys.exit(1)
+
+        # Without errors, return data to the calling site
+        return command_output
diff --git a/src/python_one_password/tags.py b/src/python_one_password/tags.py
new file mode 100755 (executable)
index 0000000..551b4e7
--- /dev/null
@@ -0,0 +1,161 @@
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: Apache-2.0
+##############################################################################
+# Copyright (c) 2023 The Linux Foundation and others.
+#
+# All rights reserved. This program and the accompanying materials are made
+# available under the terms of the Apache-2.0 license which accompanies this
+# distribution, and is available at
+# https://opensource.org/licenses/Apache-2.0
+##############################################################################
+
+"""Provides a library of functions for manipulating tag metadata"""
+
+__author__ = "Matthew Watkins"
+
+# Standard imports, alphabetical order
+import logging
+import logging.handlers
+import sys
+from typing import List
+
+# External modules
+import typer
+
+# Bundled modules
+import python_one_password.caching as caching
+import python_one_password.functions as f
+
+# Setup logging
+log = logging.getLogger(__name__)
+
+
+# Define command structure with typer module
+app = typer.Typer()
+
+
+# Tag operations
+
+
+@app.command()
+def add(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    overwrite: bool = typer.Option(
+        False,
+        "--overwrite",
+        "-o",
+        show_default=False,
+        help="Overwrite any pre-existing tags (they are otherwise preserved)",
+    ),
+    tags: List[str] = typer.Argument(
+        None,
+        show_default=False,
+        help="Add tag(s) to the currently selected credentials",
+    ),
+):
+    """Adds tags (to the selected credentials)"""
+    f.startup_tasks(debug)
+    if len(tags) == 0:
+        log.error("To update credentials you must specify at least one tag")
+        sys.exit(1)
+    # Check for duplicate tags
+    if f.contains_duplicates(tags):
+        log.error("You cannot specify duplicate tags!")
+        sys.exit(1)
+    credential_data = caching.load_cache("credentials")
+    f.credential_summary(credential_data, False, True)
+    if overwrite:
+        log.warning("\nWarning: addition operation will overwrite existing tags")
+    log.info("\nTag(s) to apply: %s", tags)
+    f.tags_update(credential_data, tags, overwrite, False)
+
+
+@app.command()
+def allocate(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    overwrite: bool = typer.Option(
+        False,
+        "--overwrite",
+        "-o",
+        show_default=False,
+        help="Overwrite any pre-existing tags (they are otherwise preserved)",
+    ),
+    tags: List[str] = typer.Argument(
+        None,
+        show_default=False,
+        help="Allocate tag(s) to the currently selected credentials",
+    ),
+):
+    """Adds tags from a list round-robin (to the selected credentials)"""
+    # Useful for allocating credentials to users in a team, etc
+    f.startup_tasks(debug)
+    # Check for duplicate tags
+    if len(tags) < 2:
+        log.error("Round-robin allocation requires at least two tags!")
+        sys.exit(1)
+    # Check for duplicate tags; preserving order
+    if f.contains_duplicates(tags):
+        log.error("You cannot specify duplicate tags!")
+        sys.exit(1)
+    credential_data = caching.load_cache("credentials")
+    f.credential_summary(credential_data, False, True)
+    log.info("\nTags to allocate: %s", tags)
+    f.tags_update(credential_data, tags, overwrite, True)
+
+
+@app.command()
+def strip(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    )
+):
+    """Strips tags (from the selected credentials)"""
+    f.startup_tasks(debug)
+    credential_data = caching.load_cache("credentials")
+    f.credential_summary(credential_data, False, True)
+    log.warning("Stripping all tags from credentials")
+    f.tags_update(credential_data, [], True, False)
+
+
+@app.command()
+def replace(
+    debug: bool = typer.Option(
+        False,
+        "--debug",
+        "-d",
+        show_default=False,
+        help="Enable verbose debug output/logging",
+    ),
+    search: str = typer.Argument(
+        None, show_default=False, help="Tag to match and substitute"
+    ),
+    # pylint: disable-next=W0621
+    replace: str = typer.Argument(
+        None, show_default=False, help="Replacement for existing tag"
+    ),
+):
+    """Replaces a given tag with another (from the selected credentials)"""
+    f.startup_tasks(debug)
+    if not search or not replace:
+        log.error("You must specify both a search and a replace string!")
+        sys.exit(1)
+    credential_data = caching.load_cache("credentials")
+    # credential_summary(credentials, False)
+    f.tag_search_replace(credential_data, search, replace)