Spaces:
Running
Running
import io | |
import logging | |
import os | |
import pathlib | |
import shutil | |
import sys | |
import tempfile | |
from collections import OrderedDict | |
from contextlib import contextmanager | |
from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple, | |
Union) | |
from .parser import Binding, parse_stream | |
from .variables import parse_variables | |
# Alias for the filesystem paths handled in this module: a `str` or an
# `os.PathLike` that produces `str`.  Byte paths and file descriptors are
# deliberately excluded because these paths may be handed to both `open()`
# and `shutil.move()`, and `shutil.move()` only accepts string paths.  See
# https://github.com/python/typeshed/pull/6832.
StrPath = Union[str, "os.PathLike[str]"]

# Module-level logger used for every diagnostic emitted from this file.
logger = logging.getLogger(__name__)
def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
    """Pass every binding through unchanged, logging a warning for bad ones.

    Each item of `mappings` is yielded exactly once, in order.  When an
    item's `error` attribute is truthy, a warning that names the line number
    stored in `original.line` is logged before the item is yielded.
    """
    for binding in mappings:
        has_error = binding.error
        if has_error:
            start_line = binding.original.line
            logger.warning(
                "Python-dotenv could not parse statement starting at line %s",
                start_line,
            )
        yield binding
class DotEnv:
    """Read key/value bindings from a dotenv file or an open text stream.

    The source is chosen in `_get_stream`: an existing file at `dotenv_path`
    wins, then `stream`, and finally an empty in-memory stream.  The parsed
    result is cached on the instance after the first call to `dict()`.
    """

    def __init__(
        self,
        dotenv_path: Optional[StrPath],
        stream: Optional[IO[str]] = None,
        verbose: bool = False,
        encoding: Optional[str] = None,
        interpolate: bool = True,
        override: bool = True,
    ) -> None:
        """Store the configuration; nothing is read until it is needed.

        Parameters:
            dotenv_path: Path of the file to read, or `None`.
            stream: Text stream used when `dotenv_path` is not an existing file.
            verbose: Whether to log when the file or a requested key is missing.
            encoding: Encoding passed to `open()` when reading `dotenv_path`.
            interpolate: Whether parsed values go through `resolve_variables`.
            override: Passed to `resolve_variables`, and also decides whether
                `set_as_environment_variables` replaces existing variables.
        """
        self.dotenv_path: Optional[StrPath] = dotenv_path
        self.stream: Optional[IO[str]] = stream
        # Cache filled by `dict()`; `None` means "not parsed yet".
        self._dict: Optional[Dict[str, Optional[str]]] = None
        self.verbose: bool = verbose
        self.encoding: Optional[str] = encoding
        self.interpolate: bool = interpolate
        self.override: bool = override

    # `parse()` uses this in a `with` statement, so it has to be wrapped by
    # `contextmanager`; a bare generator object has no `__enter__`/`__exit__`.
    @contextmanager
    def _get_stream(self) -> Iterator[IO[str]]:
        """Context manager yielding the text stream to parse.

        A file opened here is closed when the context exits.  A stream
        supplied by the caller is yielded as-is and is not closed here.
        """
        if self.dotenv_path and os.path.isfile(self.dotenv_path):
            with open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        elif self.stream is not None:
            yield self.stream
        else:
            if self.verbose:
                logger.info(
                    "Python-dotenv could not find configuration file %s.",
                    self.dotenv_path or '.env',
                )
            # Nothing to read: hand back an empty stream so parsing yields
            # no bindings instead of failing.
            yield io.StringIO('')

    def dict(self) -> Dict[str, Optional[str]]:
        """Return dotenv as dict

        The source is parsed on the first call and the result is cached.
        The cache is tested against `None` rather than for truthiness so
        that an empty result is cached too and the source is not re-read
        on every call.
        """
        if self._dict is not None:
            return self._dict

        raw_values = self.parse()

        if self.interpolate:
            self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
        else:
            self._dict = OrderedDict(raw_values)

        return self._dict

    def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
        """Yield `(key, value)` for every parsed binding that has a key.

        Bindings flagged with an error are reported through
        `with_warn_for_invalid_lines`; bindings whose key is `None` are
        skipped.
        """
        with self._get_stream() as stream:
            for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
                if mapping.key is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self) -> bool:
        """
        Load the current dotenv as system environment variable.

        Returns `False` when no bindings were parsed and `True` otherwise.
        A key already present in `os.environ` is left alone unless
        `self.override` is true, and keys whose value is `None` are never
        written.
        """
        values = self.dict()
        if not values:
            return False

        for k, v in values.items():
            if k in os.environ and not self.override:
                continue
            if v is not None:
                os.environ[k] = v

        return True

    def get(self, key: str) -> Optional[str]:
        """
        Return the value bound to `key`, or `None` if the key is absent.

        When the key is absent and `self.verbose` is true, a warning naming
        the key and `self.dotenv_path` is logged.
        """
        data = self.dict()

        if key in data:
            return data[key]

        if self.verbose:
            logger.warning("Key %s not found in %s.", key, self.dotenv_path)

        return None
