okay fine

This commit is contained in:
pacnpal
2024-11-03 17:47:26 +00:00
parent 387c4740e7
commit 27f3326e22
10020 changed files with 1935769 additions and 2364 deletions

View File

@@ -0,0 +1,7 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Twisted Cred: Support for verifying credentials, and providing services to user
based on those credentials.
"""

View File

@@ -0,0 +1,132 @@
# -*- test-case-name: twisted.cred.test.test_digestauth -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Calculations for HTTP Digest authentication.
@see: U{http://www.faqs.org/rfcs/rfc2617.html}
"""
from binascii import hexlify
from hashlib import md5, sha1
# The digest math
algorithms = {
b"md5": md5,
# md5-sess is more complicated than just another algorithm. It requires
# H(A1) state to be remembered from the first WWW-Authenticate challenge
# issued and re-used to process any Authorization header in response to
# that WWW-Authenticate challenge. It is *not* correct to simply
# recalculate H(A1) each time an Authorization header is received. Read
# RFC 2617, section 3.2.2.2 and do not try to make DigestCredentialFactory
# support this unless you completely understand it. -exarkun
b"md5-sess": md5,
b"sha": sha1,
}
# DigestCalcHA1
def calcHA1(
pszAlg, pszUserName, pszRealm, pszPassword, pszNonce, pszCNonce, preHA1=None
):
"""
Compute H(A1) from RFC 2617.
@param pszAlg: The name of the algorithm to use to calculate the digest.
Currently supported are md5, md5-sess, and sha.
@param pszUserName: The username
@param pszRealm: The realm
@param pszPassword: The password
@param pszNonce: The nonce
@param pszCNonce: The cnonce
@param preHA1: If available this is a str containing a previously
calculated H(A1) as a hex string. If this is given then the values for
pszUserName, pszRealm, and pszPassword must be L{None} and are ignored.
"""
if preHA1 and (pszUserName or pszRealm or pszPassword):
raise TypeError(
"preHA1 is incompatible with the pszUserName, "
"pszRealm, and pszPassword arguments"
)
if preHA1 is None:
# We need to calculate the HA1 from the username:realm:password
m = algorithms[pszAlg]()
m.update(pszUserName)
m.update(b":")
m.update(pszRealm)
m.update(b":")
m.update(pszPassword)
HA1 = hexlify(m.digest())
else:
# We were given a username:realm:password
HA1 = preHA1
if pszAlg == b"md5-sess":
m = algorithms[pszAlg]()
m.update(HA1)
m.update(b":")
m.update(pszNonce)
m.update(b":")
m.update(pszCNonce)
HA1 = hexlify(m.digest())
return HA1
def calcHA2(algo, pszMethod, pszDigestUri, pszQop, pszHEntity):
"""
Compute H(A2) from RFC 2617.
@param algo: The name of the algorithm to use to calculate the digest.
Currently supported are md5, md5-sess, and sha.
@param pszMethod: The request method.
@param pszDigestUri: The request URI.
@param pszQop: The Quality-of-Protection value.
@param pszHEntity: The hash of the entity body or L{None} if C{pszQop} is
not C{'auth-int'}.
@return: The hash of the A2 value for the calculation of the response
digest.
"""
m = algorithms[algo]()
m.update(pszMethod)
m.update(b":")
m.update(pszDigestUri)
if pszQop == b"auth-int":
m.update(b":")
m.update(pszHEntity)
return hexlify(m.digest())
def calcResponse(HA1, HA2, algo, pszNonce, pszNonceCount, pszCNonce, pszQop):
"""
Compute the digest for the given parameters.
@param HA1: The H(A1) value, as computed by L{calcHA1}.
@param HA2: The H(A2) value, as computed by L{calcHA2}.
@param pszNonce: The challenge nonce.
@param pszNonceCount: The (client) nonce count value for this response.
@param pszCNonce: The client nonce.
@param pszQop: The Quality-of-Protection value.
"""
m = algorithms[algo]()
m.update(HA1)
m.update(b":")
m.update(pszNonce)
m.update(b":")
if pszNonceCount and pszCNonce:
m.update(pszNonceCount)
m.update(b":")
m.update(pszCNonce)
m.update(b":")
m.update(pszQop)
m.update(b":")
m.update(HA2)
respHash = hexlify(m.digest())
return respHash

View File

