From: Matthew Watkins Date: Tue, 14 Feb 2023 22:19:24 +0000 (+0000) Subject: Chore: Initial code drop of core Python modules X-Git-Tag: v0.1.0~2 X-Git-Url: https://gerrit.linuxfoundation.org/infra/gitweb?a=commitdiff_plain;h=b2de9d6353fc3e4d4e95f8efcfa4cacd70c284e6;p=releng%2Fpython-one-password.git Chore: Initial code drop of core Python modules Change-Id: Iecd5e9dc1cffe38fa5c7a8cbe7fbfda521bbc904 Signed-off-by: Matthew Watkins --- diff --git a/.gitignore b/.gitignore index e9e1e9b..eae96cf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # Temporary and binary files +install.sh *~ *.py[cod] *.so @@ -52,3 +53,6 @@ MANIFEST .venv*/ .conda*/ .python-version + +# Data/output files +*.metadata.json diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2c368e7..27369dc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,21 +16,21 @@ repos: - id: end-of-file-fixer - id: requirements-txt-fixer - id: mixed-line-ending - args: ['--fix=lf'] # replace 'auto' with 'lf' to enforce Linux/Mac line endings or 'crlf' for Windows + args: ['--fix=lf'] - repo: https://github.com/jorisroovers/gitlint rev: v0.17.0 hooks: - id: gitlint -## If you want to automatically "modernize" your Python code: +# If you want to automatically "modernize" your Python code: # - repo: https://github.com/asottile/pyupgrade # rev: v3.3.1 # hooks: # - id: pyupgrade # args: ['--py37-plus'] -## If you want to avoid flake8 errors due to unused vars or imports: +# If you want to avoid flake8 errors due to unused vars or imports: # - repo: https://github.com/PyCQA/autoflake # rev: v2.0.0 # hooks: @@ -52,7 +52,7 @@ repos: - id: black language_version: python3 -## If like to embrace black styles even in the docs: +# If like to embrace black styles even in the docs: # - repo: https://github.com/asottile/blacken-docs # rev: v1.13.0 # hooks: @@ -67,8 +67,19 @@ repos: ## You can add flake8 plugins via `additional_dependencies`: # additional_dependencies: [flake8-bugbear] -## Check for misspells in documentation files: +# Check for misspells in documentation files: # - repo: https://github.com/codespell-project/codespell # rev: v2.2.2 # hooks: # - id: codespell + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.0.0 + hooks: + - id: mypy + + - repo: https://github.com/adrienverge/yamllint.git + rev: v1.29.0 + hooks: + - id: yamllint + types: [yaml] diff --git a/.readthedocs.yml b/.readthedocs.yml index a2bcab3..2b29b56 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -1,3 +1,4 @@ +--- # Read the Docs configuration file # See https://docs.readthedocs.io/en/stable/config-file/v2.html for details @@ -9,7 +10,7 @@ sphinx: configuration: docs/conf.py # Build documentation with MkDocs -#mkdocs: +# mkdocs: # configuration: mkdocs.yml # Optionally build your docs in additional formats such as PDF diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 226e6f5..9d93b7b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,5 @@ Changelog Version 0.1 =========== -- Feature A added -- FIX: nasty bug #1729 fixed -- add your changes here! +- Initial implementation of vault data import and credential tag operations +- Correctly set package dependencies diff --git a/README.rst b/README.rst index e3f4343..fe78c50 100644 --- a/README.rst +++ b/README.rst @@ -37,7 +37,177 @@ python-one-password Tag editor for 1Password -A longer description of your project goes here... +Gathers vault and credential JSON metadata from 1Password databases; enables bulk manipulation of tags + + +Getting Started +=============== + +Before running these tools, you will need to install the 1Password CLI for your operating system. + +1Password CLI + + +Installation +============ + +The python-one-password tool and its dependencies can be installed from PyPI +using the standard Python PIP command: + +`` +% python3 -m pip install python_one_password +`` + +Interactive Help +================ + +The primary command and sub-commands have embedded help, which can be accessed +as shown below: + +`` +% python-one-password --help +% python-one-password credentials --help +% python-one-password tags --help +`` + + +Importing Credentials +===================== + +The first step required to begin working with the 1Password database is to +import credentials from one or more vault(s). + +The first command that creates an interaction with the 1Password CLI tools is +likely to generate an authentication prompt. + +Note: supply your password and/or biometric data to unlock the 1Password database if/when prompted + +You can then import credentials from one or more vaults, as shown below for a vault called "Testing": + +`` + % python-one-password credentials fetch -n -i Testing +Importing data from 1Password database... +Total number of vaults: 20 +Vaults imported into cache: 1 + +########## Vault Summary ########## + +ID Name +cnx76s6avkg3xikw6u5bf7jdki Testing + +Importing credential metadata from 1Password database... +Credential data gathered for: 1 vault(s) +Credential metadata records loaded: 5 +Loaded cached JSON metadata: [5] records + +Review current credential state? (y/n): y + +### Credentials: Current State ### + +yczzflaacyziwew2ahy24kqdxi Test4 +gbikz2upboavuksupx65xb5fie Test5 +fkn3cp42ouua47rqtnergchm6q Test3 +rfoxd64sumvzbk2m7nkruyvr5e Test1 +xiu64ukcwxtxfco7j2wjcf36eq Test2 + +`` + +Once a set of credentials have been loaded, you can review them with: + +`` +% python-one-password credentials show +Loaded cached JSON metadata: [5] records + +### Credentials: Current State ### + +gbikz2upboavuksupx65xb5fie [] Test5 +fkn3cp42ouua47rqtnergchm6q ['c3po', 'luke', 'r2d2', 'chewbacca'] Test3 +yczzflaacyziwew2ahy24kqdxi [] Test4 +rfoxd64sumvzbk2m7nkruyvr5e ['c3po', 'luke', 'chewbacca'] Test1 +xiu64ukcwxtxfco7j2wjcf36eq ['c3po', 'luke', 'r2d2', 'chewbacca'] Test2 +`` + +You can subsequently refine your selection using match/reject search patterns: + +`` + % python-one-password credentials refine --reject chewbacca +Loaded cached JSON metadata: [5] records +Matching query: [3] chewbacca +Subsequently rejected: 3/5 + +Credentials now selected: 2 + +Review current credential state? (y/n): y + +### Credentials: Current State ### + +yczzflaacyziwew2ahy24kqdxi [] Test4 +gbikz2upboavuksupx65xb5fie [] Test5 + +Update working credential set to selection? (y/n): y +`` + +When you have obtained a suitable set of credentials to work with, you can move +on to tag manipulation. + + +Working with tags +================= + +The help for tag manipulation can be called up as follows: + +`` +% python-one-password tags --help +`` + +The basic tag operations are: + +* add Adds tags (to the selected credentials) │ +* allocate Adds tags from a list round-robin (to the selected credentials) │ +* replace Replaces a given tag with another (from the selected credentials) │ +* strip Strips tags (from the selected credentials) + +These cover a broad range of use cases for working with metadata tags. + +Most have an option to either append to the existing tags, or overwrite the +existing tags and discard them. + +If appending would create duplicates, the list is deduplicated before +application to prevent unintended replication during changes. + +It is worth discussing briefly the operation of the "allocate" option. This is +useful where you have a list of team members (staff) who might be assigned a +set of credential as part of a rotation task/project. You can specify a list of +team members on the command-line and the list will be iterated over, allocating +credentials in a round-robin fashion. + +`` + % python-one-password tags allocate -o bob sarah steve +Loaded cached JSON metadata: [5] records + +Review current credential state? (y/n): y + +### Credentials: Current State ### + +gbikz2upboavuksupx65xb5fie [] Test5 +yczzflaacyziwew2ahy24kqdxi [] Test4 +fkn3cp42ouua47rqtnergchm6q ['c3po', 'luke', 'r2d2', 'chewbacca'] Test3 +rfoxd64sumvzbk2m7nkruyvr5e ['c3po', 'luke', 'chewbacca'] Test1 +xiu64ukcwxtxfco7j2wjcf36eq ['c3po', 'luke', 'r2d2', 'chewbacca'] Test2 + +Tags to allocate: ['bob', 'sarah', 'steve'] + +### Credentials: Future State ### + +gbikz2upboavuksupx65xb5fie ['bob'] Test5 +yczzflaacyziwew2ahy24kqdxi ['sarah'] Test4 +fkn3cp42ouua47rqtnergchm6q ['steve'] Test3 +rfoxd64sumvzbk2m7nkruyvr5e ['bob'] Test1 +xiu64ukcwxtxfco7j2wjcf36eq ['sarah'] Test2 + +Commit these updates to the 1Password database? (y/n): y +[5] Credentials updated +`` .. _pyscaffold-notes: @@ -45,16 +215,20 @@ A longer description of your project goes here... Making Changes & Contributing ============================= -This project uses `pre-commit`_, please make sure to install it before making any -changes:: +This project uses `pre-commit`_, please make sure to install it before making +any changes: - pip install pre-commit - cd python-one-password - pre-commit install +`` +% pip install pre-commit +% cd python-one-password +% pre-commit install +`` It is a good idea to update the hooks to the latest version:: - pre-commit autoupdate +`` +% pre-commit autoupdate +`` Don't forget to tell your contributors to also install and use pre-commit. diff --git a/setup.cfg b/setup.cfg index c7cf2ed..f0d00fb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,8 +4,8 @@ # https://setuptools.pypa.io/en/latest/references/keywords.html [metadata] -name = python-one-password -description = Tag editor for 1Password +name = python_one_password +description = Imports metadata from 1Password vaults and allows for bulk manipulation of tags author = Matthew Watkins author_email = mwatkins@linuxfoundation.org license = Apache-2.0 @@ -49,7 +49,7 @@ package_dir = # For more information, check out https://semver.org/. install_requires = importlib-metadata; python_version<"3.8" - + typer[all]~=0.7.0 [options.packages.find] where = src @@ -68,15 +68,8 @@ testing = pytest-cov [options.entry_points] -# Add here console scripts like: -# console_scripts = -# script_name = python_one_password.module:function -# For example: -# console_scripts = -# fibonacci = python_one_password.skeleton:run -# And any other entry points, for example: -# pyscaffold.cli = -# awesome = pyscaffoldext.awesome.extension:AwesomeExtension +console_scripts = + python-one-password = python_one_password.cli:run [tool:pytest] # Specify command line options as you would do when invoking pytest directly. diff --git a/src/python_one_password/__init__.py b/src/python_one_password/__init__.py index c5c960d..5f13da6 100644 --- a/src/python_one_password/__init__.py +++ b/src/python_one_password/__init__.py @@ -8,7 +8,7 @@ else: try: # Change here if project is renamed and does not equal the package name - dist_name = "python-one-password" + dist_name = "python_one_password" __version__ = version(dist_name) except PackageNotFoundError: # pragma: no cover __version__ = "unknown" diff --git a/src/python_one_password/caching.py b/src/python_one_password/caching.py new file mode 100755 index 0000000..170a1a2 --- /dev/null +++ b/src/python_one_password/caching.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2023 The Linux Foundation and others. +# +# All rights reserved. This program and the accompanying materials are made +# available under the terms of the Apache-2.0 license which accompanies this +# distribution, and is available at +# https://opensource.org/licenses/Apache-2.0 +############################################################################## + +"""Functions to load/save cached JSON metadata""" + +__author__ = "Matthew Watkins" + +# Standard imports, alphabetical order ### +import json +import logging +import os +import os.path +import sys +import time +from typing import Any, List + +# Setup logging +log = logging.getLogger(__name__) + + +# Define the filesystem names of various JSON metadata cache files +datastores = { + "vault_summary": "vault.summary.metadata.json", + "vaults_detail": "vault.detailed.metadata.json", + "vault_credentials": "vault.credentials.metadata.json", + "credentials": "credentials.metadata.json", +} + + +# File/caching operations + + +def load_json_file(filename: str) -> str: + """Returns JSON data object from a given file""" + try: + with open(filename, encoding="utf-8") as open_file: + data = json.loads(open_file.read()) + log.debug("JSON read from file: %s", filename) + except IOError as error: + log.error("Error reading JSON from file: %s", filename) + log.error(error) + sys.exit(1) + return data + + +def save_json_file(json_data: List[str], filename: str): + """Saves a JSON data object to disk""" + try: + with open(filename, "w", encoding="utf-8") as write_file: + json.dump(json_data, write_file) + log.debug("JSON written to file: %s", filename) + except IOError as error: + log.error("Error writing JSON to file: %s", filename) + log.error(error) + sys.exit(1) + + +def validate_cache(filename: str): + """Checks local metadata for freshness""" + if not check_cache_age(filename, 3600): + refresh_cache() + + +def get_file_age(filepath: str) -> float: + """Returns the age of a file on disk in seconds""" + return time.time() - os.path.getmtime(filepath) + + +def load_cache(data_store: str): + """Loads data from the filesystem and returns the JSON""" + log.debug("Request to load records from cache: %s", data_store) + if filename := datastores.get(data_store, None): + # Check cache validity + validate_cache(filename) + data = load_json_file(filename) + log.info("Loaded cached JSON metadata: [%s] records", len(data)) + log.debug("\n%s", data) + return data + log.error("The requested cache does not exist: %s", data_store) + sys.exit(1) + + +def save_cache(json_data: Any, data_store: str): + """Saves JSON data to the filesystem""" + log.debug("Saving [%s] records to cache: %s", len(json_data), data_store) + if filename := datastores.get(data_store, None): + save_json_file(json_data, filename) + return + log.error("The requested cache does not exist: %s", data_store) + sys.exit(1) + + +def check_cache_age(filename: str, max_age: int) -> bool: + """Returns cache validity when given a maximum age (in seconds)""" + log.debug("Cache lifetime/expiry set to: %s seconds", max_age) + if os.path.isfile(filename): + age = get_file_age(filename) + if age < max_age: + return True + return False + log.error("The requested file could not be opened: %s", filename) + sys.exit(1) + + +# TODO: check metadata modification times and version numbers; selectively reload data +def refresh_cache(): + """Currently throws an error; could eventually invoke cache validation""" + log.error("The local cache file(s) are invalid or have expired") + log.error("Please reload credentials from the required vault(s)") + sys.exit(1) diff --git a/src/python_one_password/cli.py b/src/python_one_password/cli.py new file mode 100755 index 0000000..0e02288 --- /dev/null +++ b/src/python_one_password/cli.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2023 The Linux Foundation and others. +# +# All rights reserved. This program and the accompanying materials are made +# available under the terms of the Apache-2.0 license which accompanies this +# distribution, and is available at +# https://opensource.org/licenses/Apache-2.0 +############################################################################## + +"""Python wrapper for manipulating 1Password credential metadata/tags""" + +__author__ = "Matthew Watkins" + +# External modules +import typer + +# Bundled modules +from python_one_password.credentials import app as credentials +from python_one_password.tags import app as tags + +# Define command structure with typer module +app = typer.Typer() + + +# Additional sub-commands + + +app.add_typer( + credentials, + name="credentials", + help="Imports and filters credentials from 1Password", +) + +app.add_typer( + tags, + name="tags", + help="Manipulates metadata tags of the current credentials", +) + + +def run(): + app() diff --git a/src/python_one_password/credentials.py b/src/python_one_password/credentials.py new file mode 100755 index 0000000..503a9d1 --- /dev/null +++ b/src/python_one_password/credentials.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2023 The Linux Foundation and others. +# +# All rights reserved. This program and the accompanying materials are made +# available under the terms of the Apache-2.0 license which accompanies this +# distribution, and is available at +# https://opensource.org/licenses/Apache-2.0 +############################################################################## + +"""Python wrapper for manipulating 1Password credential metadata/tags""" + +__author__ = "Matthew Watkins" + +# Standard imports, alphabetical order +import logging +import logging.handlers +from typing import List, Optional + +# External modules +import typer + +# Bundled modules +import python_one_password.caching as caching +import python_one_password.functions as f + +# Setup logging +log = logging.getLogger(__name__) + + +# Define command structure with typer module +app = typer.Typer() + + +# Credential operations + + +@app.command() +def fetch( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + include: Optional[List[str]] = typer.Option( + ["All"], + "--include", + "-i", + envvar="OP_VAULT_INC", + help="Includes the specified vault(s) from processing", + ), + exclude: Optional[List[str]] = typer.Option( + None, + "--exclude", + "-e", + envvar="OP_VAULT_EXC", + help="Excludes the specified vault(s) from processing", + ), + no_tags: bool = typer.Option( + False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output" + ), +): + """Import vaults and credentials from the 1Password database""" + f.startup_tasks(debug) + f.validate_import_data_opts(include, exclude) + vaults_dictionary = f.populate_vault_json(include, exclude) + f.show_vault_summary(vaults_dictionary) + f.import_credentials(vaults_dictionary) + credential_data = f.caching.load_cache("credentials") + f.credential_summary(credential_data, no_tags, True) + + +@app.command() +def vaults( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ) +): + """Show credentials in current/filtered working set""" + f.startup_tasks(debug) + vault_summ_cmd = "op vaults list" + raw_data = f.run_cmd_mp(vault_summ_cmd) + log.info(raw_data) + + +@app.command() +def show( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + no_tags: bool = typer.Option( + False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output" + ), +): + """Show credentials in current/filtered working set""" + f.startup_tasks(debug) + credential_data = caching.load_cache("credentials") + f.credential_summary(credential_data, no_tags, False) + + +@app.command() +def refine( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + no_tags: bool = typer.Option( + False, "--no-tags", "-n", help="Hide metadata tags in credential summary/output" + ), + match: Optional[List[str]] = typer.Option( + None, + "--match", + "-m", + help="Match/select credentials containing the specified text", + ), + reject: Optional[List[str]] = typer.Option( + None, + "--reject", + "-r", + help="Reject/exclude credentials containing the specified text", + ), + ignore_case: bool = typer.Option( + False, + "--ignore-case", + "-i", + show_default=True, + help="Ignore case when matching strings in credentials", + ), +): + """Refine credential selection using match/reject (string) operations""" + f.startup_tasks(debug) + f.validate_filter_items_opts(match, reject) + credential_data = f.filter_credentials(match, reject, ignore_case) + f.credential_summary(credential_data, no_tags, True) + if f.prompt("Update working credential set to selection?"): + caching.save_cache(credential_data, "credentials") + else: + log.info("Select/reject did not modify credential database") diff --git a/src/python_one_password/functions.py b/src/python_one_password/functions.py new file mode 100755 index 0000000..f00d820 --- /dev/null +++ b/src/python_one_password/functions.py @@ -0,0 +1,654 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2023 The Linux Foundation and others. +# +# All rights reserved. This program and the accompanying materials are made +# available under the terms of the Apache-2.0 license which accompanies this +# distribution, and is available at +# https://opensource.org/licenses/Apache-2.0 +############################################################################## + +"""Shared functions for use when interacting with 1Password""" + +__author__ = "Matthew Watkins" + +import json +import logging +import logging.handlers +import multiprocessing +import os +import os.path +import platform +import sys +import time + +# Standard imports, alphabetical order +from itertools import cycle +from subprocess import PIPE, Popen +from typing import Any, Dict, List, Optional, Set + +# Bundled modules +import python_one_password.caching as caching + +# Setup logging +log = logging.getLogger(__name__) + +# Define variables +# Get the home and present working directories +home_dir = os.path.expanduser("~") +pwd = os.getcwd() + +# Used to source passwords from the shared password store +pass_mapping_file = home_dir + "/.password-store/.shared-configs/cloud-mappings.txt" + + +# General/shared functions + + +def contains_duplicates(this_list: List[str]) -> bool: + """Determines if a list contains duplicate elements""" + return this_list != list(dict.fromkeys(this_list)) + + +def deduplicate_list(this_list: List[str]) -> List[str]: + """De-duplicates a given list, preserving original order""" + return list(dict.fromkeys(this_list)) + + +def list_to_csv(this_list: List[str]) -> str: + """Returns a comma separated string from a list object""" + delim = "," + return delim.join(list(map(str, this_list))) + + +def cred_update_refresh( + update_commands: List[str], refresh_commands: List[str] +) -> None: + """Modifies credential data and refreshes local metadata""" + if not prompt("Commit these updates to the 1Password database?"): + sys.exit(0) + + # Use timers to profile performance + timer = start_timer() + # Code must be single-threaded when making edits + # TODO: implement a proper counter here to better progress + log.info("Updating [%s] records:", len(update_commands)) + log_level = logging.getLogger().getEffectiveLevel() + for command in update_commands: + cmd_output = run_cmd_mp(command) + if log_level == 10: + log.debug(cmd_output) + else: + # Display progress on console when not debugging + print(".", end="") + stop_timer(timer) + if log_level != 10: + # Terminate progress line on console + print("") + + # Refresh the local credential cache after updates + + # Use timers to profile performance + timer = start_timer() + updated_credential_db = [] + log.info("Multiprocessing call refreshing credential metadata") + with multiprocessing.Pool() as pool: + # Call the function for each item in parallel + for raw_data in pool.map(run_cmd_mp, refresh_commands): + log.debug(raw_data) + updated_credential = json.loads(raw_data) + updated_credential_db.append(updated_credential) + stop_timer(timer) + + # Save vault detail dictionary to cache file + caching.save_cache(updated_credential_db, "credentials") + log.info("Credential metadata tags updated") + + +def tag_search_replace( + credentials: List[Dict], search_string: str, replace_string: str +) -> None: + """Searches credentials for a tag and replaces it with another""" + log.debug( + "\nSearching for: [%s] Replacing with: [%s]\n", search_string, replace_string + ) + + update_commands = [] + refresh_commands = [] + matches = 0 + + for credential in credentials: + cred_id: str = credential["id"] + name: str = credential["title"] + try: + tags: str = credential["tags"] + except KeyError: + # This credential has no tags + # current_tags = [] + continue + + if search_string in tags: + log.debug("Matched: %s %s %s", cred_id, tags, name) + matches += 1 + + new_tags = [] + for tag in tags: + new_tag = tag.replace(search_string, replace_string) + new_tags.append(new_tag) + + # At this point duplicate tags are a concern; deduplicate them + new_tags = deduplicate_list(new_tags) + + log.info("Updating: %s %s %s", cred_id, new_tags, name) + + tags_string = list_to_csv(new_tags) + + update_commands.append("op item edit " + cred_id + " --tags " + tags_string) + refresh_commands.append( + "op item get " + cred_id + " --format=json --no-color" + ) + + log.info("Total matches: %s", matches) + if matches > 0: + cred_update_refresh(update_commands, refresh_commands) + + +def tags_update( + credentials: List[Dict], tags: List[str], overwrite: bool, round_robin: bool +) -> None: + """Performs tag operations on all currently selected credentials""" + update_commands = [] + refresh_commands = [] + + # Used only for round-robin allocations + tags_cycle = cycle(tags) + + def next_tag(): + return next(tags_cycle) + + log.info("\n### Credentials: Future State ###\n") + for credential in credentials: + cred_id = credential["id"] + name = credential["title"] + current_tags: List[str] = [] + try: + current_tags = credential["tags"] + except KeyError: + # Empty list already defined; no need to handle the error + pass + + # When allocating tags, extract the next one from the list + if round_robin: + tags = [next_tag()] + + if not overwrite: + # When NOT overwriting, add new tags to existing tags + tags = tags + current_tags + + # De-duplicate list of tags; important when NOT overwriting + # As we may be adding same tag may already exist... + tags = deduplicate_list(tags) + + log.info("%s %s %s", cred_id, tags, name) + + if not tags: + tags_string = '""' + else: + tags_string = list_to_csv(tags) + + update_commands.append("op item edit " + cred_id + " --tags " + tags_string) + refresh_commands.append("op item get " + cred_id + " --format=json --no-color") + + cred_update_refresh(update_commands, refresh_commands) + + +def match_strings(ignore_case: bool, search_pattern: str, string: str) -> bool: + """Returns true/false on string matching; optionally case-insensitive""" + if ignore_case is True: + if search_pattern.lower() in string.lower(): + return True + else: + if search_pattern in string: + return True + # Sub-string was not found in string + return False + + +def credential_summary( + credential_list: List[Dict], no_tags: bool, prompt_flag: bool +) -> None: + """Displays a list/summary of the current working set of credentials""" + + # Prompt the user to review/display credentials + # Note: can optionally be bypassed by setting prompt_flag to False + if prompt_flag and not prompt("Review current credential state?"): + return + + log.info("\n### Credentials: Current State ###\n") + # Note: column output is tab separated + for credential in credential_list: + cred_id = credential["id"] + name = credential["title"] + # Tag display might make for messy output and take up excessive screen space + # We therefore provide an option to suppress them in summary/output + if no_tags: + log.info("%s %s", cred_id, name) + continue + # Not all credentials have tags, catch the exception + try: + tags = credential["tags"] + log.info("%s %s %s", cred_id, tags, name) + except KeyError: + # Note: credentials without tags are padded with empty square brackets + log.info("%s [] %s", cred_id, name) + + +def match_elements( + ignore_case: bool, search_patterns: List[str], list_of_dictionaries: List[Dict] +) -> List[Dict]: + """Returns a subset of elements matching a query from a list of dictionaries""" + matched = [] + # Search individual term from a list of terms + for search_pattern in search_patterns: + # Create a counter to store searches matched for this specific query + pattern_matches = 0 + for element in list_of_dictionaries: + match = match_strings(ignore_case, search_pattern, str(element)) + # Need to prevent duplicates + if match and element not in matched: + pattern_matches += 1 + matched.append(element) + log.info("Matching query: [%s] %s", pattern_matches, search_pattern) + return matched + + +def subtract_lists( + primary_list: List[Dict], list_to_subtract: List[Dict] +) -> List[Dict]: + """Subtracts one list of dictionaries from another based on id values""" + primary_ids = [] + for dictionary in primary_list: + primary_ids.append(dictionary["id"]) + log.debug("Identities: %s", primary_ids) + subtract_ids = [] + for dictionary in list_to_subtract: + subtract_ids.append(dictionary["id"]) + log.debug("Subtracting: %s", subtract_ids) + remaining_ids = subtract_common_elements(primary_ids, subtract_ids) + log.debug("Number of results: %s", len(remaining_ids)) + result = [] + for dictionary in primary_list: + if dictionary["id"] in remaining_ids: + result.append(dictionary) + return result + + +def prompt(question: str) -> bool: + """Displays a question; prompts for yes/no and returns a boolean""" + while "Invalid selection": + reply = str(input("\n" + question + " (y/n): ")).lower().strip() + if reply[:1] == "y": + return True + if reply[:1] == "n": + break + return False + + +def filter_credentials( + match: Optional[List[str]], reject: Optional[List[str]], ignore_case: bool +) -> List[Dict[str, Any]]: + """Refines the current credential set through select/reject criteria""" + if match == [] and reject == []: + log.error("Error: provide at least one filter operation") + log.error("Choose match, reject, or use both together") + sys.exit(1) + + # List to hold filtered credentials, initially the complete database + credentials = caching.load_cache("credentials") + starting_number = len(credentials) + + if match: + credentials = match_elements(ignore_case, match, credentials) + log.info("Selected: %s/%s", len(credentials), starting_number) + # Print summary + log.debug("\n### Selected Credentials ###\n") + for credential in credentials: + log.debug("%s %s", credential["id"], credential["title"]) + + if reject: + rejected = match_elements(ignore_case, reject, credentials) + log.info("Subsequently rejected: %s/%s", len(rejected), len(credentials)) + log.debug("\n### Rejected Credentials ###\n") + for credential in rejected: + log.debug("%s %s", credential["id"], credential["title"]) + if len(rejected) != 0: + credentials = subtract_lists(credentials, rejected) + + if credentials is None or len(credentials) == 0: + log.info("\nNo results were returned for your filter(s)") + sys.exit(1) + else: + log.info("\nCredentials now selected: %s", len(credentials)) + return credentials + + +def validate_filter_items_opts( + match: Optional[List[str]], reject: Optional[List[str]] +) -> None: + """Handles the options provided to the filter_items sub-command""" + if match and reject: + log.info("Both match and reject operations were requested...") + log.warning("Note: match operations will run first, then reject") + + +def get_credentials_mp(vault: str) -> tuple[str, List[str]]: + """Retrieves credential metadata JSON and returns as dictionary""" + # Multiprocessing functions need logging setup + log = logging.getLogger(__name__) + cred_summ_cmd = "op item list --format=json --no-color --vault " + vault + raw_data = run_cmd_mp(cred_summ_cmd) + vault_credentials = json.loads(raw_data) + log.debug("Credentials list:") + log.debug(vault_credentials) + return (vault, vault_credentials) + + +def import_credentials(vaults: Dict[str, str]) -> None: + """Given a dictionary of vaults, populates the credential database(s)""" + log.info("Importing credential metadata from 1Password database...") + + vault_credentials = [] + + # Use timers to profile performance + timer = start_timer() + + with multiprocessing.Pool() as pool: + # Call the function for each item in parallel + for vault, credentials in pool.map(get_credentials_mp, vaults): + vault_creds_dictionary = {vault: credentials} + vault_credentials.append(vault_creds_dictionary) + + stop_timer(timer) + + vaults_enumerated = len(vault_credentials) + log.info("Credential data gathered for: %s vault(s)", vaults_enumerated) + # Save vault detail dictionary to cache file + caching.save_cache(vault_credentials, "vault_credentials") + + credentials = [] + # Copy individual credentials out into an abstract list + for dictionary in vault_credentials: + dict_credentials = list(dictionary.values()) + for credential_list in dict_credentials: + for credential in credential_list: + credentials.append(credential) + + # Print out total number of credentials in database + log.info("Credential metadata records loaded: %s", len(credentials)) + # Save vault detail dictionary to cache file + caching.save_cache(credentials, "credentials") + + +def validate_import_data_opts( + match: Optional[List[str]], reject: Optional[List[str]] +) -> None: + """Handles the options provided to the import_data sub-command""" + log.debug("Validating command-line options/arguments") + if reject and match != ["All"]: + log.error("Match/reject options are mutually exclusive") + sys.exit(1) + + +def get_vault_detail_mp(vault_dictionary: Dict) -> Dict[str, Any]: + """Retrieves detailed vault metadata JSON and returns as dictionary""" + # Multiprocessing functions need logging setup + log = logging.getLogger(__name__) + # Enumerate the details of each vault + vault_id = vault_dictionary["id"] + log.debug("Gathering data for vault: %s", vault_id) + vault_detail_cmd = "op vault get " + vault_id + " --format=json --no-color" + raw_data = run_cmd_mp(vault_detail_cmd) + vault_detail = json.loads(raw_data) + log.debug("get_vault_detail_mp returned data for: %s\n%s", vault_id, vault_detail) + return vault_detail + + +# Functions to track elapsed time when performing bulk operations +def start_timer() -> float: + """Starts a timer to track functions expected to do bulk work""" + log.debug("Timer function started") + return time.perf_counter() + + +def stop_timer(started: float) -> None: + """Takes the time started and prints the elapsed time""" + finished = time.perf_counter() + elapsed = finished - started + log.debug("Time taken in seconds: %s", elapsed) + + +def populate_vault_json( + include: Optional[List[str]], exclude: Optional[List[str]] +) -> Dict[str, Any]: + """Fetches additional vault metadata the summary doesn't provide""" + + # Handle optionals gracefully by defining empty lists + if not include: + include = [] + if not exclude: + exclude = [] + + log.info("Importing data from 1Password database...") + vault_list_cmd = "op vault list --format=json --no-color" + raw_data = run_cmd_mp(vault_list_cmd) + vault_summary = json.loads(raw_data) + log.debug("\nVaults summary:\n") + log.debug("%s\n", vault_summary) + # Save vault summary list to cache file + caching.save_cache(vault_summary, "vault_summary") + + # Use timers to profile the performance of this code + timer = start_timer() + # TODO: Implement progress bar (more complex when multiprocessing) + + vaults_detail = {} + with multiprocessing.Pool() as pool: + # Call the function for each item in parallel + for data in pool.map(get_vault_detail_mp, vault_summary): + vault_id: str = data["id"] + vault_name: str = data["name"] + # Add items conditionally, based on include/exclude options + if include == ["All"] and exclude == []: + vaults_detail[vault_id] = data + if include == ["All"] and exclude != []: + # For convenience, include/exclude can match both name/id + if vault_id not in exclude and vault_name not in exclude: + vaults_detail[vault_id] = data + else: + # Include was specified on the command-line + if vault_id in include or vault_name in include: + vaults_detail[vault_id] = data + stop_timer(timer) + + details_retrieved = len(vaults_detail) + if details_retrieved == 0: + log.warning("No vaults matched your request; no data imported") + sys.exit(1) + + total_vaults = len(vault_summary) + log.info("Total number of vaults: %s", total_vaults) + + # Save vault detail dictionary to cache file + caching.save_cache(vaults_detail, "vaults_detail") + + # Integrity check; verify we collected detail records for all vaults + if details_retrieved != total_vaults: + log.info("Vaults imported into cache: %s", details_retrieved) + + return vaults_detail + + +def show_vault_summary(vaults_dictionary: Dict) -> None: + """Prints a summary of vaults from a vaults dictionary""" + log.info("\n########## Vault Summary ##########\n") + log.info("ID Name") + for vault in vaults_dictionary.values(): + cred_id = vault["id"] + name = vault["name"] + log_string = cred_id + " " + name + log.info(log_string) + log.info("") + + +def startup_tasks(debug: bool) -> None: + """Invokes some common initialisation operations""" + setup_logging(debug) + op_login(debug) + + +def op_login(debug: bool) -> None: + """Makes a connection to the 1Password database/servers""" + # Documentation says the default timeout is 30 minutes + run_cmd_mp("eval $(op signin)") + + if debug: + # Gather some useful debugging information + user_info = run_cmd_mp("op whoami") + log.debug("\n%s", user_info) + + +def setup_logging(debug: bool) -> None: + """Logging setup common to all sub-commands""" + console_format = logging.Formatter("%(message)s") + file_format = logging.Formatter("%(asctime)s %(levelname)s - %(message)s") + + # Change root logger level from WARNING (default) to NOTSET + # (makes sure all messages are delegated) + logging.getLogger().setLevel(logging.NOTSET) + + console = logging.StreamHandler(sys.stdout) + console.setFormatter(console_format) + console.setLevel(logging.INFO) + + if debug: + logging.getLogger().setLevel(logging.DEBUG) + print("Logging level: " + str(logging.getLogger().getEffectiveLevel())) + console.setLevel(logging.DEBUG) + # Add second file handler, with level set to DEBUG + debug_file = logging.handlers.RotatingFileHandler(filename="debug.log") + debug_file.setFormatter(file_format) + debug_file.setLevel(logging.DEBUG) + logging.getLogger().addHandler(debug_file) + + logging.getLogger().addHandler(console) + + # Default file log output with standard level INFO + standard_file = logging.handlers.RotatingFileHandler(filename="standard.log") + standard_file.setLevel(logging.INFO) + standard_file.setFormatter(file_format) + logging.getLogger().addHandler(standard_file) + + # Capture additional user/system information + # Report CPUs available to Python multiprocessing + log.debug("Processor cores available: %s", multiprocessing.cpu_count()) + log.debug("Python version: %s", platform.python_version()) + + +def common_elements(first: List[str], second: List[str]) -> Set[str]: + """Check for common elements in two lists""" + return set(first) & set(second) + + +def subtract_common_elements(first: List[str], second: List[str]) -> Set[str]: + """Remove common elements from two lists""" + return set(first) - set(second) + + +# pylint: disable-next=R1710 +def id_to_label(target: str, element: Dict[str, str]) -> str: + """Returns the object name when given an id string""" + if element["id"] == target: + # TODO: return either element["name"] or element["id"] + if element["name"]: + return element["name"] + if element["title"]: + return element["title"] + # If internal data structures are correct, the error below should never be thrown + log.error("Function id_to_label failed to match requested string: %s", target) + sys.exit(1) + + +# pylint: disable-next=R1710 +def label_to_id(target: str, element: Dict[str, str]) -> str: + """Returns an id string when given an object name/title""" + # Vaults are named + if element["name"] == target: + return element["id"] + # Credentials have titles + if element["title"] == target: + return element["id"] + # If internal data structures are correct, the error below should never be thrown + log.error("Function label_to_id failed to match requested string: %s", target) + sys.exit(1) + + +def lookup_target(target: str, data: List[Dict]) -> tuple[str, str]: + """Returns a tuple of element id/title from a single parameter""" + # Vaults can either be specified as a key (id) or a value (name) + # Credentials can either be specified as a key (id) or a value (title) + # This function returns both properties when provided just one element + for dictionary in data: + if dictionary[target]: + object_id = target + label = id_to_label(object_id, dictionary) + return (object_id, label) + if target in dictionary.values(): + label = target + object_id = label_to_id(label, dictionary) + return (object_id, label) + log.error("The specified vault id/label was not valid: %s", target) + sys.exit(1) + + +def run_cmd_mp(shell_command: str) -> str: + """Runs shell commands, returns stdout as text, handles errors""" + # Multiprocessing functions need logging setup + log = logging.getLogger(__name__) + log.debug("Running shell command: %s", shell_command) + with Popen(shell_command, stdout=PIPE, stderr=PIPE, shell=True) as command: + # Capture the command exit status for error handling + output, error = command.communicate(timeout=60) + # Convert all command output into text + command_output = output.decode("utf-8") + command_error = error.decode("utf-8") + + # Create flag to capture error conditions + errors = False + + if command.returncode == 127: + separate_command_args = shell_command.split(" ", 1) + root_command = separate_command_args.pop(0) + log.info("A shell command was not found: %s", root_command) + errors = True + if command.returncode != 0: + log.error("Error running command: %s", shell_command) + + log.info("Exit status: %s", command.returncode) + # Provide some helpful hints based on the return code + if command.returncode == (1, 6): + log.info("Unlock/authenticate access to your password vault?") + log.info("Try this command from a shell prompt: op signin") + errors = True + if command_output is None: + log.warning("Shell command returned no text output: %s", shell_command) + errors = True + if errors: + # We should always exit with error status if *any* shell commands fail + log.error(command_error) + log.error("Shell commands resulted in errors; exiting") + sys.exit(1) + + # Without errors, return data to the calling site + return command_output diff --git a/src/python_one_password/tags.py b/src/python_one_password/tags.py new file mode 100755 index 0000000..551b4e7 --- /dev/null +++ b/src/python_one_password/tags.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: Apache-2.0 +############################################################################## +# Copyright (c) 2023 The Linux Foundation and others. +# +# All rights reserved. This program and the accompanying materials are made +# available under the terms of the Apache-2.0 license which accompanies this +# distribution, and is available at +# https://opensource.org/licenses/Apache-2.0 +############################################################################## + +"""Provides a library of functions for manipulating tag metadata""" + +__author__ = "Matthew Watkins" + +# Standard imports, alphabetical order +import logging +import logging.handlers +import sys +from typing import List + +# External modules +import typer + +# Bundled modules +import python_one_password.caching as caching +import python_one_password.functions as f + +# Setup logging +log = logging.getLogger(__name__) + + +# Define command structure with typer module +app = typer.Typer() + + +# Tag operations + + +@app.command() +def add( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + overwrite: bool = typer.Option( + False, + "--overwrite", + "-o", + show_default=False, + help="Overwrite any pre-existing tags (they are otherwise preserved)", + ), + tags: List[str] = typer.Argument( + None, + show_default=False, + help="Add tag(s) to the currently selected credentials", + ), +): + """Adds tags (to the selected credentials)""" + f.startup_tasks(debug) + if len(tags) == 0: + log.error("To update credentials you must specify at least one tag") + sys.exit(1) + # Check for duplicate tags + if f.contains_duplicates(tags): + log.error("You cannot specify duplicate tags!") + sys.exit(1) + credential_data = caching.load_cache("credentials") + f.credential_summary(credential_data, False, True) + if overwrite: + log.warning("\nWarning: addition operation will overwrite existing tags") + log.info("\nTag(s) to apply: %s", tags) + f.tags_update(credential_data, tags, overwrite, False) + + +@app.command() +def allocate( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + overwrite: bool = typer.Option( + False, + "--overwrite", + "-o", + show_default=False, + help="Overwrite any pre-existing tags (they are otherwise preserved)", + ), + tags: List[str] = typer.Argument( + None, + show_default=False, + help="Allocate tag(s) to the currently selected credentials", + ), +): + """Adds tags from a list round-robin (to the selected credentials)""" + # Useful for allocating credentials to users in a team, etc + f.startup_tasks(debug) + # Check for duplicate tags + if len(tags) < 2: + log.error("Round-robin allocation requires at least two tags!") + sys.exit(1) + # Check for duplicate tags; preserving order + if f.contains_duplicates(tags): + log.error("You cannot specify duplicate tags!") + sys.exit(1) + credential_data = caching.load_cache("credentials") + f.credential_summary(credential_data, False, True) + log.info("\nTags to allocate: %s", tags) + f.tags_update(credential_data, tags, overwrite, True) + + +@app.command() +def strip( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ) +): + """Strips tags (from the selected credentials)""" + f.startup_tasks(debug) + credential_data = caching.load_cache("credentials") + f.credential_summary(credential_data, False, True) + log.warning("Stripping all tags from credentials") + f.tags_update(credential_data, [], True, False) + + +@app.command() +def replace( + debug: bool = typer.Option( + False, + "--debug", + "-d", + show_default=False, + help="Enable verbose debug output/logging", + ), + search: str = typer.Argument( + None, show_default=False, help="Tag to match and substitute" + ), + # pylint: disable-next=W0621 + replace: str = typer.Argument( + None, show_default=False, help="Replacement for existing tag" + ), +): + """Replaces a given tag with another (from the selected credentials)""" + f.startup_tasks(debug) + if not search or not replace: + log.error("You must specify both a search and a replace string!") + sys.exit(1) + credential_data = caching.load_cache("credentials") + # credential_summary(credentials, False) + f.tag_search_replace(credential_data, search, replace)