def get_key(
    dotenv_path: StrPath,
    key_to_get: str,
    encoding: Optional[str] = "utf-8",
) -> Optional[str]:
    """
    Look up a single key in the .env file at `dotenv_path`.

    The file is read through a `DotEnv` created with `verbose=True`.
    Returns `None` if the key isn't found or doesn't have a value.
    """
    env_file = DotEnv(dotenv_path, verbose=True, encoding=encoding)
    return env_file.get(key_to_get)
# Callers use this as `with rewrite(...) as (source, dest):`, which only works
# when the generator is wrapped by `contextmanager`.
@contextmanager
def rewrite(
    path: StrPath,
    encoding: Optional[str],
) -> Iterator[Tuple[IO[str], IO[str]]]:
    """Context manager for rewriting the file at `path` through a temp file.

    Yields `(source, dest)`: `source` is `path` opened for reading and `dest`
    is a named temporary file opened for writing.  `path` is touched first,
    so it is created if it does not exist.

    If the `with` body finishes without raising, the temporary file is moved
    over `path`.  If it raises, the temporary file is deleted and the same
    exception is re-raised, leaving `path` as it was.

    Parameters:
        path: File to rewrite.
        encoding: Encoding used for both the source and the temporary file.
    """
    pathlib.Path(path).touch()

    with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest:
        error = None
        try:
            with open(path, encoding=encoding) as source:
                yield (source, dest)
        except BaseException as err:
            # Remember the failure and deal with it below, once both files
            # have been closed by their `with` blocks.
            error = err

    if error is None:
        shutil.move(dest.name, path)
    else:
        os.unlink(dest.name)
        raise error from None
def set_key(
    dotenv_path: StrPath,
    key_to_set: str,
    value_to_set: str,
    quote_mode: str = "always",
    export: bool = False,
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str, str]:
    """
    Add or update a key/value binding in the given .env file.

    Every existing binding for `key_to_set` is replaced by the new line; if
    there is none, the new line is appended (preceded by a newline when the
    last copied line lacks one).  The file is rewritten through `rewrite()`,
    which touches `dotenv_path` first.

    Parameters:
        dotenv_path: Path of the .env file to modify.
        key_to_set: Name of the variable.
        value_to_set: Value to store.
        quote_mode: "always" single-quotes the value, "never" leaves it bare,
            and "auto" quotes it only when it is not purely alphanumeric.
        export: Whether to prefix the line with `export `.
        encoding: Encoding used to read and write the file.

    Returns:
        The tuple `(True, key_to_set, value_to_set)`.

    Raises:
        ValueError: If `quote_mode` is not one of the three known modes.
    """
    if quote_mode not in ("always", "auto", "never"):
        raise ValueError(f"Unknown quote_mode: {quote_mode}")

    if quote_mode == "always":
        needs_quotes = True
    elif quote_mode == "auto":
        needs_quotes = not value_to_set.isalnum()
    else:
        needs_quotes = False

    # Embedded single quotes are backslash-escaped inside the quoted form.
    value_out = (
        "'{}'".format(value_to_set.replace("'", "\\'"))
        if needs_quotes
        else value_to_set
    )
    line_out = (
        f'export {key_to_set}={value_out}\n'
        if export
        else f"{key_to_set}={value_out}\n"
    )

    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        found = False
        needs_newline = False
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key == key_to_set:
                dest.write(line_out)
                found = True
                continue
            original_text = binding.original.string
            dest.write(original_text)
            needs_newline = not original_text.endswith("\n")
        if not found:
            if needs_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
def unset_key(
    dotenv_path: StrPath,
    key_to_unset: str,
    quote_mode: str = "always",
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str]:
    """
    Remove every binding for `key_to_unset` from the given `.env` file.

    Returns `(True, key_to_unset)` when at least one binding was removed.
    Returns `(None, key_to_unset)` and logs a warning when the file does not
    exist or the key is not present in it.

    `quote_mode` is accepted but not used by this function.
    """
    if not os.path.exists(dotenv_path):
        logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
        return None, key_to_unset

    found = False
    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_unset:
                # Copy every other line through untouched.
                dest.write(binding.original.string)
                continue
            found = True

    if found:
        return found, key_to_unset

    logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
    return None, key_to_unset