@@ -0,0 +1,334 @@
# -*- test-case-name: twisted.cred.test.test_cred -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Basic credential checkers
@var ANONYMOUS: An empty tuple used to represent the anonymous avatar ID.
"""
import os
from typing import Any, Dict, Optional, Tuple, Union
from zope.interface import Attribute, Interface, implementer
from twisted.cred import error
from twisted.cred.credentials import (
IAnonymous,
IUsernameHashedPassword,
IUsernamePassword,
)
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.logger import Logger
from twisted.python import failure
# A note on anonymity - We do not want None as the value for anonymous
# because it is too easy to accidentally return it. We do not want the
# empty string, because it is too easy to mistype a password file. For
# example, an .htpasswd file may contain the lines: ['hello:asdf',
# 'world:asdf', 'goodbye', ':world']. This misconfiguration will have an
# ill effect in any case, but accidentally granting anonymous access is a
# worse failure mode than simply granting access to an untypeable
# username. We do not want an instance of 'object', because that would
# create potential problems with persistence.
ANONYMOUS: Tuple[()] = ()
class ICredentialsChecker(Interface):
"""
An object that can check sub-interfaces of L{ICredentials}.
"""
credentialInterfaces = Attribute(
"A list of sub-interfaces of L{ICredentials} which specifies which I "
"may check."
)
def requestAvatarId(credentials: Any) -> Deferred[Union[bytes, Tuple[()]]]:
"""
Validate credentials and produce an avatar ID.
@param credentials: something which implements one of the interfaces in
C{credentialInterfaces}.
@return: a L{Deferred} which will fire with a L{bytes} that identifies
an avatar, an empty tuple to specify an authenticated anonymous
user (provided as L{twisted.cred.checkers.ANONYMOUS}) or fail with
L{UnauthorizedLogin}. Alternatively, return the result itself.
@see: L{twisted.cred.credentials}
"""
@implementer(ICredentialsChecker)
class AllowAnonymousAccess:
"""
A credentials checker that unconditionally grants anonymous access.
@cvar credentialInterfaces: Tuple containing L{IAnonymous}.
"""
credentialInterfaces = (IAnonymous,)
def requestAvatarId(self, credentials):
"""
Succeed with the L{ANONYMOUS} avatar ID.
@return: L{Deferred} that fires with L{twisted.cred.checkers.ANONYMOUS}
"""
return defer.succeed(ANONYMOUS)
@implementer(ICredentialsChecker)
class InMemoryUsernamePasswordDatabaseDontUse:
"""
An extremely simple credentials checker.
This is only of use in one-off test programs or examples which don't
want to focus too much on how credentials are verified.
You really don't want to use this for anything else. It is, at best, a
toy. If you need a simple credentials checker for a real application,
see L{FilePasswordDB}.
@cvar credentialInterfaces: Tuple of L{IUsernamePassword} and
L{IUsernameHashedPassword}.
@ivar users: Mapping of usernames to passwords.
@type users: L{dict} mapping L{bytes} to L{bytes}
"""
credentialInterfaces = (
IUsernamePassword,
IUsernameHashedPassword,
)
def __init__(self, **users: bytes) -> None:
"""
Initialize the in-memory database.
For example::
db = InMemoryUsernamePasswordDatabaseDontUse(
user1=b'sesame',
user2=b'hunter2',
)
@param users: Usernames and passwords to seed the database with.
Each username given as a keyword is encoded to L{bytes} as ASCII.
Passwords must be given as L{bytes}.
@type users: L{dict} of L{str} to L{bytes}
"""
self.users = {x.encode("ascii"): y for x, y in users.items()}
def addUser(self, username: bytes, password: bytes) -> None:
"""
Set a user's password.
@param username: Name of the user.
@type username: L{bytes}
@param password: Password to associate with the username.
@type password: L{bytes}
"""
self.users[username] = password
def _cbPasswordMatch(self, matched, username):
if matched:
return username
else:
return failure.Failure(error.UnauthorizedLogin())
def requestAvatarId(self, credentials):
if credentials.username in self.users:
return defer.maybeDeferred(
credentials.checkPassword, self.users[credentials.username]
).addCallback(self._cbPasswordMatch, credentials.username)
else:
return defer.fail(error.UnauthorizedLogin())
@implementer(ICredentialsChecker)
class FilePasswordDB:
"""
A file-based, text-based username/password database.
Records in the datafile for this class are delimited by a particular
string. The username appears in a fixed field of the columns delimited
by this string, as does the password. Both fields are specifiable. If
the passwords are not stored plaintext, a hash function must be supplied
to convert plaintext passwords to the form stored on disk and this
CredentialsChecker will only be able to check L{IUsernamePassword}
credentials. If the passwords are stored plaintext,
L{IUsernameHashedPassword} credentials will be checkable as well.
"""
cache = False
_credCache: Optional[Dict[bytes, bytes]] = None
_cacheTimestamp: float = 0
_log = Logger()
def __init__(
self,
filename,
delim=b":",
usernameField=0,
passwordField=1,
caseSensitive=True,
hash=None,
cache=False,
):
"""
@type filename: L{str}
@param filename: The name of the file from which to read username and
password information.
@type delim: L{bytes}
@param delim: The field delimiter used in the file.
@type usernameField: L{int}
@param usernameField: The index of the username after splitting a
line on the delimiter.
@type passwordField: L{int}
@param passwordField: The index of the password after splitting a
line on the delimiter.
@type caseSensitive: L{bool}
@param caseSensitive: If true, consider the case of the username when
performing a lookup. Ignore it otherwise.
@type hash: Three-argument callable or L{None}
@param hash: A function used to transform the plaintext password
received over the network to a format suitable for comparison
against the version stored on disk. The arguments to the callable
are the username, the network-supplied password, and the in-file
version of the password. If the return value compares equal to the
version stored on disk, the credentials are accepted.
@type cache: L{bool}
@param cache: If true, maintain an in-memory cache of the
contents of the password file. On lookups, the mtime of the
file will be checked, and the file will only be re-parsed if
the mtime is newer than when the cache was generated.
"""
self.filename = filename
self.delim = delim
self.ufield = usernameField
self.pfield = passwordField
self.caseSensitive = caseSensitive
self.hash = hash
self.cache = cache
if self.hash is None:
# The passwords are stored plaintext. We can support both
# plaintext and hashed passwords received over the network.
self.credentialInterfaces = (
IUsernamePassword,
IUsernameHashedPassword,
)
else:
# The passwords are hashed on disk. We can support only
# plaintext passwords received over the network.
self.credentialInterfaces = (IUsernamePassword,)
def __getstate__(self):
d = dict(vars(self))
for k in "_credCache", "_cacheTimestamp":
try:
del d[k]
except KeyError:
pass
return d
def _cbPasswordMatch(self, matched, username):
if matched:
return username
else:
return failure.Failure(error.UnauthorizedLogin())
def _loadCredentials(self):
"""
Loads the credentials from the configured file.
@return: An iterable of C{username, password} couples.
@rtype: C{iterable}
@raise UnauthorizedLogin: when failing to read the credentials from the
file.
"""
try:
with open(self.filename, "rb") as f:
for line in f:
line = line.rstrip()
parts = line.split(self.delim)
if self.ufield >= len(parts) or self.pfield >= len(parts):
continue
if self.caseSensitive:
yield parts[self.ufield], parts[self.pfield]
else:
yield parts[self.ufield].lower(), parts[self.pfield]
except OSError as e:
self._log.error("Unable to load credentials db: {e!r}", e=e)
raise error.UnauthorizedLogin()
def getUser(self, username: bytes) -> Tuple[bytes, bytes]:
"""
Look up the credentials for a username.
@param username: The username to look up.
@type username: L{bytes}
@returns: Two-tuple of the canonicalicalized username (i.e. lowercase
if the database is not case sensitive) and the associated password
value, both L{bytes}.
@rtype: L{tuple}
@raises KeyError: When lookup of the username fails.
"""
if not self.caseSensitive:
username = username.lower()
if self.cache:
if (
self._credCache is None
or os.path.getmtime(self.filename) > self._cacheTimestamp
):
self._cacheTimestamp = os.path.getmtime(self.filename)
self._credCache = dict(self._loadCredentials())
return username, self._credCache[username]
else:
for u, p in self._loadCredentials():
if u == username:
return u, p
raise KeyError(username)
def requestAvatarId(
self, credentials: IUsernamePassword
) -> Deferred[Union[bytes, Tuple[()]]]:
try:
u, p = self.getUser(credentials.username)
except KeyError:
return defer.fail(error.UnauthorizedLogin())
else:
up = IUsernamePassword(credentials, None)
if self.hash:
if up is not None:
h = self.hash(up.username, up.password, p)
if h == p:
return defer.succeed(u)
return defer.fail(error.UnauthorizedLogin())
else:
return defer.maybeDeferred(credentials.checkPassword, p).addCallback(
self._cbPasswordMatch, u
)
# For backwards compatibility
# Allow access as the old name.
OnDiskUsernamePasswordDatabase = FilePasswordDB

View File

@@ -0,0 +1,508 @@
# -*- test-case-name: twisted.cred.test.test_cred-*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
This module defines L{ICredentials}, an interface for objects that represent
authentication credentials to provide, and also includes a number of useful
implementations of that interface.
"""
import base64
import hmac
import random
import re
import time
from binascii import hexlify
from hashlib import md5
from zope.interface import Interface, implementer
from twisted.cred import error
from twisted.cred._digest import calcHA1, calcHA2, calcResponse
from twisted.python.compat import nativeString, networkString
from twisted.python.deprecate import deprecatedModuleAttribute
from twisted.python.randbytes import secureRandom
from twisted.python.versions import Version
class ICredentials(Interface):
"""
I check credentials.
Implementors I{must} specify the sub-interfaces of ICredentials
to which it conforms, using L{zope.interface.implementer}.
"""
class IUsernameDigestHash(ICredentials):
"""
This credential is used when a CredentialChecker has access to the hash
of the username:realm:password as in an Apache .htdigest file.
"""
def checkHash(digestHash):
"""
@param digestHash: The hashed username:realm:password to check against.
@return: C{True} if the credentials represented by this object match
the given hash, C{False} if they do not, or a L{Deferred} which
will be called back with one of these values.
"""
class IUsernameHashedPassword(ICredentials):
"""
I encapsulate a username and a hashed password.
This credential is used when a hashed password is received from the
party requesting authentication. CredentialCheckers which check this
kind of credential must store the passwords in plaintext (or as
password-equivalent hashes) form so that they can be hashed in a manner
appropriate for the particular credentials class.
@type username: L{bytes}
@ivar username: The username associated with these credentials.
"""
def checkPassword(password):
"""
Validate these credentials against the correct password.
@type password: L{bytes}
@param password: The correct, plaintext password against which to
check.
@rtype: C{bool} or L{Deferred}
@return: C{True} if the credentials represented by this object match the
given password, C{False} if they do not, or a L{Deferred} which will
be called back with one of these values.
"""
class IUsernamePassword(ICredentials):
"""
I encapsulate a username and a plaintext password.
This encapsulates the case where the password received over the network
has been hashed with the identity function (That is, not at all). The
CredentialsChecker may store the password in whatever format it desires,
it need only transform the stored password in a similar way before
performing the comparison.
@type username: L{bytes}
@ivar username: The username associated with these credentials.
@type password: L{bytes}
@ivar password: The password associated with these credentials.
"""
username: bytes
password: bytes
def checkPassword(password: bytes) -> bool:
"""
Validate these credentials against the correct password.
@type password: L{bytes}
@param password: The correct, plaintext password against which to
check.
@rtype: C{bool} or L{Deferred}
@return: C{True} if the credentials represented by this object match the
given password, C{False} if they do not, or a L{Deferred} which will
be called back with one of these values.
"""
class IAnonymous(ICredentials):
"""
I am an explicitly anonymous request for access.
@see: L{twisted.cred.checkers.AllowAnonymousAccess}
"""
@implementer(IUsernameHashedPassword, IUsernameDigestHash)
class DigestedCredentials:
"""
Yet Another Simple HTTP Digest authentication scheme.
"""
def __init__(self, username, method, realm, fields):
self.username = username
self.method = method
self.realm = realm
self.fields = fields
def checkPassword(self, password):
"""
Verify that the credentials represented by this object agree with the
given plaintext C{password} by hashing C{password} in the same way the
response hash represented by this object was generated and comparing
the results.
"""
response = self.fields.get("response")
uri = self.fields.get("uri")
nonce = self.fields.get("nonce")
cnonce = self.fields.get("cnonce")
nc = self.fields.get("nc")
algo = self.fields.get("algorithm", b"md5").lower()
qop = self.fields.get("qop", b"auth")
expected = calcResponse(
calcHA1(algo, self.username, self.realm, password, nonce, cnonce),
calcHA2(algo, self.method, uri, qop, None),
algo,
nonce,
nc,
cnonce,
qop,
)
return expected == response
def checkHash(self, digestHash):
"""
Verify that the credentials represented by this object agree with the
credentials represented by the I{H(A1)} given in C{digestHash}.
@param digestHash: A precomputed H(A1) value based on the username,
realm, and password associate with this credentials object.
"""
response = self.fields.get("response")
uri = self.fields.get("uri")
nonce = self.fields.get("nonce")
cnonce = self.fields.get("cnonce")
nc = self.fields.get("nc")
algo = self.fields.get("algorithm", b"md5").lower()
qop = self.fields.get("qop", b"auth")
expected = calcResponse(
calcHA1(algo, None, None, None, nonce, cnonce, preHA1=digestHash),
calcHA2(algo, self.method, uri, qop, None),
algo,
nonce,
nc,
cnonce,
qop,
)
return expected == response
class DigestCredentialFactory:
"""
Support for RFC2617 HTTP Digest Authentication
@cvar CHALLENGE_LIFETIME_SECS: The number of seconds for which an
opaque should be valid.
@type privateKey: L{bytes}
@ivar privateKey: A random string used for generating the secure opaque.
@type algorithm: L{bytes}
@param algorithm: Case insensitive string specifying the hash algorithm to
use. Must be either C{'md5'} or C{'sha'}. C{'md5-sess'} is B{not}
supported.
@type authenticationRealm: L{bytes}
@param authenticationRealm: case sensitive string that specifies the realm
portion of the challenge
"""
_parseparts = re.compile(
b"([^= ]+)" # The key
b"=" # Conventional key/value separator (literal)
b"(?:" # Group together a couple options
b'"([^"]*)"' # A quoted string of length 0 or more
b"|" # The other option in the group is coming
b"([^,]+)" # An unquoted string of length 1 or more, up to a comma
b")" # That non-matching group ends
b",?"
) # There might be a comma at the end (none on last pair)
CHALLENGE_LIFETIME_SECS = 15 * 60 # 15 minutes
scheme = b"digest"
def __init__(self, algorithm, authenticationRealm):
self.algorithm = algorithm
self.authenticationRealm = authenticationRealm
self.privateKey = secureRandom(12)
def getChallenge(self, address):
"""
Generate the challenge for use in the WWW-Authenticate header.
@param address: The client address to which this challenge is being
sent.
@return: The L{dict} that can be used to generate a WWW-Authenticate
header.
"""
c = self._generateNonce()
o = self._generateOpaque(c, address)
return {
"nonce": c,
"opaque": o,
"qop": b"auth",
"algorithm": self.algorithm,
"realm": self.authenticationRealm,
}
def _generateNonce(self):
"""
Create a random value suitable for use as the nonce parameter of a
WWW-Authenticate challenge.
@rtype: L{bytes}
"""
return hexlify(secureRandom(12))
def _getTime(self):
"""
Parameterize the time based seed used in C{_generateOpaque}
so we can deterministically unittest it's behavior.
"""
return time.time()
def _generateOpaque(self, nonce, clientip):
"""
Generate an opaque to be returned to the client. This is a unique
string that can be returned to us and verified.
"""
# Now, what we do is encode the nonce, client ip and a timestamp in the
# opaque value with a suitable digest.
now = b"%d" % (int(self._getTime()),)
if not clientip:
clientip = b""
elif isinstance(clientip, str):
clientip = clientip.encode("ascii")
key = b",".join((nonce, clientip, now))
digest = hexlify(md5(key + self.privateKey).digest())
ekey = base64.b64encode(key)
return b"-".join((digest, ekey.replace(b"\n", b"")))
def _verifyOpaque(self, opaque, nonce, clientip):
"""
Given the opaque and nonce from the request, as well as the client IP
that made the request, verify that the opaque was generated by us.
And that it's not too old.
@param opaque: The opaque value from the Digest response
@param nonce: The nonce value from the Digest response
@param clientip: The remote IP address of the client making the request
or L{None} if the request was submitted over a channel where this
does not make sense.
@return: C{True} if the opaque was successfully verified.
@raise error.LoginFailed: if C{opaque} could not be parsed or
contained the wrong values.
"""
# First split the digest from the key
opaqueParts = opaque.split(b"-")
if len(opaqueParts) != 2:
raise error.LoginFailed("Invalid response, invalid opaque value")
if not clientip:
clientip = b""
elif isinstance(clientip, str):
clientip = clientip.encode("ascii")
# Verify the key
key = base64.b64decode(opaqueParts[1])
keyParts = key.split(b",")
if len(keyParts) != 3:
raise error.LoginFailed("Invalid response, invalid opaque value")
if keyParts[0] != nonce:
raise error.LoginFailed(
"Invalid response, incompatible opaque/nonce values"
)
if keyParts[1] != clientip:
raise error.LoginFailed(
"Invalid response, incompatible opaque/client values"
)
try:
when = int(keyParts[2])
except ValueError:
raise error.LoginFailed("Invalid response, invalid opaque/time values")
if (
int(self._getTime()) - when
> DigestCredentialFactory.CHALLENGE_LIFETIME_SECS
):
raise error.LoginFailed(
"Invalid response, incompatible opaque/nonce too old"
)
# Verify the digest
digest = hexlify(md5(key + self.privateKey).digest())
if digest != opaqueParts[0]:
raise error.LoginFailed("Invalid response, invalid opaque value")
return True
def decode(self, response, method, host):
"""
Decode the given response and attempt to generate a
L{DigestedCredentials} from it.
@type response: L{bytes}
@param response: A string of comma separated key=value pairs
@type method: L{bytes}
@param method: The action requested to which this response is addressed
(GET, POST, INVITE, OPTIONS, etc).
@type host: L{bytes}
@param host: The address the request was sent from.
@raise error.LoginFailed: If the response does not contain a username,
a nonce, an opaque, or if the opaque is invalid.
@return: L{DigestedCredentials}
"""
response = b" ".join(response.splitlines())
parts = self._parseparts.findall(response)
auth = {}
for key, bare, quoted in parts:
value = (quoted or bare).strip()
auth[nativeString(key.strip())] = value
username = auth.get("username")
if not username:
raise error.LoginFailed("Invalid response, no username given.")
if "opaque" not in auth:
raise error.LoginFailed("Invalid response, no opaque given.")
if "nonce" not in auth:
raise error.LoginFailed("Invalid response, no nonce given.")
# Now verify the nonce/opaque values for this client
if self._verifyOpaque(auth.get("opaque"), auth.get("nonce"), host):
return DigestedCredentials(username, method, self.authenticationRealm, auth)
@implementer(IUsernameHashedPassword)
class CramMD5Credentials:
"""
An encapsulation of some CramMD5 hashed credentials.
@ivar challenge: The challenge to be sent to the client.
@type challenge: L{bytes}
@ivar response: The hashed response from the client.
@type response: L{bytes}
@ivar username: The username from the response from the client.
@type username: L{bytes} or L{None} if not yet provided.
"""
username = None
challenge = b""
response = b""
def __init__(self, host=None):
self.host = host
def getChallenge(self):
if self.challenge:
return self.challenge
# The data encoded in the first ready response contains an
# presumptively arbitrary string of random digits, a timestamp, and
# the fully-qualified primary host name of the server. The syntax of
# the unencoded form must correspond to that of an RFC 822 'msg-id'
# [RFC822] as described in [POP3].
# -- RFC 2195
r = random.randrange(0x7FFFFFFF)
t = time.time()
self.challenge = networkString(
"<%d.%d@%s>" % (r, t, nativeString(self.host) if self.host else None)
)
return self.challenge
def setResponse(self, response):
self.username, self.response = response.split(None, 1)
def moreChallenges(self):
return False
def checkPassword(self, password):
verify = hexlify(hmac.HMAC(password, self.challenge, digestmod=md5).digest())
return verify == self.response
@implementer(IUsernameHashedPassword)
class UsernameHashedPassword:
deprecatedModuleAttribute(
Version("Twisted", 21, 2, 0),
"Use twisted.cred.credentials.UsernamePassword instead.",
"twisted.cred.credentials",
"UsernameHashedPassword",
)
def __init__(self, username, hashed):
self.username = username
self.hashed = hashed
def checkPassword(self, password):
return self.hashed == password
@implementer(IUsernamePassword)
class UsernamePassword:
def __init__(self, username: bytes, password: bytes) -> None:
self.username = username
self.password = password
def checkPassword(self, password: bytes) -> bool:
return self.password == password
@implementer(IAnonymous)
class Anonymous:
pass
class ISSHPrivateKey(ICredentials):
"""
L{ISSHPrivateKey} credentials encapsulate an SSH public key to be checked
against a user's private key.
@ivar username: The username associated with these credentials.
@type username: L{bytes}
@ivar algName: The algorithm name for the blob.
@type algName: L{bytes}
@ivar blob: The public key blob as sent by the client.
@type blob: L{bytes}
@ivar sigData: The data the signature was made from.
@type sigData: L{bytes}
@ivar signature: The signed data. This is checked to verify that the user
owns the private key.
@type signature: L{bytes} or L{None}
"""
@implementer(ISSHPrivateKey)
class SSHPrivateKey:
def __init__(self, username, algName, blob, sigData, signature):
self.username = username
self.algName = algName
self.blob = blob
self.sigData = sigData
self.signature = signature

