Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two remaining Windows untrusted search path cases #1792

Merged
merged 16 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ repos:
hooks:
- id: shellcheck
args: [--color]
exclude: ^git/ext/
exclude: ^test/fixtures/polyglot$|^git/ext/

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
Expand Down
102 changes: 76 additions & 26 deletions git/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
Iterator,
List,
Mapping,
Optional,
Sequence,
TYPE_CHECKING,
TextIO,
Expand Down Expand Up @@ -102,7 +103,7 @@ def handle_process_output(
Callable[[bytes, "Repo", "DiffIndex"], None],
],
stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
finalizer: Union[None, Callable[[Union[subprocess.Popen, "Git.AutoInterrupt"]], None]] = None,
finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None,
decode_streams: bool = True,
kill_after_timeout: Union[None, float] = None,
) -> None:
Expand Down Expand Up @@ -207,6 +208,68 @@ def pump_stream(
finalizer(process)


def _safer_popen_windows(
command: Union[str, Sequence[Any]],
*,
shell: bool = False,
env: Optional[Mapping[str, str]] = None,
**kwargs: Any,
) -> Popen:
"""Call :class:`subprocess.Popen` on Windows but don't include a CWD in the search.

This avoids an untrusted search path condition where a file like ``git.exe`` in a
malicious repository would be run when GitPython operates on the repository. The
process using GitPython may have an untrusted repository's working tree as its
current working directory. Some operations may temporarily change to that directory
before running a subprocess. In addition, while by default GitPython does not run
external commands with a shell, it can be made to do so, in which case the CWD of
the subprocess, which GitPython usually sets to a repository working tree, can
itself be searched automatically by the shell. This wrapper covers all those cases.

:note: This currently works by setting the ``NoDefaultCurrentDirectoryInExePath``
environment variable during subprocess creation. It also takes care of passing
Windows-specific process creation flags, but that is unrelated to path search.

:note: The current implementation contains a race condition on :attr:`os.environ`.
GitPython isn't thread-safe, but a program using it on one thread should ideally
be able to mutate :attr:`os.environ` on another, without unpredictable results.
See comments in https://github.com/gitpython-developers/GitPython/pull/1650.
"""
# CREATE_NEW_PROCESS_GROUP is needed for some ways of killing it afterwards. See:
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
# https://docs.python.org/3/library/subprocess.html#subprocess.CREATE_NEW_PROCESS_GROUP
creationflags = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP

# When using a shell, the shell is the direct subprocess, so the variable must be
# set in its environment, to affect its search behavior. (The "1" can be any value.)
if shell:
safer_env = {} if env is None else dict(env)
safer_env["NoDefaultCurrentDirectoryInExePath"] = "1"
else:
safer_env = env

# When not using a shell, the current process does the search in a CreateProcessW
# API call, so the variable must be set in our environment. With a shell, this is
# unnecessary, in versions where https://github.com/python/cpython/issues/101283 is
# patched. If not, in the rare case the ComSpec environment variable is unset, the
# shell is searched for unsafely. Setting NoDefaultCurrentDirectoryInExePath in all
# cases, as here, is simpler and protects against that. (The "1" can be any value.)
with patch_env("NoDefaultCurrentDirectoryInExePath", "1"):
return Popen(
command,
shell=shell,
env=safer_env,
creationflags=creationflags,
**kwargs,
)


if os.name == "nt":
safer_popen = _safer_popen_windows
else:
safer_popen = Popen


def dashify(string: str) -> str:
return string.replace("_", "-")

Expand All @@ -225,14 +288,6 @@ def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], exc
## -- End Utilities -- @}


if os.name == "nt":
# CREATE_NEW_PROCESS_GROUP is needed to allow killing it afterwards. See:
# https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
PROC_CREATIONFLAGS = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP
else:
PROC_CREATIONFLAGS = 0


