Skip to content

Commit

Permalink
#11626 fix twist conch --auth=sshkey (#11810)
Browse files Browse the repository at this point in the history
`twist conch --auth=sshkey` can now authenticate users without a traceback again, thanks to twisted.conch.unix.UnixConchUser no longer being incorrectly instantiated with `bytes`.  In the course of this fix, some type hinting has also been applied to `twisted.cred.portal`.
  • Loading branch information
glyph committed Feb 15, 2023
2 parents a60a32b + da98def commit 7f421f9
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 96 deletions.
27 changes: 16 additions & 11 deletions src/twisted/conch/test/test_tap.py
Expand Up @@ -5,9 +5,13 @@
Tests for L{twisted.conch.tap}.
"""

from typing import Any

from twisted.application.internet import StreamServerEndpointService
from twisted.cred import error
from twisted.cred.checkers import FilePasswordDB, ICredentialsChecker
from twisted.cred.credentials import ISSHPrivateKey, IUsernamePassword, UsernamePassword
from twisted.internet.defer import Deferred
from twisted.python.reflect import requireModule
from twisted.trial.unittest import TestCase

Expand Down Expand Up @@ -37,7 +41,7 @@ class MakeServiceTests(TestCase):

usernamePassword = (b"iamuser", b"thisispassword")

def setUp(self):
def setUp(self) -> None:
"""
Create a file with two users.
"""
Expand All @@ -46,7 +50,7 @@ def setUp(self):
f.write(b":".join(self.usernamePassword))
self.options = tap.Options()

def test_basic(self):
def test_basic(self) -> None:
"""
L{tap.makeService} returns a L{StreamServerEndpointService} instance
running on TCP port 22, and the linked protocol factory is an instance
Expand All @@ -58,7 +62,7 @@ def test_basic(self):
self.assertEqual(service.endpoint._port, 22)
self.assertIsInstance(service.factory, OpenSSHFactory)

def test_defaultAuths(self):
def test_defaultAuths(self) -> None:
"""
Make sure that if the C{--auth} command-line option is not passed,
the default checkers are (for backwards compatibility): SSH and UNIX
Expand All @@ -81,15 +85,15 @@ def test_defaultAuths(self):
"There should be %d checkers by default" % (numCheckers,),
)

def test_authAdded(self):
def test_authAdded(self) -> None:
"""
The C{--auth} command-line option will add a checker to the list of
checkers, and it should be the only auth checker
"""
self.options.parseOptions(["--auth", "file:" + self.filename])
self.assertEqual(len(self.options["credCheckers"]), 1)

def test_multipleAuthAdded(self):
def test_multipleAuthAdded(self) -> None:
"""
Multiple C{--auth} command-line options will add all checkers specified
to the list ofcheckers, and there should only be the specified auth
Expand All @@ -105,32 +109,33 @@ def test_multipleAuthAdded(self):
)
self.assertEqual(len(self.options["credCheckers"]), 2)

def test_authFailure(self):
def test_authFailure(self) -> Any:
"""
The checker created by the C{--auth} command-line option returns a
L{Deferred} that fails with L{UnauthorizedLogin} when
presented with credentials that are unknown to that checker.
"""
self.options.parseOptions(["--auth", "file:" + self.filename])
checker = self.options["credCheckers"][-1]
invalid = UsernamePassword(self.usernamePassword[0], "fake")
checker: FilePasswordDB = self.options["credCheckers"][-1]
self.assertIsInstance(checker, FilePasswordDB)
invalid = UsernamePassword(self.usernamePassword[0], b"fake")
# Wrong password should raise error
return self.assertFailure(
checker.requestAvatarId(invalid), error.UnauthorizedLogin
)

def test_authSuccess(self):
def test_authSuccess(self) -> Deferred[None]:
"""
The checker created by the C{--auth} command-line option returns a
L{Deferred} that returns the avatar id when presented with credentials
that are known to that checker.
"""
self.options.parseOptions(["--auth", "file:" + self.filename])
checker = self.options["credCheckers"][-1]
checker: ICredentialsChecker = self.options["credCheckers"][-1]
correct = UsernamePassword(*self.usernamePassword)
d = checker.requestAvatarId(correct)

def checkSuccess(username):
def checkSuccess(username: bytes) -> None:
self.assertEqual(username, correct.username)

return d.addCallback(checkSuccess)
Expand Down
82 changes: 55 additions & 27 deletions src/twisted/conch/test/test_unix.py
Expand Up @@ -5,13 +5,22 @@

from zope.interface import implementer

from twisted.conch.interfaces import IConchUser
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.credentials import IUsernamePassword, UsernamePassword
from twisted.cred.portal import Portal
from twisted.internet.interfaces import IReactorProcess
from twisted.python.fakepwd import UserDatabase
from twisted.python.reflect import requireModule
from twisted.trial import unittest
from .test_session import StubClient, StubConnection

cryptography = requireModule("cryptography")
unix = requireModule("twisted.conch.unix")

if unix is not None:
from twisted.conch.unix import UnixConchUser, UnixSSHRealm