View File

@@ -0,0 +1,38 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Cred errors.
"""
class Unauthorized(Exception):
"""Standard unauthorized error."""
class LoginFailed(Exception):
"""
The user's request to log in failed for some reason.
"""
class UnauthorizedLogin(LoginFailed, Unauthorized):
"""The user was not authorized to log in."""
class UnhandledCredentials(LoginFailed):
"""A type of credentials were passed in with no knowledge of how to check
them. This is a server configuration error - it means that a protocol was
connected to a Portal without a CredentialChecker that can check all of its
potential authentication strategies.
"""
class LoginDenied(LoginFailed):
"""
The realm rejected this login for some reason.
Examples of reasons this might be raised include an avatar logging in
too frequently, a quota having been fully used, or the overall server
load being too high.
"""

View File

@@ -0,0 +1,154 @@
# -*- test-case-name: twisted.cred.test.test_cred -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
The point of integration of application and authentication.
"""
from typing import Callable, Dict, Iterable, List, Tuple, Type, Union
from zope.interface import Interface, providedBy
from twisted.cred import error
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.credentials import ICredentials
from twisted.internet import defer
from twisted.internet.defer import Deferred, maybeDeferred
from twisted.python import failure, reflect
# To say 'we need an Interface object', we have to say Type[Interface];
# although zope.interface has no type/instance distinctions within the
# implementation of Interface itself (subclassing it actually instantiates it),
# since mypy-zope treats Interface objects *as* types, this is how you have to
# treat it.
_InterfaceItself = Type[Interface]
# This is the result shape for both IRealm.requestAvatar and Portal.login,
# although the former is optionally allowed to return synchronously and the
# latter must be Deferred.
_requestResult = Tuple[_InterfaceItself, object, Callable[[], None]]
class IRealm(Interface):
"""
The realm connects application-specific objects to the
authentication system.
"""
def requestAvatar(
avatarId: Union[bytes, Tuple[()]], mind: object, *interfaces: _InterfaceItself
) -> Union[Deferred[_requestResult], _requestResult]:
"""
Return avatar which provides one of the given interfaces.
@param avatarId: a string that identifies an avatar, as returned by
L{ICredentialsChecker.requestAvatarId<twisted.cred.checkers.ICredentialsChecker.requestAvatarId>}
(via a Deferred). Alternatively, it may be
C{twisted.cred.checkers.ANONYMOUS}.
@param mind: usually None. See the description of mind in
L{Portal.login}.
@param interfaces: the interface(s) the returned avatar should
implement, e.g. C{IMailAccount}. See the description of
L{Portal.login}.
@returns: a deferred which will fire a tuple of (interface,
avatarAspect, logout), or the tuple itself. The interface will be
one of the interfaces passed in the 'interfaces' argument. The
'avatarAspect' will implement that interface. The 'logout' object
is a callable which will detach the mind from the avatar.
"""
class Portal:
"""
A mediator between clients and a realm.
A portal is associated with one Realm and zero or more credentials checkers.
When a login is attempted, the portal finds the appropriate credentials
checker for the credentials given, invokes it, and if the credentials are
valid, retrieves the appropriate avatar from the Realm.
This class is not intended to be subclassed. Customization should be done
in the realm object and in the credentials checker objects.
"""
checkers: Dict[Type[Interface], ICredentialsChecker]
def __init__(
self, realm: IRealm, checkers: Iterable[ICredentialsChecker] = ()
) -> None:
"""
Create a Portal to a L{IRealm}.
"""
self.realm = realm
self.checkers = {}
for checker in checkers:
self.registerChecker(checker)
def listCredentialsInterfaces(self) -> List[Type[Interface]]:
"""
Return list of credentials interfaces that can be used to login.
"""
return list(self.checkers.keys())
def registerChecker(
self, checker: ICredentialsChecker, *credentialInterfaces: Type[Interface]
) -> None:
if not credentialInterfaces:
credentialInterfaces = checker.credentialInterfaces
for credentialInterface in credentialInterfaces:
self.checkers[credentialInterface] = checker
def login(
self, credentials: ICredentials, mind: object, *interfaces: Type[Interface]
) -> Deferred[_requestResult]:
"""
@param credentials: an implementor of
L{twisted.cred.credentials.ICredentials}
@param mind: an object which implements a client-side interface for
your particular realm. In many cases, this may be None, so if the
word 'mind' confuses you, just ignore it.
@param interfaces: list of interfaces for the perspective that the mind
wishes to attach to. Usually, this will be only one interface, for
example IMailAccount. For highly dynamic protocols, however, this
may be a list like (IMailAccount, IUserChooser, IServiceInfo). To
expand: if we are speaking to the system over IMAP, any information
that will be relayed to the user MUST be returned as an
IMailAccount implementor; IMAP clients would not be able to
understand anything else. Any information about unusual status
would have to be relayed as a single mail message in an
otherwise-empty mailbox. However, in a web-based mail system, or a
PB-based client, the ``mind'' object inside the web server
(implemented with a dynamic page-viewing mechanism such as a
Twisted Web Resource) or on the user's client program may be
intelligent enough to respond to several ``server''-side
interfaces.
@return: A deferred which will fire a tuple of (interface,
avatarAspect, logout). The interface will be one of the interfaces
passed in the 'interfaces' argument. The 'avatarAspect' will
implement that interface. The 'logout' object is a callable which
will detach the mind from the avatar. It must be called when the
user has conceptually disconnected from the service. Although in
some cases this will not be in connectionLost (such as in a
web-based session), it will always be at the end of a user's
interactive session.
"""
for i in self.checkers:
if i.providedBy(credentials):
return maybeDeferred(
self.checkers[i].requestAvatarId, credentials
).addCallback(self.realm.requestAvatar, mind, *interfaces)
ifac = providedBy(credentials)
return defer.fail(
failure.Failure(
error.UnhandledCredentials(
"No checker for %s" % ", ".join(map(reflect.qual, ifac))
)
)
)

View File