class Git(LazyMixin):
"""The Git class manages communication with the Git binary.

Expand Down Expand Up @@ -992,11 +1047,8 @@ def execute(
redacted_command,
'"kill_after_timeout" feature is not supported on Windows.',
)
# Only search PATH, not CWD. This must be in the *caller* environment. The "1" can be any value.
maybe_patch_caller_env = patch_env("NoDefaultCurrentDirectoryInExePath", "1")
else:
cmd_not_found_exception = FileNotFoundError
maybe_patch_caller_env = contextlib.nullcontext()
# END handle

stdout_sink = PIPE if with_stdout else getattr(subprocess, "DEVNULL", None) or open(os.devnull, "wb")
Expand All @@ -1011,20 +1063,18 @@ def execute(
universal_newlines,
)
try:
with maybe_patch_caller_env:
proc = Popen(
command,
env=env,
cwd=cwd,
bufsize=-1,
stdin=(istream or DEVNULL),
stderr=PIPE,
stdout=stdout_sink,
shell=shell,
universal_newlines=universal_newlines,
creationflags=PROC_CREATIONFLAGS,
**subprocess_kwargs,
)
proc = safer_popen(
command,
env=env,
cwd=cwd,
bufsize=-1,
stdin=(istream or DEVNULL),
stderr=PIPE,
stdout=stdout_sink,
shell=shell,
universal_newlines=universal_newlines,
**subprocess_kwargs,
)
except cmd_not_found_exception as err:
raise GitCommandNotFound(redacted_command, err) from err
else:
Expand Down
5 changes: 2 additions & 3 deletions git/index/fun.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)
import subprocess

from git.cmd import PROC_CREATIONFLAGS, handle_process_output
from git.cmd import handle_process_output, safer_popen
from git.compat import defenc, force_bytes, force_text, safe_decode
from git.exc import HookExecutionError, UnmergedEntriesError
from git.objects.fun import (
Expand Down Expand Up @@ -98,13 +98,12 @@ def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
cmd = ["bash.exe", relative_hp]

process = subprocess.Popen(
process = safer_popen(
cmd + list(args),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=index.repo.working_dir,
creationflags=PROC_CREATIONFLAGS,
)
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
Expand Down
11 changes: 11 additions & 0 deletions git/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,17 @@ def _get_exe_extensions() -> Sequence[str]:


def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
"""Perform a path search to assist :func:`is_cygwin_git`.
This is not robust for general use. It is an implementation detail of
:func:`is_cygwin_git`. When a search following all shell rules is needed,
:func:`shutil.which` can be used instead.
:note: Neither this function nor :func:`shutil.which` will predict the effect of an
executable search on a native Windows system due to a :class:`subprocess.Popen`
call without ``shell=True``, because shell and non-shell executable search on
Windows differ considerably.
"""
# From: http://stackoverflow.com/a/377028/548792
winprog_exts = _get_exe_extensions()

Expand Down
8 changes: 8 additions & 0 deletions test/fixtures/polyglot
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env sh
# Valid script in both Bash and Python, but with different behavior.
""":"
echo 'Ran intended hook.' >output.txt
exit
" """
from pathlib import Path
Path('payload.txt').write_text('Ran impostor hook!', encoding='utf-8')
49 changes: 47 additions & 2 deletions test/lib/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import textwrap
import time
import unittest
import venv

import gitdb

Expand All @@ -36,6 +37,7 @@
"with_rw_repo",
"with_rw_and_rw_remote_repo",
"TestBase",
"VirtualEnvironment",
"TestCase",
"SkipTest",
"skipIf",
Expand Down Expand Up @@ -88,11 +90,11 @@ def with_rw_directory(func):
test succeeds, but leave it otherwise to aid additional debugging."""

@wraps(func)
def wrapper(self):
def wrapper(self, *args, **kwargs):
path = tempfile.mkdtemp(prefix=func.__name__)
keep = False
try:
return func(self, path)
return func(self, path, *args, **kwargs)
except Exception:
log.info(
"Test %s.%s failed, output is at %r\n",
Expand Down Expand Up @@ -390,3 +392,46 @@ def _make_file(self, rela_path, data, repo=None):
with open(abs_path, "w") as fp:
fp.write(data)
return abs_path


class VirtualEnvironment:
"""A newly created Python virtual environment for use in a test."""

__slots__ = ("_env_dir",)

def __init__(self, env_dir, *, with_pip):
if os.name == "nt":
self._env_dir = osp.realpath(env_dir)
venv.create(self.env_dir, symlinks=False, with_pip=with_pip)
else:
self._env_dir = env_dir
venv.create(self.env_dir, symlinks=True, with_pip=with_pip)

@property
def env_dir(self):
"""The top-level directory of the environment."""
return self._env_dir

@property
def python(self):
"""Path to the Python executable in the environment."""
return self._executable("python")

@property
def pip(self):
"""Path to the pip executable in the environment, or RuntimeError if absent."""
return self._executable("pip")

@property
def sources(self):
"""Path to a src directory in the environment, which may not exist yet."""
return os.path.join(self.env_dir, "src")

def _executable(self, basename):
if os.name == "nt":
path = osp.join(self.env_dir, "Scripts", basename + ".exe")
else:
path = osp.join(self.env_dir, "bin", basename)
if osp.isfile(path) or osp.islink(path):
return path
raise RuntimeError(f"no regular file or symlink {path!r}")