| import json |
| import os |
| import shlex |
| import sys |
| from contextlib import contextmanager |
| from subprocess import Popen |
| from typing import Any, Dict, IO, Iterator, List |
|
|
| try: |
| import click |
| except ImportError: |
| sys.stderr.write('It seems python-dotenv is not installed with cli option. \n' |
| 'Run pip install "python-dotenv[cli]" to fix this.') |
| sys.exit(1) |
|
|
| from .main import dotenv_values, set_key, unset_key |
| from .version import __version__ |
|
|
|
|
| def enumerate_env(): |
| """ |
| Return a path for the ${pwd}/.env file. |
| |
| If pwd does not exist, return None. |
| """ |
| try: |
| cwd = os.getcwd() |
| except FileNotFoundError: |
| return None |
| path = os.path.join(cwd, '.env') |
| return path |
|
|
|
|
| @click.group() |
| @click.option('-f', '--file', default=enumerate_env(), |
| type=click.Path(file_okay=True), |
| help="Location of the .env file, defaults to .env file in current working directory.") |
| @click.option('-q', '--quote', default='always', |
| type=click.Choice(['always', 'never', 'auto']), |
| help="Whether to quote or not the variable values. Default mode is always. This does not affect parsing.") |
| @click.option('-e', '--export', default=False, |
| type=click.BOOL, |
| help="Whether to write the dot file as an executable bash script.") |
| @click.version_option(version=__version__) |
| @click.pass_context |
| def cli(ctx: click.Context, file: Any, quote: Any, export: Any) -> None: |
| """This script is used to set, get or unset values from a .env file.""" |
| ctx.obj = {'QUOTE': quote, 'EXPORT': export, 'FILE': file} |
|
|
|
|
| @contextmanager |
| def stream_file(path: os.PathLike) -> Iterator[IO[str]]: |
| """ |
| Open a file and yield the corresponding (decoded) stream. |
| |
| Exits with error code 2 if the file cannot be opened. |
| """ |
|
|
| try: |
| with open(path) as stream: |
| yield stream |
| except OSError as exc: |
| print(f"Error opening env file: {exc}", file=sys.stderr) |
| exit(2) |
|
|
|
|
| @cli.command() |
| @click.pass_context |
| @click.option('--format', default='simple', |
| type=click.Choice(['simple', 'json', 'shell', 'export']), |
| help="The format in which to display the list. Default format is simple, " |
| "which displays name=value without quotes.") |
| def list(ctx: click.Context, format: bool) -> None: |
| """Display all the stored key/value.""" |
| file = ctx.obj['FILE'] |
|
|
| with stream_file(file) as stream: |
| values = dotenv_values(stream=stream) |
|
|
| if format == 'json': |
| click.echo(json.dumps(values, indent=2, sort_keys=True)) |
| else: |
| prefix = 'export ' if format == 'export' else '' |
| for k in sorted(values): |
| v = values[k] |
| if v is not None: |
| if format in ('export', 'shell'): |
| v = shlex.quote(v) |
| click.echo(f'{prefix}{k}={v}') |
|
|
|
|
| @cli.command() |
| @click.pass_context |
| @click.argument('key', required=True) |
| @click.argument('value', required=True) |
| def set(ctx: click.Context, key: Any, value: Any) -> None: |
| """Store the given key/value.""" |
| file = ctx.obj['FILE'] |
| quote = ctx.obj['QUOTE'] |
| export = ctx.obj['EXPORT'] |
| success, key, value = set_key(file, key, value, quote, export) |
| if success: |
| click.echo(f'{key}={value}') |
| else: |
| exit(1) |
|
|
|
|
| @cli.command() |
| @click.pass_context |
| @click.argument('key', required=True) |
| def get(ctx: click.Context, key: Any) -> None: |
| """Retrieve the value for the given key.""" |
| file = ctx.obj['FILE'] |
|
|
| with stream_file(file) as stream: |
| values = dotenv_values(stream=stream) |
|
|
| stored_value = values.get(key) |
| if stored_value: |
| click.echo(stored_value) |
| else: |
| exit(1) |
|
|
|
|
| @cli.command() |
| @click.pass_context |
| @click.argument('key', required=True) |
| def unset(ctx: click.Context, key: Any) -> None: |
| """Removes the given key.""" |
| file = ctx.obj['FILE'] |
| quote = ctx.obj['QUOTE'] |
| success, key = unset_key(file, key, quote) |
| if success: |
| click.echo(f"Successfully removed {key}") |
| else: |
| exit(1) |
|
|
|
|
| @cli.command(context_settings={'ignore_unknown_options': True}) |
| @click.pass_context |
| @click.option( |
| "--override/--no-override", |
| default=True, |
| help="Override variables from the environment file with those from the .env file.", |
| ) |
| @click.argument('commandline', nargs=-1, type=click.UNPROCESSED) |
| def run(ctx: click.Context, override: bool, commandline: List[str]) -> None: |
| """Run command with environment variables present.""" |
| file = ctx.obj['FILE'] |
| if not os.path.isfile(file): |
| raise click.BadParameter( |
| f'Invalid value for \'-f\' "{file}" does not exist.', |
| ctx=ctx |
| ) |
| dotenv_as_dict = { |
| k: v |
| for (k, v) in dotenv_values(file).items() |
| if v is not None and (override or k not in os.environ) |
| } |
|
|
| if not commandline: |
| click.echo('No command given.') |
| exit(1) |
| ret = run_command(commandline, dotenv_as_dict) |
| exit(ret) |
|
|
|
|
| def run_command(command: List[str], env: Dict[str, str]) -> int: |
| """Run command in sub process. |
| |
| Runs the command in a sub process with the variables from `env` |
| added in the current environment variables. |
| |
| Parameters |
| ---------- |
| command: List[str] |
| The command and it's parameters |
| env: Dict |
| The additional environment variables |
| |
| Returns |
| ------- |
| int |
| The return code of the command |
| |
| """ |
| |
| |
| cmd_env = os.environ.copy() |
| cmd_env.update(env) |
|
|
| p = Popen(command, |
| universal_newlines=True, |
| bufsize=0, |
| shell=False, |
| env=cmd_env) |
| _, _ = p.communicate() |
|
|
| return p.returncode |
|
|