@@ -0,0 +1,250 @@
# -*- test-case-name: twisted.cred.test.test_strcred -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
#
"""
Support for resolving command-line strings that represent different
checkers available to cred.
Examples:
- passwd:/etc/passwd
- memory:admin:asdf:user:lkj
- unix
"""
import sys
from typing import Optional, Sequence, Type
from zope.interface import Attribute, Interface
from twisted.plugin import getPlugins
from twisted.python import usage
class ICheckerFactory(Interface):
"""
A factory for objects which provide
L{twisted.cred.checkers.ICredentialsChecker}.
It's implemented by twistd plugins creating checkers.
"""
authType = Attribute("A tag that identifies the authentication method.")
authHelp = Attribute(
"A detailed (potentially multi-line) description of precisely "
"what functionality this CheckerFactory provides."
)
argStringFormat = Attribute(
"A short (one-line) description of the argument string format."
)
credentialInterfaces = Attribute(
"A list of credentials interfaces that this factory will support."
)
def generateChecker(argstring):
"""
Return an L{twisted.cred.checkers.ICredentialsChecker} provider using the supplied
argument string.
"""
class StrcredException(Exception):
"""
Base exception class for strcred.
"""
class InvalidAuthType(StrcredException):
"""
Raised when a user provides an invalid identifier for the
authentication plugin (known as the authType).
"""
class InvalidAuthArgumentString(StrcredException):
"""
Raised by an authentication plugin when the argument string
provided is formatted incorrectly.
"""
class UnsupportedInterfaces(StrcredException):
"""
Raised when an application is given a checker to use that does not
provide any of the application's supported credentials interfaces.
"""
# This will be used to warn the users whenever they view help for an
# authType that is not supported by the application.
notSupportedWarning = "WARNING: This authType is not supported by " "this application."
def findCheckerFactories():
"""
Find all objects that implement L{ICheckerFactory}.
"""
return getPlugins(ICheckerFactory)
def findCheckerFactory(authType):
"""
Find the first checker factory that supports the given authType.
"""
for factory in findCheckerFactories():
if factory.authType == authType:
return factory
raise InvalidAuthType(authType)
def makeChecker(description):
"""
Returns an L{twisted.cred.checkers.ICredentialsChecker} based on the
contents of a descriptive string. Similar to
L{twisted.application.strports}.
"""
if ":" in description:
authType, argstring = description.split(":", 1)
else:
authType = description
argstring = ""
return findCheckerFactory(authType).generateChecker(argstring)
class AuthOptionMixin:
"""
Defines helper methods that can be added on to any
L{usage.Options} subclass that needs authentication.
This mixin implements three new options methods:
The opt_auth method (--auth) will write two new values to the
'self' dictionary: C{credInterfaces} (a dict of lists) and
C{credCheckers} (a list).
The opt_help_auth method (--help-auth) will search for all
available checker plugins and list them for the user; it will exit
when finished.
The opt_help_auth_type method (--help-auth-type) will display
detailed help for a particular checker plugin.
@cvar supportedInterfaces: An iterable object that returns
credential interfaces which this application is able to support.
@cvar authOutput: A writeable object to which this options class
will send all help-related output. Default: L{sys.stdout}
"""
supportedInterfaces: Optional[Sequence[Type[Interface]]] = None
authOutput = sys.stdout
def supportsInterface(self, interface):
"""
Returns whether a particular credentials interface is supported.
"""
return self.supportedInterfaces is None or interface in self.supportedInterfaces
def supportsCheckerFactory(self, factory):
"""
Returns whether a checker factory will provide at least one of
the credentials interfaces that we care about.
"""
for interface in factory.credentialInterfaces:
if self.supportsInterface(interface):
return True
return False
def addChecker(self, checker):
"""
Supply a supplied credentials checker to the Options class.
"""
# First figure out which interfaces we're willing to support.
supported = []
if self.supportedInterfaces is None:
supported = checker.credentialInterfaces
else:
for interface in checker.credentialInterfaces:
if self.supportsInterface(interface):
supported.append(interface)
if not supported:
raise UnsupportedInterfaces(checker.credentialInterfaces)
# If we get this far, then we know we can use this checker.
if "credInterfaces" not in self:
self["credInterfaces"] = {}
if "credCheckers" not in self:
self["credCheckers"] = []
self["credCheckers"].append(checker)
for interface in supported:
self["credInterfaces"].setdefault(interface, []).append(checker)
def opt_auth(self, description):
"""
Specify an authentication method for the server.
"""
try:
self.addChecker(makeChecker(description))
except UnsupportedInterfaces as e:
raise usage.UsageError("Auth plugin not supported: %s" % e.args[0])
except InvalidAuthType as e:
raise usage.UsageError("Auth plugin not recognized: %s" % e.args[0])
except Exception as e:
raise usage.UsageError("Unexpected error: %s" % e)
def _checkerFactoriesForOptHelpAuth(self):
"""
Return a list of which authTypes will be displayed by --help-auth.
This makes it a lot easier to test this module.
"""
for factory in findCheckerFactories():
for interface in factory.credentialInterfaces:
if self.supportsInterface(interface):
yield factory
break
def opt_help_auth(self):
"""
Show all authentication methods available.
"""
self.authOutput.write("Usage: --auth AuthType[:ArgString]\n")
self.authOutput.write("For detailed help: --help-auth-type AuthType\n")
self.authOutput.write("\n")
# Figure out the right width for our columns
firstLength = 0
for factory in self._checkerFactoriesForOptHelpAuth():
if len(factory.authType) > firstLength:
firstLength = len(factory.authType)
formatString = " %%-%is\t%%s\n" % firstLength
self.authOutput.write(formatString % ("AuthType", "ArgString format"))
self.authOutput.write(formatString % ("========", "================"))
for factory in self._checkerFactoriesForOptHelpAuth():
self.authOutput.write(
formatString % (factory.authType, factory.argStringFormat)
)
self.authOutput.write("\n")
raise SystemExit(0)
def opt_help_auth_type(self, authType):
"""
Show help for a particular authentication type.
"""
try:
cf = findCheckerFactory(authType)
except InvalidAuthType:
raise usage.UsageError("Invalid auth type: %s" % authType)
self.authOutput.write("Usage: --auth %s[:ArgString]\n" % authType)
self.authOutput.write("ArgString format: %s\n" % cf.argStringFormat)
self.authOutput.write("\n")
for line in cf.authHelp.strip().splitlines():
self.authOutput.write(" %s\n" % line.rstrip())
self.authOutput.write("\n")
if not self.supportsCheckerFactory(cf):
self.authOutput.write(" %s\n" % notSupportedWarning)
self.authOutput.write("\n")
raise SystemExit(0)

View File