@implementer(IReactorProcess)
class MockProcessSpawner:
Expand Down Expand Up @@ -52,46 +61,65 @@ def spawnProcess(
)


class StubUnixConchUser:
"""
Enough of UnixConchUser to exercise SSHSessionForUnixConchUser in the
tests below.
"""

def __init__(self, homeDirectory):
from .test_session import StubClient, StubConnection

self._homeDirectory = homeDirectory
self.conn = StubConnection(transport=StubClient())

def getUserGroupId(self):
return (None, None)

def getHomeDir(self):
return self._homeDirectory

def getShell(self):
pass
shouldSkip = (
"Cannot run without cryptography"
if cryptography is None
else "Unix system required"
if unix is None
else None
)


class TestSSHSessionForUnixConchUser(unittest.TestCase):

if cryptography is None:
skip = "Cannot run without cryptography"
elif unix is None:
skip = "Unix system required"
skip = shouldSkip

def testExecCommandEnvironment(self):
def testExecCommandEnvironment(self) -> None:
"""
C{execCommand} sets the C{HOME} environment variable to the avatar's home
directory.
"""
mockReactor = MockProcessSpawner()
userdb = UserDatabase()
homeDirectory = "/made/up/path/"
avatar = StubUnixConchUser(homeDirectory)
userName = "user"
userdb.addUser(userName, home=homeDirectory)
self.patch(unix, "pwd", userdb)
mockReactor = MockProcessSpawner()
avatar = UnixConchUser(userName)
avatar.conn = StubConnection(transport=StubClient())
session = unix.SSHSessionForUnixConchUser(avatar, reactor=mockReactor)
protocol = None
command = ["not-actually-executed"]
session.execCommand(protocol, command)
[call] = mockReactor._spawnProcessCalls
self.assertEqual(homeDirectory, call["env"]["HOME"])


class TestUnixSSHRealm(unittest.TestCase):
"""
Tests for L{UnixSSHRealm}.
"""

skip = shouldSkip

def test_unixSSHRealm(self) -> None:
"""
L{UnixSSHRealm} is an L{IRealm} whose C{.requestAvatar} method returns
a L{UnixConchUser}.
"""
userdb = UserDatabase()
home = "/testing/home/value"
userdb.addUser("user", home=home)
self.patch(unix, "pwd", userdb)
pwdb = InMemoryUsernamePasswordDatabaseDontUse(user=b"password")
p = Portal(UnixSSHRealm(), [pwdb])

# there seems to be a bug in mypy-zope where sometimes things don't
# implement their superinterfaces; 0.3.11, when we upgrade to 0.9.0
# this type declaration will be extraneous
creds: IUsernamePassword = UsernamePassword(b"user", b"password")
result = p.login(creds, None, IConchUser)
resultInterface, avatar, logout = self.successResultOf(result)
self.assertIsInstance(avatar, UnixConchUser)
assert isinstance(avatar, UnixConchUser) # legibility for mypy
self.assertEqual(avatar.getHomeDir(), home)
21 changes: 16 additions & 5 deletions src/twisted/conch/unix.py
Expand Up @@ -5,6 +5,8 @@
A UNIX SSH server.
"""

from __future__ import annotations

import fcntl
import grp
import os
Expand All @@ -14,6 +16,7 @@
import struct
import time
import tty
from typing import Callable, Dict, Tuple

from zope.interface import implementer

Expand All @@ -33,6 +36,7 @@
)
from twisted.cred import portal
from twisted.internet.error import ProcessExitedAlready
from twisted.internet.interfaces import IListeningPort
from twisted.logger import Logger
from twisted.python import components
from twisted.python.compat import nativeString
Expand All @@ -45,13 +49,18 @@

@implementer(portal.IRealm)
class UnixSSHRealm:
def requestAvatar(self, username, mind, *interfaces):
user = UnixConchUser(username)
def requestAvatar(
self,
username: bytes,
mind: object,
*interfaces: portal._InterfaceItself,
) -> Tuple[portal._InterfaceItself, UnixConchUser, Callable[[], None]]:
user = UnixConchUser(username.decode())
return interfaces[0], user, user.logout


class UnixConchUser(ConchUser):
def __init__(self, username):
def __init__(self, username: str) -> None:
ConchUser.__init__(self)
self.username = username
self.pwdData = pwd.getpwnam(self.username)
Expand All @@ -60,7 +69,9 @@ def __init__(self, username):
if username in userlist:
l.append(gid)
self.otherGroups = l
self.listeners = {} # Dict mapping (interface, port) -> listener
self.listeners: Dict[
str, IListeningPort
] = {} # Dict mapping (interface, port) -> listener
self.channelLookup.update(
{
b"session": session.SSHSession,
Expand Down Expand Up @@ -116,7 +127,7 @@ def global_cancel_tcpip_forward(self, data):
self._runAsUser(listener.stopListening)
return 1

def logout(self):
def logout(self) -> None:
# Remove all listeners.
for listener in self.listeners.values():
self._runAsUser(listener.stopListening)
Expand Down

0 comments on commit 7f421f9

Please sign in to comment.