def resolve_variables(
    values: Iterable[Tuple[str, Optional[str]]],
    override: bool,
) -> Mapping[str, Optional[str]]:
    """Resolve variable references in `values`, in order.

    Each non-`None` value is split into atoms by `parse_variables`, and every
    atom is resolved against a lookup table built from `os.environ` and the
    values resolved so far.  With `override` true the already-resolved values
    take precedence over `os.environ`; otherwise `os.environ` wins.  `None`
    values are stored unchanged.

    Returns a dict mapping each name to its resolved value.
    """
    resolved: Dict[str, Optional[str]] = {}

    for name, raw in values:
        if raw is None:
            resolved[name] = None
            continue

        # Later entries win when the two mappings share a key.
        lookup: Dict[str, Optional[str]]
        if override:
            lookup = {**os.environ, **resolved}  # type: ignore
        else:
            lookup = {**resolved, **os.environ}  # type: ignore

        resolved[name] = "".join(
            atom.resolve(lookup) for atom in parse_variables(raw)
        )

    return resolved
def _walk_to_root(path: str) -> Iterator[str]:
    """
    Yield absolute directory paths from `path` upward until the parent stops changing.

    If `path` is a file, the walk starts at its containing directory.
    Raises `IOError` (once iteration starts) if `path` does not exist.
    """
    if not os.path.exists(path):
        raise IOError('Starting path not found')

    start = os.path.dirname(path) if os.path.isfile(path) else path
    here = os.path.abspath(start)

    while True:
        yield here
        parent = os.path.abspath(os.path.join(here, os.path.pardir))
        if parent == here:
            # The parent of the top-most directory is that directory itself.
            return
        here = parent
def find_dotenv(
    filename: str = '.env',
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Look for `filename` in a starting directory and then in each of its parents.

    The starting directory is the current working directory when `usecwd` is
    true, when `__main__` has no `__file__`, or when `sys.frozen` is set.
    Otherwise it is the directory of the nearest calling frame whose source
    file exists on disk and is not this module.

    Returns the path of the first match, or an empty string when nothing is
    found.  Raises `IOError` instead if `raise_error_if_not_found` is true.
    """

    def _is_interactive():
        """ Decide whether this is running in a REPL or IPython notebook """
        try:
            main = __import__('__main__', None, None, fromlist=['__file__'])
        except ModuleNotFoundError:
            return False
        return not hasattr(main, '__file__')

    if usecwd or _is_interactive() or getattr(sys, 'frozen', False):
        # Should work without __file__, e.g. in REPL or IPython notebook.
        start_dir = os.getcwd()
    else:
        # will work for .py files
        this_file = __file__
        caller = sys._getframe()
        while True:
            code_file = caller.f_code.co_filename
            if code_file != this_file and os.path.exists(code_file):
                break
            assert caller.f_back is not None
            caller = caller.f_back
        start_dir = os.path.dirname(os.path.abspath(caller.f_code.co_filename))

    candidates = (
        os.path.join(directory, filename)
        for directory in _walk_to_root(start_dir)
    )
    match = next((c for c in candidates if os.path.isfile(c)), None)
    if match is not None:
        return match

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
def load_dotenv(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    override: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> bool:
    """Parse a .env file and then load all the variables found as environment variables.

    Parameters:
        dotenv_path: Absolute or relative path to .env file.
        stream: Text stream (such as `io.StringIO`) with .env content, used if
            `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        override: Whether to override the system environment variables with the variables
            from the `.env` file.
        interpolate: Whether parsed values are passed through `resolve_variables`.
        encoding: Encoding to be used to read the file.
    Returns:
        Bool: The result of `DotEnv.set_as_environment_variables()`, which is
        `False` when no bindings were parsed and `True` otherwise.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    source_path = find_dotenv() if nothing_given else dotenv_path

    loader = DotEnv(
        dotenv_path=source_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=override,
        encoding=encoding,
    )
    return loader.set_as_environment_variables()
def dotenv_values(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> Dict[str, Optional[str]]:
    """
    Parse a .env file and return its content as a dict.

    Keys without a value in the .env file map to `None`.  For example,
    `foo=bar` results in `{"foo": "bar"}` whereas `foo` alone results in
    `{"foo": None}`

    Parameters:
        dotenv_path: Absolute or relative path to the .env file.
        stream: `StringIO` object with .env content, used if `dotenv_path` is `None`.
        verbose: Whether to log a message when the .env file is missing.
        interpolate: Whether parsed values are passed through `resolve_variables`.
        encoding: Encoding to be used to read the file.

    The underlying `DotEnv` is always created with `override=True`.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    source_path = find_dotenv() if nothing_given else dotenv_path

    reader = DotEnv(
        dotenv_path=source_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=True,
        encoding=encoding,
    )
    return reader.dict()