@@ -0,0 +1,7 @@
# -*- test-case-name: twisted.cred.test -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Unit tests for C{twisted.cred}.
"""

View File

@@ -0,0 +1,89 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred}'s implementation of CRAM-MD5.
"""
import hashlib
from binascii import hexlify
from hmac import HMAC
from twisted.cred.credentials import CramMD5Credentials, IUsernameHashedPassword
from twisted.trial.unittest import TestCase
class CramMD5CredentialsTests(TestCase):
"""
Tests for L{CramMD5Credentials}.
"""
def test_idempotentChallenge(self) -> None:
"""
The same L{CramMD5Credentials} will always provide the same challenge,
no matter how many times it is called.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
self.assertEqual(chal, c.getChallenge())
def test_checkPassword(self) -> None:
"""
When a valid response (which is a hex digest of the challenge that has
been encrypted by the user's shared secret) is set on the
L{CramMD5Credentials} that created the challenge, and C{checkPassword}
is called with the user's shared secret, it will return L{True}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.response = hexlify(HMAC(b"secret", chal, digestmod=hashlib.md5).digest())
self.assertTrue(c.checkPassword(b"secret"))
def test_noResponse(self) -> None:
"""
When there is no response set, calling C{checkPassword} will return
L{False}.
"""
c = CramMD5Credentials()
self.assertFalse(c.checkPassword(b"secret"))
def test_wrongPassword(self) -> None:
"""
When an invalid response is set on the L{CramMD5Credentials} (one that
is not the hex digest of the challenge, encrypted with the user's shared
secret) and C{checkPassword} is called with the user's correct shared
secret, it will return L{False}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.response = hexlify(
HMAC(b"thewrongsecret", chal, digestmod=hashlib.md5).digest()
)
self.assertFalse(c.checkPassword(b"secret"))
def test_setResponse(self) -> None:
"""
When C{setResponse} is called with a string that is the username and
the hashed challenge separated with a space, they will be set on the
L{CramMD5Credentials}.
"""
c = CramMD5Credentials()
chal = c.getChallenge()
c.setResponse(
b" ".join(
(
b"squirrel",
hexlify(HMAC(b"supersecret", chal, digestmod=hashlib.md5).digest()),
)
)
)
self.assertTrue(c.checkPassword(b"supersecret"))
self.assertEqual(c.username, b"squirrel")
def test_interface(self) -> None:
"""
L{CramMD5Credentials} implements the L{IUsernameHashedPassword}
interface.
"""
self.assertTrue(IUsernameHashedPassword.implementedBy(CramMD5Credentials))

View File

@@ -0,0 +1,461 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred}, now with 30% more starch.
"""
from binascii import hexlify, unhexlify
from zope.interface import Interface, implementer
from twisted.cred import checkers, credentials, error, portal
from twisted.internet import defer
from twisted.python import components
from twisted.python.versions import Version
from twisted.trial import unittest
try:
from crypt import crypt as _crypt
except ImportError:
crypt = None
else:
crypt = _crypt
# The Twisted version in which UsernameHashedPassword is first deprecated.
_uhpVersion = Version("Twisted", 21, 2, 0)
class ITestable(Interface):
"""
An interface for a theoretical protocol.
"""
pass
class TestAvatar:
"""
A test avatar.
"""
def __init__(self, name):
self.name = name
self.loggedIn = False
self.loggedOut = False
def login(self):
assert not self.loggedIn
self.loggedIn = True
def logout(self):
self.loggedOut = True
@implementer(ITestable)
class Testable(components.Adapter):
"""
A theoretical protocol for testing.
"""
pass
components.registerAdapter(Testable, TestAvatar, ITestable)
class IDerivedCredentials(credentials.IUsernamePassword):
pass
@implementer(IDerivedCredentials, ITestable)
class DerivedCredentials:
def __init__(self, username, password):
self.username = username
self.password = password
def checkPassword(self, password):
return password == self.password
@implementer(portal.IRealm)
class TestRealm:
"""
A basic test realm.
"""
def __init__(self):
self.avatars = {}
def requestAvatar(self, avatarId, mind, *interfaces):
if avatarId in self.avatars:
avatar = self.avatars[avatarId]
else:
avatar = TestAvatar(avatarId)
self.avatars[avatarId] = avatar
avatar.login()
return (interfaces[0], interfaces[0](avatar), avatar.logout)
class CredTests(unittest.TestCase):
"""
Tests for the meat of L{twisted.cred} -- realms, portals, avatars, and
checkers.
"""
def setUp(self):
self.realm = TestRealm()
self.portal = portal.Portal(self.realm)
self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
self.checker.addUser(b"bob", b"hello")
self.portal.registerChecker(self.checker)
def test_listCheckers(self):
"""
The checkers in a portal can check only certain types of credentials.
Since this portal has
L{checkers.InMemoryUsernamePasswordDatabaseDontUse} registered, it
"""
expected = [credentials.IUsernamePassword, credentials.IUsernameHashedPassword]
got = self.portal.listCredentialsInterfaces()
self.assertEqual(sorted(got), sorted(expected))
def test_basicLogin(self):
"""
Calling C{login} on a portal with correct credentials and an interface
that the portal's realm supports works.
"""
login = self.successResultOf(
self.portal.login(
credentials.UsernamePassword(b"bob", b"hello"), self, ITestable
)
)
iface, impl, logout = login
# whitebox
self.assertEqual(iface, ITestable)
self.assertTrue(iface.providedBy(impl), f"{impl} does not implement {iface}")
# greybox
self.assertTrue(impl.original.loggedIn)
self.assertTrue(not impl.original.loggedOut)
logout()
self.assertTrue(impl.original.loggedOut)
def test_derivedInterface(self):
"""
Logging in with correct derived credentials and an interface
that the portal's realm supports works.
"""
login = self.successResultOf(
self.portal.login(DerivedCredentials(b"bob", b"hello"), self, ITestable)
)
iface, impl, logout = login
# whitebox
self.assertEqual(iface, ITestable)
self.assertTrue(iface.providedBy(impl), f"{impl} does not implement {iface}")
# greybox
self.assertTrue(impl.original.loggedIn)
self.assertTrue(not impl.original.loggedOut)
logout()
self.assertTrue(impl.original.loggedOut)
def test_failedLoginPassword(self):
"""
Calling C{login} with incorrect credentials (in this case a wrong
password) causes L{error.UnauthorizedLogin} to be raised.
"""
login = self.failureResultOf(
self.portal.login(
credentials.UsernamePassword(b"bob", b"h3llo"), self, ITestable
)
)
self.assertTrue(login)
self.assertEqual(error.UnauthorizedLogin, login.type)
def test_failedLoginName(self):
"""
Calling C{login} with incorrect credentials (in this case no known
user) causes L{error.UnauthorizedLogin} to be raised.
"""
login = self.failureResultOf(
self.portal.login(
credentials.UsernamePassword(b"jay", b"hello"), self, ITestable
)
)
self.assertTrue(login)
self.assertEqual(error.UnauthorizedLogin, login.type)
class OnDiskDatabaseTests(unittest.TestCase):
users = [
(b"user1", b"pass1"),
(b"user2", b"pass2"),
(b"user3", b"pass3"),
]
def setUp(self):
self.dbfile = self.mktemp()
with open(self.dbfile, "wb") as f:
for u, p in self.users:
f.write(u + b":" + p + b"\n")
def test_getUserNonexistentDatabase(self):
"""
A missing db file will cause a permanent rejection of authorization
attempts.
"""
self.db = checkers.FilePasswordDB("test_thisbetternoteverexist.db")
self.assertRaises(error.UnauthorizedLogin, self.db.getUser, "user")
def testUserLookup(self):
self.db = checkers.FilePasswordDB(self.dbfile)
for u, p in self.users:
self.assertRaises(KeyError, self.db.getUser, u.upper())
self.assertEqual(self.db.getUser(u), (u, p))
def testCaseInSensitivity(self):
self.db = checkers.FilePasswordDB(self.dbfile, caseSensitive=False)
for u, p in self.users:
self.assertEqual(self.db.getUser(u.upper()), (u, p))
def testRequestAvatarId(self):
self.db = checkers.FilePasswordDB(self.dbfile)
creds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults(
[defer.maybeDeferred(self.db.requestAvatarId, c) for c in creds]
)
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testRequestAvatarId_hashed(self):
self.db = checkers.FilePasswordDB(self.dbfile)
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
creds = [UsernameHashedPassword(u, p) for u, p in self.users]
d = defer.gatherResults(
[defer.maybeDeferred(self.db.requestAvatarId, c) for c in creds]
)
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
class HashedPasswordOnDiskDatabaseTests(unittest.TestCase):
users = [
(b"user1", b"pass1"),
(b"user2", b"pass2"),
(b"user3", b"pass3"),
]
def setUp(self):
dbfile = self.mktemp()
self.db = checkers.FilePasswordDB(dbfile, hash=self.hash)
with open(dbfile, "wb") as f:
for u, p in self.users:
f.write(u + b":" + self.hash(u, p, u[:2]) + b"\n")
r = TestRealm()
self.port = portal.Portal(r)
self.port.registerChecker(self.db)
def hash(self, u: bytes, p: bytes, s: bytes) -> bytes:
hashed_password = crypt(p.decode("ascii"), s.decode("ascii")) # type: ignore[misc]
return hashed_password.encode("ascii")
def testGoodCredentials(self):
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults([self.db.requestAvatarId(c) for c in goodCreds])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testGoodCredentials_login(self):
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
d = defer.gatherResults(
[self.port.login(c, None, ITestable) for c in goodCreds]
)
d.addCallback(lambda x: [a.original.name for i, a, l in x])
d.addCallback(self.assertEqual, [u for u, p in self.users])
return d
def testBadCredentials(self):
badCreds = [
credentials.UsernamePassword(u, b"wrong password") for u, p in self.users
]
d = defer.DeferredList(
[self.port.login(c, None, ITestable) for c in badCreds], consumeErrors=True
)
d.addCallback(self._assertFailures, error.UnauthorizedLogin)
return d
def testHashedCredentials(self):
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
hashedCreds = [
UsernameHashedPassword(u, self.hash(None, p, u[:2])) for u, p in self.users
]
d = defer.DeferredList(
[self.port.login(c, None, ITestable) for c in hashedCreds],
consumeErrors=True,
)
d.addCallback(self._assertFailures, error.UnhandledCredentials)
return d
def _assertFailures(self, failures, *expectedFailures):
for flag, failure in failures:
self.assertEqual(flag, defer.FAILURE)
failure.trap(*expectedFailures)
return None
if crypt is None:
skip = "crypt module not available"
class CheckersMixin:
"""
L{unittest.TestCase} mixin for testing that some checkers accept
and deny specified credentials.
Subclasses must provide
- C{getCheckers} which returns a sequence of
L{checkers.ICredentialChecker}
- C{getGoodCredentials} which returns a list of 2-tuples of
credential to check and avaterId to expect.
- C{getBadCredentials} which returns a list of credentials
which are expected to be unauthorized.
"""
@defer.inlineCallbacks
def test_positive(self):
"""
The given credentials are accepted by all the checkers, and give
the expected C{avatarID}s
"""
for chk in self.getCheckers():
for cred, avatarId in self.getGoodCredentials():
r = yield chk.requestAvatarId(cred)
self.assertEqual(r, avatarId)
@defer.inlineCallbacks
def test_negative(self):
"""
The given credentials are rejected by all the checkers.
"""
for chk in self.getCheckers():
for cred in self.getBadCredentials():
d = chk.requestAvatarId(cred)
yield self.assertFailure(d, error.UnauthorizedLogin)
class HashlessFilePasswordDBMixin:
credClass = credentials.UsernamePassword
diskHash = None
@staticmethod
def networkHash(x: bytes) -> bytes:
return x
_validCredentials = [
(b"user1", b"password1"),
(b"user2", b"password2"),
(b"user3", b"password3"),
]
def getGoodCredentials(self):
for u, p in self._validCredentials:
yield self.credClass(u, self.networkHash(p)), u
def getBadCredentials(self):
for u, p in [
(b"user1", b"password3"),
(b"user2", b"password1"),
(b"bloof", b"blarf"),
]:
yield self.credClass(u, self.networkHash(p))
def getCheckers(self):
diskHash = self.diskHash or (lambda x: x)
hashCheck = self.diskHash and (
lambda username, password, stored: self.diskHash(password)
)
for cache in True, False:
fn = self.mktemp()
with open(fn, "wb") as fObj:
for u, p in self._validCredentials:
fObj.write(u + b":" + diskHash(p) + b"\n")
yield checkers.FilePasswordDB(fn, cache=cache, hash=hashCheck)
fn = self.mktemp()
with open(fn, "wb") as fObj:
for u, p in self._validCredentials:
fObj.write(diskHash(p) + b" dingle dongle " + u + b"\n")
yield checkers.FilePasswordDB(fn, b" ", 3, 0, cache=cache, hash=hashCheck)
fn = self.mktemp()
with open(fn, "wb") as fObj:
for u, p in self._validCredentials:
fObj.write(
b"zip,zap," + u.title() + b",zup," + diskHash(p) + b"\n",
)
yield checkers.FilePasswordDB(
fn, b",", 2, 4, False, cache=cache, hash=hashCheck
)
class LocallyHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
@staticmethod
def diskHash(x):
return hexlify(x)
class NetworkHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
@staticmethod
def networkHash(x: bytes) -> bytes:
return hexlify(x)
class credClass(credentials.UsernamePassword):
def checkPassword(self, password):
return unhexlify(self.password) == password
class HashlessFilePasswordDBCheckerTests(
HashlessFilePasswordDBMixin, CheckersMixin, unittest.TestCase
):
pass
class LocallyHashedFilePasswordDBCheckerTests(
LocallyHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase
):
pass
class NetworkHashedFilePasswordDBCheckerTests(
NetworkHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase
):
pass
class UsernameHashedPasswordTests(unittest.TestCase):
"""
UsernameHashedPassword is a deprecated class that is functionally
equivalent to UsernamePassword.
"""
def test_deprecation(self):
"""
Tests that UsernameHashedPassword is deprecated.
"""
self.getDeprecatedModuleAttribute(
"twisted.cred.credentials",
"UsernameHashedPassword",
_uhpVersion,
"Use twisted.cred.credentials.UsernamePassword instead.",
)

View File

@@ -0,0 +1,694 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.cred._digest} and the associated bits in
L{twisted.cred.credentials}.
"""
import base64
from binascii import hexlify
from hashlib import md5, sha1
from zope.interface.verify import verifyObject
from twisted.cred.credentials import (
DigestCredentialFactory,
IUsernameDigestHash,
calcHA1,
calcHA2,
calcResponse,
)
from twisted.cred.error import LoginFailed
from twisted.internet.address import IPv4Address
from twisted.python.compat import networkString
from twisted.trial.unittest import TestCase
def b64encode(s):
return base64.b64encode(s).strip()
class FakeDigestCredentialFactory(DigestCredentialFactory):
"""
A Fake Digest Credential Factory that generates a predictable
nonce and opaque
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.privateKey = b"0"
def _generateNonce(self):
"""
Generate a static nonce
"""
return b"178288758716122392881254770685"
def _getTime(self):
"""
Return a stable time
"""
return 0
class DigestAuthTests(TestCase):
"""
L{TestCase} mixin class which defines a number of tests for
L{DigestCredentialFactory}. Because this mixin defines C{setUp}, it
must be inherited before L{TestCase}.
"""
def setUp(self):
"""
Create a DigestCredentialFactory for testing
"""
self.username = b"foobar"
self.password = b"bazquux"
self.realm = b"test realm"
self.algorithm = b"md5"
self.cnonce = b"29fc54aa1641c6fa0e151419361c8f23"
self.qop = b"auth"
self.uri = b"/write/"
self.clientAddress = IPv4Address("TCP", "10.2.3.4", 43125)
self.method = b"GET"
self.credentialFactory = DigestCredentialFactory(self.algorithm, self.realm)
def test_MD5HashA1(self, _algorithm=b"md5", _hash=md5):
"""
L{calcHA1} accepts the C{'md5'} algorithm and returns an MD5 hash of
its parameters, excluding the nonce and cnonce.
"""
nonce = b"abc123xyz"
hashA1 = calcHA1(
_algorithm, self.username, self.realm, self.password, nonce, self.cnonce
)
a1 = b":".join((self.username, self.realm, self.password))
expected = hexlify(_hash(a1).digest())
self.assertEqual(hashA1, expected)
def test_MD5SessionHashA1(self):
"""
L{calcHA1} accepts the C{'md5-sess'} algorithm and returns an MD5 hash
of its parameters, including the nonce and cnonce.
"""
nonce = b"xyz321abc"
hashA1 = calcHA1(
b"md5-sess", self.username, self.realm, self.password, nonce, self.cnonce
)
a1 = self.username + b":" + self.realm + b":" + self.password
ha1 = hexlify(md5(a1).digest())
a1 = ha1 + b":" + nonce + b":" + self.cnonce
expected = hexlify(md5(a1).digest())
self.assertEqual(hashA1, expected)
def test_SHAHashA1(self):
"""
L{calcHA1} accepts the C{'sha'} algorithm and returns a SHA hash of its
parameters, excluding the nonce and cnonce.
"""
self.test_MD5HashA1(b"sha", sha1)
def test_MD5HashA2Auth(self, _algorithm=b"md5", _hash=md5):
"""
L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
its arguments, excluding the entity hash for QOP other than
C{'auth-int'}.
"""
method = b"GET"
hashA2 = calcHA2(_algorithm, method, self.uri, b"auth", None)
a2 = method + b":" + self.uri
expected = hexlify(_hash(a2).digest())
self.assertEqual(hashA2, expected)
def test_MD5HashA2AuthInt(self, _algorithm=b"md5", _hash=md5):
"""
L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of
its arguments, including the entity hash for QOP of C{'auth-int'}.
"""
method = b"GET"
hentity = b"foobarbaz"
hashA2 = calcHA2(_algorithm, method, self.uri, b"auth-int", hentity)
a2 = method + b":" + self.uri + b":" + hentity
expected = hexlify(_hash(a2).digest())
self.assertEqual(hashA2, expected)
def test_MD5SessHashA2Auth(self):
"""
L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth'} and
returns the same value as it does for the C{'md5'} algorithm.
"""
self.test_MD5HashA2Auth(b"md5-sess")
def test_MD5SessHashA2AuthInt(self):
"""
L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth-int'}
and returns the same value as it does for the C{'md5'} algorithm.
"""
self.test_MD5HashA2AuthInt(b"md5-sess")
def test_SHAHashA2Auth(self):
"""
L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
its arguments, excluding the entity hash for QOP other than
C{'auth-int'}.
"""
self.test_MD5HashA2Auth(b"sha", sha1)
def test_SHAHashA2AuthInt(self):
"""
L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of
its arguments, including the entity hash for QOP of C{'auth-int'}.
"""
self.test_MD5HashA2AuthInt(b"sha", sha1)
def test_MD5HashResponse(self, _algorithm=b"md5", _hash=md5):
"""
L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
of its parameters, excluding the nonce count, client nonce, and QoP
value if the nonce count and client nonce are L{None}
"""
hashA1 = b"abc123"
hashA2 = b"789xyz"
nonce = b"lmnopq"
response = hashA1 + b":" + nonce + b":" + hashA2
expected = hexlify(_hash(response).digest())
digest = calcResponse(hashA1, hashA2, _algorithm, nonce, None, None, None)
self.assertEqual(expected, digest)
def test_MD5SessionHashResponse(self):
"""
L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
hash of its parameters, excluding the nonce count, client nonce, and
QoP value if the nonce count and client nonce are L{None}
"""
self.test_MD5HashResponse(b"md5-sess")
def test_SHAHashResponse(self):
"""
L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
of its parameters, excluding the nonce count, client nonce, and QoP
value if the nonce count and client nonce are L{None}
"""
self.test_MD5HashResponse(b"sha", sha1)
def test_MD5HashResponseExtra(self, _algorithm=b"md5", _hash=md5):
"""
L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash
of its parameters, including the nonce count, client nonce, and QoP
value if they are specified.
"""
hashA1 = b"abc123"
hashA2 = b"789xyz"
nonce = b"lmnopq"
nonceCount = b"00000004"
clientNonce = b"abcxyz123"
qop = b"auth"
response = (
hashA1
+ b":"
+ nonce
+ b":"
+ nonceCount
+ b":"
+ clientNonce
+ b":"
+ qop
+ b":"
+ hashA2
)
expected = hexlify(_hash(response).digest())
digest = calcResponse(
hashA1, hashA2, _algorithm, nonce, nonceCount, clientNonce, qop
)
self.assertEqual(expected, digest)
def test_MD5SessionHashResponseExtra(self):
"""
L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5
hash of its parameters, including the nonce count, client nonce, and
QoP value if they are specified.
"""
self.test_MD5HashResponseExtra(b"md5-sess")
def test_SHAHashResponseExtra(self):
"""
L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash
of its parameters, including the nonce count, client nonce, and QoP
value if they are specified.
"""
self.test_MD5HashResponseExtra(b"sha", sha1)
def formatResponse(self, quotes=True, **kw):
"""
Format all given keyword arguments and their values suitably for use as
the value of an HTTP header.
@types quotes: C{bool}
@param quotes: A flag indicating whether to quote the values of each
field in the response.
@param **kw: Keywords and C{bytes} values which will be treated as field
name/value pairs to include in the result.
@rtype: C{bytes}
@return: The given fields formatted for use as an HTTP header value.
"""
if "username" not in kw:
kw["username"] = self.username
if "realm" not in kw:
kw["realm"] = self.realm
if "algorithm" not in kw:
kw["algorithm"] = self.algorithm
if "qop" not in kw:
kw["qop"] = self.qop
if "cnonce" not in kw:
kw["cnonce"] = self.cnonce
if "uri" not in kw:
kw["uri"] = self.uri
if quotes:
quote = b'"'
else:
quote = b""
return b", ".join(
[
b"".join((networkString(k), b"=", quote, v, quote))
for (k, v) in kw.items()
if v is not None
]
)
def getDigestResponse(self, challenge, ncount):
"""
Calculate the response for the given challenge
"""
nonce = challenge.get("nonce")
algo = challenge.get("algorithm").lower()
qop = challenge.get("qop")
ha1 = calcHA1(
algo, self.username, self.realm, self.password, nonce, self.cnonce
)
ha2 = calcHA2(algo, b"GET", self.uri, qop, None)
expected = calcResponse(ha1, ha2, algo, nonce, ncount, self.cnonce, qop)
return expected
def test_response(self, quotes=True):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
and parses it into an L{IUsernameHashedPassword} provider.
"""
challenge = self.credentialFactory.getChallenge(self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
quotes=quotes,
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(
clientResponse, self.method, self.clientAddress.host
)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b"wrong"))
def test_responseWithoutQuotes(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
which does not quote the values of its fields and parses it into an
L{IUsernameHashedPassword} provider in the same way it would a
response which included quoted field values.
"""
self.test_response(False)
def test_responseWithCommaURI(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
which quotes the values of its fields and includes a C{b","} in the URI
field.
"""
self.uri = b"/some,path/"
self.test_response(True)
def test_caseInsensitiveAlgorithm(self):
"""
The case of the algorithm value in the response is ignored when
checking the credentials.
"""
self.algorithm = b"MD5"
self.test_response()
def test_md5DefaultAlgorithm(self):
"""
The algorithm defaults to MD5 if it is not supplied in the response.
"""
self.algorithm = None
self.test_response()
def test_responseWithoutClientIP(self):
"""
L{DigestCredentialFactory.decode} accepts a digest challenge response
even if the client address it is passed is L{None}.
"""
challenge = self.credentialFactory.getChallenge(None)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(clientResponse, self.method, None)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b"wrong"))
def test_multiResponse(self):
"""
L{DigestCredentialFactory.decode} handles multiple responses to a
single challenge.
"""
challenge = self.credentialFactory.getChallenge(self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(
clientResponse, self.method, self.clientAddress.host
)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b"wrong"))
nc = b"00000002"
clientResponse = self.formatResponse(
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(
clientResponse, self.method, self.clientAddress.host
)
self.assertTrue(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b"wrong"))
def test_failsWithDifferentMethod(self):
"""
L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword}
provider which rejects a correct password for the given user if the
challenge response request is made using a different HTTP method than
was used to request the initial challenge.
"""
challenge = self.credentialFactory.getChallenge(self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(
clientResponse, b"POST", self.clientAddress.host
)
self.assertFalse(creds.checkPassword(self.password))
self.assertFalse(creds.checkPassword(self.password + b"wrong"))
def test_noUsername(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no username field or if the username field is empty.
"""
# Check for no username
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(username=None),
self.method,
self.clientAddress.host,
)
self.assertEqual(str(e), "Invalid response, no username given.")
# Check for an empty username
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(username=b""),
self.method,
self.clientAddress.host,
)
self.assertEqual(str(e), "Invalid response, no username given.")
def test_noNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no nonce.
"""
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(opaque=b"abc123"),
self.method,
self.clientAddress.host,
)
self.assertEqual(str(e), "Invalid response, no nonce given.")
def test_noOpaque(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response
has no opaque.
"""
e = self.assertRaises(
LoginFailed,
self.credentialFactory.decode,
self.formatResponse(),
self.method,
self.clientAddress.host,
)
self.assertEqual(str(e), "Invalid response, no opaque given.")
def test_checkHash(self):
"""
L{DigestCredentialFactory.decode} returns an L{IUsernameDigestHash}
provider which can verify a hash of the form 'username:realm:password'.
"""
challenge = self.credentialFactory.getChallenge(self.clientAddress.host)
nc = b"00000001"
clientResponse = self.formatResponse(
nonce=challenge["nonce"],
response=self.getDigestResponse(challenge, nc),
nc=nc,
opaque=challenge["opaque"],
)
creds = self.credentialFactory.decode(
clientResponse, self.method, self.clientAddress.host
)
self.assertTrue(verifyObject(IUsernameDigestHash, creds))
cleartext = self.username + b":" + self.realm + b":" + self.password
hash = md5(cleartext)
self.assertTrue(creds.checkHash(hexlify(hash.digest())))
hash.update(b"wrong")
self.assertFalse(creds.checkHash(hexlify(hash.digest())))
def test_invalidOpaque(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
value does not contain all the required parts.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
b"badOpaque",
challenge["nonce"],
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, invalid opaque value")
badOpaque = b"foo-" + b64encode(b"nonce,clientip")
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badOpaque,
challenge["nonce"],
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, invalid opaque value")
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
b"",
challenge["nonce"],
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, invalid opaque value")
badOpaque = b"foo-" + b64encode(
b",".join(
(challenge["nonce"], networkString(self.clientAddress.host), b"foobar")
)
)
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badOpaque,
challenge["nonce"],
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, invalid opaque/time values")
def test_incompatibleNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
nonce from the response does not match the nonce encoded in the opaque.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
badNonceOpaque = credentialFactory._generateOpaque(
b"1234567890", self.clientAddress.host
)
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
challenge["nonce"],
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, incompatible opaque/nonce values")
exc = self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
b"",
self.clientAddress.host,
)
self.assertEqual(str(exc), "Invalid response, incompatible opaque/nonce values")
def test_incompatibleClientIP(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the
request comes from a client IP other than what is encoded in the
opaque.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
badAddress = "10.0.0.1"
# Sanity check
self.assertNotEqual(self.clientAddress.host, badAddress)
badNonceOpaque = credentialFactory._generateOpaque(
challenge["nonce"], badAddress
)
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badNonceOpaque,
challenge["nonce"],
self.clientAddress.host,
)
def test_oldNonce(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given
opaque is older than C{DigestCredentialFactory.CHALLENGE_LIFETIME_SECS}
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
key = b",".join(
(challenge["nonce"], networkString(self.clientAddress.host), b"-137876876")
)
digest = hexlify(md5(key + credentialFactory.privateKey).digest())
ekey = b64encode(key)
oldNonceOpaque = b"-".join((digest, ekey.strip(b"\n")))
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
oldNonceOpaque,
challenge["nonce"],
self.clientAddress.host,
)
def test_mismatchedOpaqueChecksum(self):
"""
L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque
checksum fails verification.
"""
credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm)
challenge = credentialFactory.getChallenge(self.clientAddress.host)
key = b",".join(
(challenge["nonce"], networkString(self.clientAddress.host), b"0")
)
digest = hexlify(md5(key + b"this is not the right pkey").digest())
badChecksum = b"-".join((digest, b64encode(key)))
self.assertRaises(
LoginFailed,
credentialFactory._verifyOpaque,
badChecksum,
challenge["nonce"],
self.clientAddress.host,
)
def test_incompatibleCalcHA1Options(self):
"""
L{calcHA1} raises L{TypeError} when any of the pszUsername, pszRealm,
or pszPassword arguments are specified with the preHA1 keyword
argument.
"""
arguments = (
(b"user", b"realm", b"password", b"preHA1"),
(None, b"realm", None, b"preHA1"),
(None, None, b"password", b"preHA1"),
)
for pszUsername, pszRealm, pszPassword, preHA1 in arguments:
self.assertRaises(
TypeError,
calcHA1,
b"md5",
pszUsername,
pszRealm,
pszPassword,
b"nonce",
b"cnonce",
preHA1=preHA1,
)
def test_noNewlineOpaque(self):
"""
L{DigestCredentialFactory._generateOpaque} returns a value without
newlines, regardless of the length of the nonce.
"""
opaque = self.credentialFactory._generateOpaque(b"long nonce " * 10, None)
self.assertNotIn(b"\n", opaque)

View File

@@ -0,0 +1,101 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for basic constructs of L{twisted.cred.credentials}.
"""
from twisted.cred.credentials import (
IUsernameHashedPassword,
IUsernamePassword,
UsernamePassword,
)
from twisted.cred.test.test_cred import _uhpVersion
from twisted.trial.unittest import TestCase
class UsernamePasswordTests(TestCase):
"""
Tests for L{UsernamePassword}.
"""
def test_initialisation(self) -> None:
"""
The initialisation of L{UsernamePassword} will set C{username} and
C{password} on it.
"""
creds = UsernamePassword(b"foo", b"bar")
self.assertEqual(creds.username, b"foo")
self.assertEqual(creds.password, b"bar")
def test_correctPassword(self) -> None:
"""
Calling C{checkPassword} on a L{UsernamePassword} will return L{True}
when the password given is the password on the object.
"""
creds = UsernamePassword(b"user", b"pass")
self.assertTrue(creds.checkPassword(b"pass"))
def test_wrongPassword(self) -> None:
"""
Calling C{checkPassword} on a L{UsernamePassword} will return L{False}
when the password given is NOT the password on the object.
"""
creds = UsernamePassword(b"user", b"pass")
self.assertFalse(creds.checkPassword(b"someotherpass"))
def test_interface(self) -> None:
"""
L{UsernamePassword} implements L{IUsernamePassword}.
"""
self.assertTrue(IUsernamePassword.implementedBy(UsernamePassword))
class UsernameHashedPasswordTests(TestCase):
"""
Tests for L{UsernameHashedPassword}.
"""
def test_initialisation(self) -> None:
"""
The initialisation of L{UsernameHashedPassword} will set C{username}
and C{hashed} on it.
"""
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
creds = UsernameHashedPassword(b"foo", b"bar")
self.assertEqual(creds.username, b"foo")
self.assertEqual(creds.hashed, b"bar")
def test_correctPassword(self) -> None:
"""
Calling C{checkPassword} on a L{UsernameHashedPassword} will return
L{True} when the password given is the password on the object.
"""
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
creds = UsernameHashedPassword(b"user", b"pass")
self.assertTrue(creds.checkPassword(b"pass"))
def test_wrongPassword(self) -> None:
"""
Calling C{checkPassword} on a L{UsernameHashedPassword} will return
L{False} when the password given is NOT the password on the object.
"""
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
creds = UsernameHashedPassword(b"user", b"pass")
self.assertFalse(creds.checkPassword(b"someotherpass"))
def test_interface(self) -> None:
"""
L{UsernameHashedPassword} implements L{IUsernameHashedPassword}.
"""
UsernameHashedPassword = self.getDeprecatedModuleAttribute(
"twisted.cred.credentials", "UsernameHashedPassword", _uhpVersion
)
self.assertTrue(IUsernameHashedPassword.implementedBy(UsernameHashedPassword))

View File

@@ -0,0 +1,699 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
L{twisted.cred.strcred}.
"""
import os
from io import StringIO
from typing import Sequence, Type
from unittest import skipIf
from zope.interface import Interface
from twisted import plugin
from twisted.cred import checkers, credentials, error, strcred
from twisted.plugins import cred_anonymous, cred_file, cred_unix
from twisted.python import usage
from twisted.python.fakepwd import UserDatabase
from twisted.python.filepath import FilePath
from twisted.python.reflect import requireModule
from twisted.trial.unittest import TestCase
crypt = requireModule("crypt")
pwd = requireModule("pwd")
spwd = requireModule("spwd")
def getInvalidAuthType():
"""
Helper method to produce an auth type that doesn't exist.
"""
invalidAuthType = "ThisPluginDoesNotExist"
while invalidAuthType in [
factory.authType for factory in strcred.findCheckerFactories()
]:
invalidAuthType += "_"
return invalidAuthType
class PublicAPITests(TestCase):
def test_emptyDescription(self):
"""
The description string cannot be empty.
"""
iat = getInvalidAuthType()
self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat)
self.assertRaises(strcred.InvalidAuthType, strcred.findCheckerFactory, iat)
def test_invalidAuthType(self):
"""
An unrecognized auth type raises an exception.
"""
iat = getInvalidAuthType()
self.assertRaises(strcred.InvalidAuthType, strcred.makeChecker, iat)
self.assertRaises(strcred.InvalidAuthType, strcred.findCheckerFactory, iat)
class StrcredFunctionsTests(TestCase):
def test_findCheckerFactories(self):
"""
L{strcred.findCheckerFactories} returns all available plugins.
"""
availablePlugins = list(strcred.findCheckerFactories())
for plg in plugin.getPlugins(strcred.ICheckerFactory):
self.assertIn(plg, availablePlugins)
def test_findCheckerFactory(self):
"""
L{strcred.findCheckerFactory} returns the first plugin
available for a given authentication type.
"""
self.assertIdentical(
strcred.findCheckerFactory("file"), cred_file.theFileCheckerFactory
)
class MemoryCheckerTests(TestCase):
def setUp(self):
self.admin = credentials.UsernamePassword("admin", "asdf")
self.alice = credentials.UsernamePassword("alice", "foo")
self.badPass = credentials.UsernamePassword("alice", "foobar")
self.badUser = credentials.UsernamePassword("x", "yz")
self.checker = strcred.makeChecker("memory:admin:asdf:alice:foo")
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('memory') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword, self.checker.credentialInterfaces)
def test_badFormatArgString(self):
"""
An argument string which does not contain user:pass pairs
(i.e., an odd number of ':' characters) raises an exception.
"""
self.assertRaises(
strcred.InvalidAuthArgumentString, strcred.makeChecker, "memory:a:b:c"
)
def test_memoryCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return self.checker.requestAvatarId(self.admin).addCallback(_gotAvatar)
def test_memoryCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badUser), error.UnauthorizedLogin
)
def test_memoryCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badPass), error.UnauthorizedLogin
)
class AnonymousCheckerTests(TestCase):
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('anonymous') returns an object
that implements the L{ICredentialsChecker} interface.
"""
checker = strcred.makeChecker("anonymous")
self.assertTrue(checkers.ICredentialsChecker.providedBy(checker))
self.assertIn(credentials.IAnonymous, checker.credentialInterfaces)
def testAnonymousAccessSucceeds(self):
"""
We can log in anonymously using this checker.
"""
checker = strcred.makeChecker("anonymous")
request = checker.requestAvatarId(credentials.Anonymous())
def _gotAvatar(avatar):
self.assertIdentical(checkers.ANONYMOUS, avatar)
return request.addCallback(_gotAvatar)
@skipIf(not pwd, "Required module not available: pwd")
@skipIf(not crypt, "Required module not available: crypt")
@skipIf(not spwd, "Required module not available: spwd")
class UnixCheckerTests(TestCase):
users = {
"admin": "asdf",
"alice": "foo",
}
def _spwd_getspnam(self, username):
return spwd.struct_spwd(
(
username,
crypt.crypt(self.users[username], "F/"),
0,
0,
99999,
7,
-1,
-1,
-1,
)
)
def setUp(self):
self.admin = credentials.UsernamePassword("admin", "asdf")
self.alice = credentials.UsernamePassword("alice", "foo")
self.badPass = credentials.UsernamePassword("alice", "foobar")
self.badUser = credentials.UsernamePassword("x", "yz")
self.checker = strcred.makeChecker("unix")
self.adminBytes = credentials.UsernamePassword(b"admin", b"asdf")
self.aliceBytes = credentials.UsernamePassword(b"alice", b"foo")
self.badPassBytes = credentials.UsernamePassword(b"alice", b"foobar")
self.badUserBytes = credentials.UsernamePassword(b"x", b"yz")
self.checkerBytes = strcred.makeChecker("unix")
# Hack around the pwd and spwd modules, since we can't really
# go about reading your /etc/passwd or /etc/shadow files
if pwd:
database = UserDatabase()
for username, password in self.users.items():
database.addUser(
username,
crypt.crypt(password, "F/"),
1000,
1000,
username,
"/home/" + username,
"/bin/sh",
)
self.patch(pwd, "getpwnam", database.getpwnam)
if spwd:
self.patch(spwd, "getspnam", self._spwd_getspnam)
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('unix') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword, self.checker.credentialInterfaces)
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checkerBytes))
self.assertIn(
credentials.IUsernamePassword, self.checkerBytes.credentialInterfaces
)
def test_unixCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return self.checker.requestAvatarId(self.admin).addCallback(_gotAvatar)
def test_unixCheckerSucceedsBytes(self):
"""
The checker works with valid L{bytes} credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.adminBytes.username.decode("utf-8"))
return self.checkerBytes.requestAvatarId(self.adminBytes).addCallback(
_gotAvatar
)
def test_unixCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badUser), error.UnauthorizedLogin
)
def test_unixCheckerFailsUsernameBytes(self):
"""
The checker fails with an invalid L{bytes} username.
"""
return self.assertFailure(
self.checkerBytes.requestAvatarId(self.badUserBytes),
error.UnauthorizedLogin,
)
def test_unixCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badPass), error.UnauthorizedLogin
)
def test_unixCheckerFailsPasswordBytes(self):
"""
The checker fails with an invalid L{bytes} password.
"""
return self.assertFailure(
self.checkerBytes.requestAvatarId(self.badPassBytes),
error.UnauthorizedLogin,
)
@skipIf(not crypt, "Required module is unavailable: crypt")
class CryptTests(TestCase):
"""
L{crypt} has functions for encrypting password.
"""
def test_verifyCryptedPassword(self):
"""
L{cred_unix.verifyCryptedPassword}
"""
password = "sample password ^%$"
for salt in (None, "ab"):
try:
cryptedCorrect = crypt.crypt(password, salt)
if isinstance(cryptedCorrect, bytes):
cryptedCorrect = cryptedCorrect.decode("utf-8")
except TypeError:
# Older Python versions would throw a TypeError if
# a value of None was is used for the salt.
# Newer Python versions allow it.
continue
cryptedIncorrect = "$1x1234"
self.assertTrue(cred_unix.verifyCryptedPassword(cryptedCorrect, password))
self.assertFalse(
cred_unix.verifyCryptedPassword(cryptedIncorrect, password)
)
# Python 3.3+ has crypt.METHOD_*, but not all
# platforms implement all methods.
for method in ("METHOD_SHA512", "METHOD_SHA256", "METHOD_MD5", "METHOD_CRYPT"):
cryptMethod = getattr(crypt, method, None)
if not cryptMethod:
continue
password = "interesting password xyz"
crypted = crypt.crypt(password, cryptMethod)
if isinstance(crypted, bytes):
crypted = crypted.decode("utf-8")
incorrectCrypted = crypted + "blahfooincorrect"
result = cred_unix.verifyCryptedPassword(crypted, password)
self.assertTrue(result)
# Try to pass in bytes
result = cred_unix.verifyCryptedPassword(
crypted.encode("utf-8"), password.encode("utf-8")
)
self.assertTrue(result)
result = cred_unix.verifyCryptedPassword(incorrectCrypted, password)
self.assertFalse(result)
# Try to pass in bytes
result = cred_unix.verifyCryptedPassword(
incorrectCrypted.encode("utf-8"), password.encode("utf-8")
)
self.assertFalse(result)
def test_verifyCryptedPasswordOSError(self):
"""
L{cred_unix.verifyCryptedPassword} when OSError is raised
"""
def mockCrypt(password, salt):
raise OSError("")
password = "sample password ^%$"
cryptedCorrect = crypt.crypt(password, "ab")
self.patch(crypt, "crypt", mockCrypt)
self.assertFalse(cred_unix.verifyCryptedPassword(cryptedCorrect, password))
class FileDBCheckerTests(TestCase):
"""
C{--auth=file:...} file checker.
"""
def setUp(self):
self.admin = credentials.UsernamePassword(b"admin", b"asdf")
self.alice = credentials.UsernamePassword(b"alice", b"foo")
self.badPass = credentials.UsernamePassword(b"alice", b"foobar")
self.badUser = credentials.UsernamePassword(b"x", b"yz")
self.filename = self.mktemp()
FilePath(self.filename).setContent(b"admin:asdf\nalice:foo\n")
self.checker = strcred.makeChecker("file:" + self.filename)
def _fakeFilename(self):
filename = "/DoesNotExist"
while os.path.exists(filename):
filename += "_"
return filename
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('memory') returns an object
that implements the L{ICredentialsChecker} interface.
"""
self.assertTrue(checkers.ICredentialsChecker.providedBy(self.checker))
self.assertIn(credentials.IUsernamePassword, self.checker.credentialInterfaces)
def test_fileCheckerSucceeds(self):
"""
The checker works with valid credentials.
"""
def _gotAvatar(username):
self.assertEqual(username, self.admin.username)
return self.checker.requestAvatarId(self.admin).addCallback(_gotAvatar)
def test_fileCheckerFailsUsername(self):
"""
The checker fails with an invalid username.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badUser), error.UnauthorizedLogin
)
def test_fileCheckerFailsPassword(self):
"""
The checker fails with an invalid password.
"""
return self.assertFailure(
self.checker.requestAvatarId(self.badPass), error.UnauthorizedLogin
)
def test_failsWithEmptyFilename(self):
"""
An empty filename raises an error.
"""
self.assertRaises(ValueError, strcred.makeChecker, "file")
self.assertRaises(ValueError, strcred.makeChecker, "file:")
def test_warnWithBadFilename(self):
"""
When the file auth plugin is given a file that doesn't exist, it
should produce a warning.
"""
oldOutput = cred_file.theFileCheckerFactory.errorOutput
newOutput = StringIO()
cred_file.theFileCheckerFactory.errorOutput = newOutput
strcred.makeChecker("file:" + self._fakeFilename())
cred_file.theFileCheckerFactory.errorOutput = oldOutput
self.assertIn(cred_file.invalidFileWarning, newOutput.getvalue())
@skipIf(not pwd, "Required module not available: pwd")
@skipIf(not requireModule("cryptography"), "cryptography is not available")
class SSHCheckerTests(TestCase):
"""
Tests for the C{--auth=sshkey:...} checker. The majority of the
tests for the ssh public key database checker are in
L{twisted.conch.test.test_checkers.SSHPublicKeyCheckerTestCase}.
"""
def test_isChecker(self):
"""
Verifies that strcred.makeChecker('sshkey') returns an object
that implements the L{ICredentialsChecker} interface.
"""
sshChecker = strcred.makeChecker("sshkey")
self.assertTrue(checkers.ICredentialsChecker.providedBy(sshChecker))
self.assertIn(credentials.ISSHPrivateKey, sshChecker.credentialInterfaces)
class DummyOptions(usage.Options, strcred.AuthOptionMixin):
"""
Simple options for testing L{strcred.AuthOptionMixin}.
"""
class CheckerOptionsTests(TestCase):
def test_createsList(self):
"""
The C{--auth} command line creates a list in the
Options instance and appends values to it.
"""
options = DummyOptions()
options.parseOptions(["--auth", "memory"])
self.assertEqual(len(options["credCheckers"]), 1)
options = DummyOptions()
options.parseOptions(["--auth", "memory", "--auth", "memory"])
self.assertEqual(len(options["credCheckers"]), 2)
def test_invalidAuthError(self):
"""
The C{--auth} command line raises an exception when it
gets a parameter it doesn't understand.
"""
options = DummyOptions()
# If someone adds a 'ThisPluginDoesNotExist' then this unit
# test should still run.
invalidParameter = getInvalidAuthType()
self.assertRaises(
usage.UsageError, options.parseOptions, ["--auth", invalidParameter]
)
self.assertRaises(
usage.UsageError,
options.parseOptions,
["--help-auth-type", invalidParameter],
)
def test_createsDictionary(self):
"""
The C{--auth} command line creates a dictionary mapping supported
interfaces to the list of credentials checkers that support it.
"""
options = DummyOptions()
options.parseOptions(["--auth", "memory", "--auth", "anonymous"])
chd = options["credInterfaces"]
self.assertEqual(len(chd[credentials.IAnonymous]), 1)
self.assertEqual(len(chd[credentials.IUsernamePassword]), 1)
chdAnonymous = chd[credentials.IAnonymous][0]
chdUserPass = chd[credentials.IUsernamePassword][0]
self.assertTrue(checkers.ICredentialsChecker.providedBy(chdAnonymous))
self.assertTrue(checkers.ICredentialsChecker.providedBy(chdUserPass))
self.assertIn(credentials.IAnonymous, chdAnonymous.credentialInterfaces)
self.assertIn(credentials.IUsernamePassword, chdUserPass.credentialInterfaces)
def test_credInterfacesProvidesLists(self):
"""
When two C{--auth} arguments are passed along which support the same
interface, a list with both is created.
"""
options = DummyOptions()
options.parseOptions(["--auth", "memory", "--auth", "unix"])
self.assertEqual(
options["credCheckers"],
options["credInterfaces"][credentials.IUsernamePassword],
)
def test_listDoesNotDisplayDuplicates(self):
"""
The list for C{--help-auth} does not duplicate items.
"""
authTypes = []
options = DummyOptions()
for cf in options._checkerFactoriesForOptHelpAuth():
self.assertNotIn(cf.authType, authTypes)
authTypes.append(cf.authType)
def test_displaysListCorrectly(self):
"""
The C{--help-auth} argument correctly displays all
available authentication plugins, then exits.
"""
newStdout = StringIO()
options = DummyOptions()
options.authOutput = newStdout
self.assertRaises(SystemExit, options.parseOptions, ["--help-auth"])
for checkerFactory in strcred.findCheckerFactories():
self.assertIn(checkerFactory.authType, newStdout.getvalue())
def test_displaysHelpCorrectly(self):
"""
The C{--help-auth-for} argument will correctly display the help file
for a particular authentication plugin.
"""
newStdout = StringIO()
options = DummyOptions()
options.authOutput = newStdout
self.assertRaises(
SystemExit, options.parseOptions, ["--help-auth-type", "file"]
)
for line in cred_file.theFileCheckerFactory.authHelp:
if line.strip():
self.assertIn(line.strip(), newStdout.getvalue())
def test_unexpectedException(self):
"""
When the checker specified by C{--auth} raises an unexpected error, it
should be caught and re-raised within a L{usage.UsageError}.
"""
options = DummyOptions()
err = self.assertRaises(
usage.UsageError, options.parseOptions, ["--auth", "file"]
)
self.assertEqual(str(err), "Unexpected error: 'file' requires a filename")
class OptionsForUsernamePassword(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = (credentials.IUsernamePassword,)
class OptionsForUsernameHashedPassword(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = (credentials.IUsernameHashedPassword,)
class OptionsSupportsAllInterfaces(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces = None
class OptionsSupportsNoInterfaces(usage.Options, strcred.AuthOptionMixin):
supportedInterfaces: Sequence[Type[Interface]] = []
class LimitingInterfacesTests(TestCase):
"""
Tests functionality that allows an application to limit the
credential interfaces it can support. For the purposes of this
test, we use IUsernameHashedPassword, although this will never
really be used by the command line.
(I have, to date, not thought of a half-decent way for a user to
specify a hash algorithm via the command-line. Nor do I think it's
very useful.)
I should note that, at first, this test is counter-intuitive,
because we're using the checker with a pre-defined hash function
as the 'bad' checker. See the documentation for
L{twisted.cred.checkers.FilePasswordDB.hash} for more details.
"""
def setUp(self):
self.filename = self.mktemp()
with open(self.filename, "wb") as f:
f.write(b"admin:asdf\nalice:foo\n")
self.goodChecker = checkers.FilePasswordDB(self.filename)
self.badChecker = checkers.FilePasswordDB(self.filename, hash=self._hash)
self.anonChecker = checkers.AllowAnonymousAccess()
def _hash(self, networkUsername, networkPassword, storedPassword):
"""
A dumb hash that doesn't really do anything.
"""
return networkPassword
def test_supportsInterface(self):
"""
The supportsInterface method behaves appropriately.
"""
options = OptionsForUsernamePassword()
self.assertTrue(options.supportsInterface(credentials.IUsernamePassword))
self.assertFalse(options.supportsInterface(credentials.IAnonymous))
self.assertRaises(
strcred.UnsupportedInterfaces, options.addChecker, self.anonChecker
)
def test_supportsAllInterfaces(self):
"""
The supportsInterface method behaves appropriately
when the supportedInterfaces attribute is None.
"""
options = OptionsSupportsAllInterfaces()
self.assertTrue(options.supportsInterface(credentials.IUsernamePassword))
self.assertTrue(options.supportsInterface(credentials.IAnonymous))
def test_supportsCheckerFactory(self):
"""
The supportsCheckerFactory method behaves appropriately.
"""
options = OptionsForUsernamePassword()
fileCF = cred_file.theFileCheckerFactory
anonCF = cred_anonymous.theAnonymousCheckerFactory
self.assertTrue(options.supportsCheckerFactory(fileCF))
self.assertFalse(options.supportsCheckerFactory(anonCF))
def test_canAddSupportedChecker(self):
"""
When addChecker is called with a checker that implements at least one
of the interfaces our application supports, it is successful.
"""
options = OptionsForUsernamePassword()
options.addChecker(self.goodChecker)
iface = options.supportedInterfaces[0]
# Test that we did get IUsernamePassword
self.assertIdentical(options["credInterfaces"][iface][0], self.goodChecker)
self.assertIdentical(options["credCheckers"][0], self.goodChecker)
# Test that we didn't get IUsernameHashedPassword
self.assertEqual(len(options["credInterfaces"][iface]), 1)
self.assertEqual(len(options["credCheckers"]), 1)
def test_failOnAddingUnsupportedChecker(self):
"""
When addChecker is called with a checker that does not implement any
supported interfaces, it fails.
"""
options = OptionsForUsernameHashedPassword()
self.assertRaises(
strcred.UnsupportedInterfaces, options.addChecker, self.badChecker
)
def test_unsupportedInterfaceError(self):
"""
The C{--auth} command line raises an exception when it
gets a checker we don't support.
"""
options = OptionsSupportsNoInterfaces()
authType = cred_anonymous.theAnonymousCheckerFactory.authType
self.assertRaises(usage.UsageError, options.parseOptions, ["--auth", authType])
def test_helpAuthLimitsOutput(self):
"""
C{--help-auth} will only list checkers that purport to
supply at least one of the credential interfaces our
application can use.
"""
options = OptionsForUsernamePassword()
for factory in options._checkerFactoriesForOptHelpAuth():
invalid = True
for interface in factory.credentialInterfaces:
if options.supportsInterface(interface):
invalid = False
if invalid:
raise strcred.UnsupportedInterfaces()
def test_helpAuthTypeLimitsOutput(self):
"""
C{--help-auth-type} will display a warning if you get
help for an authType that does not supply at least one of the
credential interfaces our application can use.
"""
options = OptionsForUsernamePassword()
# Find an interface that we can use for our test
invalidFactory = None
for factory in strcred.findCheckerFactories():
if not options.supportsCheckerFactory(factory):
invalidFactory = factory
break
self.assertNotIdentical(invalidFactory, None)
# Capture output and make sure the warning is there
newStdout = StringIO()
options.authOutput = newStdout
self.assertRaises(
SystemExit, options.parseOptions, ["--help-auth-type", "anonymous"]
)
self.assertIn(strcred.notSupportedWarning, newStdout.getvalue())