# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file from __future__ import unicode_literals import copy import unittest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import rsa from jwcrypto import jwa from jwcrypto import jwe from jwcrypto import jwk from jwcrypto import jws from jwcrypto import jwt from jwcrypto.common import InvalidJWSERegOperation from jwcrypto.common import JWKeyNotFound from jwcrypto.common import JWSEHeaderParameter from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import json_decode, json_encode jwe_algs_and_rsa1_5 = jwe.default_allowed_algs + ['RSA1_5'] # RFC 7517 - A.1 PublicKeys = {"keys": [ {"kty": "EC", "crv": "P-256", "x": "[AWS-SECRET-REMOVED]7D4", "y": "[AWS-SECRET-REMOVED]FyM", "use": "enc", "kid": "1"}, {"kty": "RSA", "n": "[AWS-SECRET-REMOVED]FbWhM78LhWx4cbbf" "AAtVT86zwu1RK7aPFFxuhDR1L6tSoc_BJECPebWKRXjBZCiFV4n3oknj" "hMstn64tZ_2W-5JsGY4Hc5n9yBXArwl93lqt7_RN5w6Cf0h4QyQ5v-65" "YGjQR0_[AWS-SECRET-REMOVED]AZHzu6qMQ" "vRL5hajrn1n91CbOpbISD08qNLyrdkt-bFTWhAI4vMQFh6WeZu0fM4lF" "d2NcRwr3XPksINHaQ-G_xBniIqbw0Ls1jF44-csFCur-kEgU8awapJzK" "nqDKgw", "e": "AQAB", "alg": "RS256", "kid": "2011-04-29"}], "thumbprints": ["cn-I_WNMClehiVp51i_0VpOENW1upEerA8sEam5hn-s", "NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs"]} # RFC 7517 - A.2 PrivateKeys = {"keys": [ {"kty": "EC", "crv": "P-256", "x": "[AWS-SECRET-REMOVED]7D4", "y": "[AWS-SECRET-REMOVED]FyM", "d": "[AWS-SECRET-REMOVED]eAE", "use": "enc", "kid": "1"}, {"kty": "RSA", "n": "[AWS-SECRET-REMOVED]FbWhM78LhWx4cbb" "fAAtVT86zwu1RK7aPFFxuhDR1L6tSoc_BJECPebWKRXjBZCiFV4n3ok" "njhMstn64tZ_2W-5JsGY4Hc5n9yBXArwl93lqt7_RN5w6Cf0h4QyQ5v" "-65YGjQR0_[AWS-SECRET-REMOVED]AZHzu" "6qMQvRL5hajrn1n91CbOpbISD08qNLyrdkt-bFTWhAI4vMQFh6WeZu0" "fM4lFd2NcRwr3XPksINHaQ-G_xBniIqbw0Ls1jF44-csFCur-kEgU8a" "wapJzKnqDKgw", "e": "AQAB", "d": "X4cTteJY_gn4FYPsXB8rdXix5vwsg1FLN5E3EaG6RJoVH-HLLKD9M7d" "[AWS-SECRET-REMOVED]v7B6NqXSzUvxT0_" "YSfqijwp3RTzlBaCxWp4doFk5N2o8Gy_nHNKroADIkJ46pRUohsXywb" "ReAdYaMwFs9tv8d_cPVY3i07a3t8MN6TNwm0dSawm9v47UiCl3Sk5Zi" "[AWS-SECRET-REMOVED]uqsk0GjskDJk19Z" "4qwjwbsnn4j2WBii3RL-Us2lGVkY8fkFzme1z0HbIkfz0Y6mqnOYtqc" "0X4jfcKoAC8Q", "p": "83i-[AWS-SECRET-REMOVED]yLPQBQxtPVn" "wD20R-[AWS-SECRET-REMOVED]XicFHj7XO" "[AWS-SECRET-REMOVED]zzOGVQzXvNEvn7O" "0nVbfs", "q": "3dfOR9cuYq-0S-[AWS-SECRET-REMOVED]q" "jVZQO1dIrdwgTnCdpYzBcOfW5r370AFXjiWft_NGEiovonizhKpo9VV" "S78TzFgxkIdrecRezsZ-1kYd_s1qDbxtkDEgfAITAG9LUnADun4vIcb" "6yelxk", "dp": "G4sPXkc6Ya9y8oJW9_ILj4xuppu0lzi_H7VTkS8xj5SdX3coE0oimY" "[AWS-SECRET-REMOVED]DTP8akYhFo5tAA" "77Qe_[AWS-SECRET-REMOVED]s6rY3Ob8Y" "eiKkTiBj0", "dq": "s9lAH9fggBsoFR8Oac2R_E2gw282rT2kGOAhvIllETE1efrA6huUUv" "MfBcMpn8lqeW6vzznYY5SSQF7pMdC_agI3nG8Ibp1BUb0JUiraRNqU" "fLhcQb_[AWS-SECRET-REMOVED]-L18txX" "w494Q_cgk", "qi": "GyM_p6JrXySiz1toFgKbWV-JdI3jQ4ypu9rbMWx3rQJBfmt0FoYzgU" "IZEVFEcOqwemRN81zoDAaa-Bk0KWNGDjJHZDdDmFhW3AN7lI-puxk_" "[AWS-SECRET-REMOVED]YItdldUKGzO6Ia" "6zTKhAVRU", "alg": "RS256", "kid": "2011-04-29"}]} # RFC 7517 - A.3 SymmetricKeys = {"keys": [ {"kty": "oct", "alg": "A128KW", "k": "GawgguFyGrWKav7AX4VKUg"}, {"kty": "oct", "k": "AyM1SysPpbyDfgZld3umj1qzKObwVMkoqQ-EstJQLr_T-1qS0gZH7" "5aKtMN3Yj0iPS4hcgUuTwjAzZr1Z9CAow", "kid": "HMAC key used in JWS A.1 example"}]} # RFC 7517 - B Useofx5c = {"kty": "RSA", "use": "sig", "kid": "1b94c", "n": "vrjOfz9Ccdgx5nQudyhdoR17V-IubWMeOZCwX_jj0hgAsz2J_pqYW08PLbK" "_[AWS-SECRET-REMOVED]_-7dYbsr4iJmG0Qu2j" "8DsVyT1azpJC_[AWS-SECRET-REMOVED]Wq4aYW" "Achc8t-emd9qOvWtVMDC2BXksRngh6X5bUYLy6AyHKvj-nUy1wgzjYQDwHM" "TplCoLtU-o-[AWS-SECRET-REMOVED]-qo-meMv" "VfJb6jJVWRpl2SUtCnYG2C32qvbWbjZ_jBPD5eunqsIo1vQ", "e": "AQAB", "x5c": ["[AWS-SECRET-REMOVED]DQEBBQUAMGIxCzAJ" "[AWS-SECRET-REMOVED]BxMGRGVudmVyMRww" "[AWS-SECRET-REMOVED]FQYDVQQDEw5Ccmlh" "[AWS-SECRET-REMOVED]ODA4MTQyMjI5MTVa" "[AWS-SECRET-REMOVED]MA0GA1UEBxMGRGVu" "[AWS-SECRET-REMOVED]cnAuMRcwFQYDVQQD" "[AWS-SECRET-REMOVED]AQEBBQADggEPADCC" "[AWS-SECRET-REMOVED]HjmQsF/449IYALM9" "[AWS-SECRET-REMOVED]bgizcAblIgjtdf/u" "[AWS-SECRET-REMOVED]gmj6He4iO8NCyvaK" "[AWS-SECRET-REMOVED]V5LEZ4Iel+W1GC8u" "[AWS-SECRET-REMOVED]kaBhPbiZAS3YeYBR" "[AWS-SECRET-REMOVED]Qp2Btgt9qr21m42f" "[AWS-SECRET-REMOVED]AAOCAQEAh8zGlfSl" "[AWS-SECRET-REMOVED]+9gGlqCz5iWLOgWs" "[AWS-SECRET-REMOVED]Oe1zFo+Owb1zxtp3" "[AWS-SECRET-REMOVED]k+fbEL2Bo3UPGrps" "[AWS-SECRET-REMOVED]9JiSlYhzo4tpzd5r" "[AWS-SECRET-REMOVED]dgQTxkumGmTqgawR" "[AWS-SECRET-REMOVED]4xSvdXK3IVfOWA==" ]} # RFC 7517 - C.1 RSAPrivateKey = {"kty": "RSA", "kid": "juliet@capulet.lit", "use": "enc", "n": "[AWS-SECRET-REMOVED]J66w7nyoK1gPNq" "FMSQRyO125Gp-[AWS-SECRET-REMOVED]R" "0-Iqom-QFcNP8Sjg086MwoqQU_LYywlAGZ21WSdS_PERyGFiNnj3QQ" "[AWS-SECRET-REMOVED]YnDyGxvjYGDSM-" "AqWS9zIQ2ZilgT-GqUmipg0XOC0Cc20rgLe2ymLHjpHciCKVAbY5-L" "32-lSeZO-Os6U15_aXrk9Gw8cPUaX1_I8sLGuSiVdt3C_Fn2PZ3Z8i" "744FPFGGcG1qs2Wz-Q", "e": "AQAB", "d": "GRtbIQmhOZtyszfgKdg4u_N-R_mZGU_9k7JQ_jn1DnfTuMdSNprTea" "STyWfSNkuaAwnOEbIQVy1IQbWVV25NY3ybc_IhUJtfri7bAXYEReWa" "Cl3hdlPKXy9UvqPYGR0kIXTQRqns-dVJ7jahlI7LyckrpTmrM8dWBo" "4_[AWS-SECRET-REMOVED]9CsCZ2vsUDms" "XOfUENOyMqADC6p1M3h33tsurY15k9qMSpG9OX_IJAXmxzAh_tWiZO" "wk2K4yxH9tS3Lq1yX8C1EWmeRDkK2ahecG85-oLKQt5VEpWHKmjOi_" "gJSdSgqcN96X52esAQ", "p": "[AWS-SECRET-REMOVED]GRxrTmQdl1ZK9u" "w-PIHfQP0FkxXVrx-WE-ZEbrqivH_2iCLUS7wAl6XvARt1KkIaUxPP" "SYB9yk31s0Q8UK96E3_OrADAYtAJs-M3JxCLfNgqh56HDnETTQhH3r" "CT5T3yJws", "q": "1u_[AWS-SECRET-REMOVED]gQxaj10RWjs" "Zu0c6Iedis4S7B_coSKB0Kj9PaPaBzg-IySRvvcQuPamQu66riMhjV" "tG6TlV8CLCYKrYl52ziqK0E_ym2QnkwsUX7eYTB7LbAHRK9GqocDE5" "B0f808I4s", "dp": "KkMTWqBUefVwZ2_Dbj1pPQqyHSHjj90L5x_MOzqYAJMcLMZtbUtwK" "[AWS-SECRET-REMOVED]OOf_a_HgMXK_l" "hqigI4y_kqS1wY52IwjUn5rgRrJ-yYo1h41KR-vz2pYhEAeYrhttW" "txVqLCRViD6c", "dq": "AvfS0-[AWS-SECRET-REMOVED]Z1Er7t1" "[AWS-SECRET-REMOVED]5KKS3TQtQ5qCz" "[AWS-SECRET-REMOVED]Y63TmmEAu_lRF" "COJ3xDea-ots", "qi": "lSQi-[AWS-SECRET-REMOVED]BUczScEo" "PwmUqqabu9V0-Py4dQ57_bapoKRu1R90bvuFnU63SHWEFglZQvJDM" "eAvmj4sm-Fp0oYu_neotgQ0hzbI5gry7ajdYy9-2lNx_76aBZoOUu" "9HCJ-UsfSOI8"} # From # vectors/cryptography_vectors/asymmetric/PEM_Serialization/rsa_private_key.pem RSAPrivatePEM = b"""[RSA-PRIVATE-KEY-REMOVED] Proc-Type: 4,ENCRYPTED DEK-Info: DES-EDE3-CBC,B4B3C8C536E57CBE [AWS-SECRET-REMOVED]Cuju/HQZoWXtok1xoi5e2Ovw [AWS-SECRET-REMOVED]GFG6MY2uZaVtLSiAFXf1N8PL [AWS-SECRET-REMOVED]RxF7M1XVQPGhrU0OfxX9VODe [AWS-SECRET-REMOVED]X07n+yw3qeyNQNYdmDNP7iCZ [AWS-SECRET-REMOVED]ITjKgEny7MKqU1/jTrOJWu85 [AWS-SECRET-REMOVED]hzzsabX7KuANKRn3fgUN+yZS [AWS-SECRET-REMOVED]tDReCkfzgnyIdjaIp3Pdp5yN [AWS-SECRET-REMOVED]4SfZN9Aw57woi8eSRpLBEn+C [AWS-SECRET-REMOVED]LJuM8sU8QyB2oir1M/qYFhTC [AWS-SECRET-REMOVED]AjxeD/9PhiKSoMEX8Q/8Npti [AWS-SECRET-REMOVED]LD+S3QH8VCwTB2AuKq7zyuD6 [AWS-SECRET-REMOVED]QpBsPsI2ssZn/MptNqyEN9TW [AWS-SECRET-REMOVED]CY3dhvvoxPaXahyAxBfpKArB [AWS-SECRET-REMOVED]Imjw6RXM3QtuzbZ71zp+reL8 [AWS-SECRET-REMOVED]Ios5KXmYkWoU5hEuGM3tK4Lx [AWS-SECRET-REMOVED]OmvMOsv3RVpyfgJ07z5b9X1B [AWS-SECRET-REMOVED]pvqgnkEwUSdbY6bRknLETmfo [AWS-SECRET-REMOVED]X/kmZK0IJVBpikL8SuM/ZJLK [AWS-SECRET-REMOVED]2DtMx9dPoCqYhPHsIGc/td0/ [AWS-SECRET-REMOVED]lqVJsuxP75pbdiKS/hCKRf8D [AWS-SECRET-REMOVED]5BLu8vyRqFduhg98GVXJmD7k [AWS-SECRET-REMOVED]8CL6ZagU6K1utfHIMrqKkJME [AWS-SECRET-REMOVED]+6Nln4ilbBJIi8W/yzeM/WFj [AWS-SECRET-REMOVED]yaLdLrSn1/SzuLL6I7pYZ0Um [AWS-SECRET-REMOVED]0GH/1WVuHeQ93A== -----END RSA PRIVATE KEY----- """ RSAPrivatePassword = b"123456" # From # vectors/cryptography_vectors/asymmetric/PEM_Serialization/rsa_public_key.pem RSAPublicPEM = b"""-----BEGIN PUBLIC KEY----- [AWS-SECRET-REMOVED]AQEAnR4AZ+tgWYql+S3MaTQ6 [AWS-SECRET-REMOVED]GL1NpuuS9CpbR5EQ3d71bD0v [AWS-SECRET-REMOVED]was6KGof2BA4bFNCw/fPbuhk [AWS-SECRET-REMOVED]/q/pu85YLAVOxq6wlUCzP4FM [AWS-SECRET-REMOVED]nVoQbo5EQYkUMXE8WmadSyHl [AWS-SECRET-REMOVED]wHdqKGkuR1kQ0XqokK1yGKFZ 8wIDAQAB -----END PUBLIC KEY----- """ # From cryptography/vectors/cryptography_vectors/x509/v1_cert.pem PublicCert = b"""-----BEGIN CERTIFICATE----- [AWS-SECRET-REMOVED]MAkGA1UEBhMCQVUxDDAKBgNV [AWS-SECRET-REMOVED]ZXN0IENBMB4XDTk1MDYxOTIz [AWS-SECRET-REMOVED]BhMCQVUxDDAKBgNVBAgTA1FM [AWS-SECRET-REMOVED]cnQwXDANBgkqhkiG9w0BAQEF [AWS-SECRET-REMOVED]2LNZbcPBp9/OfIcYAXBQn8hO [AWS-SECRET-REMOVED]MAwGCCqGSIb3DQIFBQADQQAE [AWS-SECRET-REMOVED]uioZniZp5lFzAw4+YzPQ7XKJ zl9HYIMxATFyqSiD9jsx -----END CERTIFICATE----- """ PublicCertThumbprint = '7KITkGJF74IZ9NKVvHfuJILbuIZny6j-roaNjB1vgiA' # RFC 8037 - A.2 PublicKeys_EdDsa = { "keys": [ { "kty": "OKP", "crv": "Ed25519", "x": "11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo" }, ], "thumbprints": ["kPrK_qmxVWaYVA9wwBF6Iuo3vVzz7TxHCTwXBygrS4k"] } # RFC 8037 - A.1 PrivateKeys_EdDsa = { "keys": [ { "kty": "OKP", "crv": "Ed25519", "d": "nWGxne_9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A", "x": "11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo"}, ] } PublicKeys_secp256k1 = { "keys": [ { "kty": "EC", "crv": "secp256k1", "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow", "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8" }, { "kty": "EC", "crv": "P-256K", "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow", "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8" }, ] } PrivateKeys_secp256k1 = { "keys": [ { "kty": "EC", "crv": "secp256k1", "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow", "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8", "d": "[AWS-SECRET-REMOVED]9dA" }, { "kty": "EC", "crv": "P-256K", "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow", "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8", "d": "[AWS-SECRET-REMOVED]9dA" } ] } PublicKeys_brainpool = { "keys": [ { "kty": "EC", "crv": "BP-256", "x": "mpkJ29_CYAD0mzQ_MsrbjFMFYtcc9Oxpro37Fa4cLfI", "y": "[AWS-SECRET-REMOVED]Hl8", }, { "kty": "EC", "crv": "BP-384", "x": ("[AWS-SECRET-REMOVED]PM" "ytf4mRY9JaAITdItjvUTAh"), "y": ("KNLRTNdvUg66aB_TVW4POZkE3q8S0YoQrCzYUrExRDe" "_BXikkqIama-GYQ3UBOQL"), }, { "kty": "EC", "crv": "BP-512", "x": ("aQXpvz7DH9OK5eFNO9dY3BdPY1v0-8Rg9KC322PY1Jy" "BJq3EhT0uR_-tgbL2E_aGP6k56lF1xIOOtQxo8zziGA"), "y": ("l9XLHHncigOPr5Tvnj_mVzBFv6i7rdBQrLTq3RXZlCC" "_f_q6L2o79K9IrN_J2wWxAfS8ekuGPGlHZUzK-3D9sA"), } ] } PrivateKeys_brainpool = { "keys": [ { "kty": "EC", "crv": "BP-256", "x": "mpkJ29_CYAD0mzQ_MsrbjFMFYtcc9Oxpro37Fa4cLfI", "y": "[AWS-SECRET-REMOVED]Hl8", "d": "KdKRgq0WEM97BQw3jpW_fTOep6fn-Samv4DfDNb-4s4" }, { "kty": "EC", "crv": "BP-384", "x": ("[AWS-SECRET-REMOVED]PM" "ytf4mRY9JaAITdItjvUTAh"), "y": ("KNLRTNdvUg66aB_TVW4POZkE3q8S0YoQrCzYUrExRDe" "_BXikkqIama-GYQ3UBOQL"), "d": ("B5WeRV0-RztAPAhRbphSAUrsIzy-eSfWGSM5FxOQGlJ" "cq-ECLA_-SIlH7NdWIEJY") }, { "kty": "EC", "crv": "BP-512", "x": ("aQXpvz7DH9OK5eFNO9dY3BdPY1v0-8Rg9KC322PY1Jy" "BJq3EhT0uR_-tgbL2E_aGP6k56lF1xIOOtQxo8zziGA"), "y": ("l9XLHHncigOPr5Tvnj_mVzBFv6i7rdBQrLTq3RXZlCC" "_f_q6L2o79K9IrN_J2wWxAfS8ekuGPGlHZUzK-3D9sA"), "d": ("F_[AWS-SECRET-REMOVED]L" "[AWS-SECRET-REMOVED]KZA") } ] } Ed25519PrivatePEM = b"""[PRIVATE-KEY-REMOVED] [AWS-SECRET-REMOVED]5toHKh6LDO2MCbSYNZTkMXDU -----END PRIVATE KEY----- """ Ed25519PublicPEM = b"""-----BEGIN PUBLIC KEY----- [AWS-SECRET-REMOVED]XihH1EQDa/O3utHLtqc= -----END PUBLIC KEY----- """ X25519PrivatePEM = b"""[PRIVATE-KEY-REMOVED] [AWS-SECRET-REMOVED]xb1u5nSRokrNaQYEsgu9O+hP -----END PRIVATE KEY----- """ X25519PublicPEM = b"""-----BEGIN PUBLIC KEY----- [AWS-SECRET-REMOVED]JFP019S+oq4wyAoWPnQ= -----END PUBLIC KEY----- """ ECPublicPEM = b"""-----BEGIN PUBLIC KEY----- [AWS-SECRET-REMOVED]t82WMJxqTuXCZxnvwrx4enQj [AWS-SECRET-REMOVED]TAkthwTuixuojpGKmw== -----END PUBLIC KEY----- """ ECPublicJWK = { "crv": "P-256", "kid": "[AWS-SECRET-REMOVED]AWY", "kty": "EC", "x": "hvGzt82WMJxqTuXCZxnvwrx4enQj6xc-erlhbTq8gTM", "y": "ACQczUT26bo-DTsEwsI6Ia7WNEwJLYcE7osbqI6Rips" } X25519PublicJWK = { 'crv': 'X25519', 'kid': '9cgLEZD5VsaV9dUPNehs2pOwxtmH-EWHJY-pC74Wjak', 'kty': 'OKP', 'x': 'W-m9ugi1psQFx6dtTl6J_XZ4JFP019S-oq4wyAoWPnQ' } class TestJWK(unittest.TestCase): def test_create_pubKeys(self): keylist = PublicKeys['keys'] for key in keylist: jwk.JWK(**key) def test_create_priKeys(self): keylist = PrivateKeys['keys'] for key in keylist: jwk.JWK(**key) def test_create_symKeys(self): keylist = SymmetricKeys['keys'] for key in keylist: jwkey = jwk.JWK(**key) jwkey.get_op_key('sign') jwkey.get_op_key('verify') e = jwkey.export() self.assertEqual(json_decode(e), key) jwk.JWK(**Useofx5c) jwk.JWK(**RSAPrivateKey) def test_generate_keys(self): jwk.JWK.generate(kty='oct', size=256) jwk.JWK.generate(kty='RSA', size=4096) jwk.JWK.generate(kty='EC', curve='P-521') k = jwk.JWK.generate(kty='oct', alg='A192KW', kid='MySymmetricKey') self.assertEqual(k['kid'], 'MySymmetricKey') self.assertEqual(len(base64url_decode(k.get_op_key('encrypt'))), 24) jwk.JWK.generate(kty='RSA', alg='RS256') k = jwk.JWK.generate(kty='RSA', size=4096, alg='RS256') self.assertEqual(k.get_op_key('encrypt').key_size, 4096) def test_export_public_keys(self): k = jwk.JWK(**RSAPrivateKey) jk = k.export_public() self.assertFalse('d' in json_decode(jk)) k2 = jwk.JWK(**json_decode(jk)) self.assertEqual(k['kid'], k2['kid']) def test_generate_oct_key(self): key = jwk.JWK.generate(kty='oct', size=128) e = jwe.JWE('test', '{"alg":"A128KW","enc":"A128GCM"}') e.add_recipient(key) enc = e.serialize() e.deserialize(enc, key) self.assertEqual(e.payload.decode('utf-8'), 'test') # also test key generation with input_keysize != keysize key = jwk.JWK.generate(kty='oct', alg="A128CBC-HS256") self.assertEqual(len(base64url_decode(key['k'])), 32) e = jwe.JWE('test', '{"alg":"A256KW","enc":"A128CBC-HS256"}') e.add_recipient(key) enc = e.serialize() e.deserialize(enc, key) self.assertEqual(e.payload.decode('utf-8'), 'test') def test_generate_EC_key(self): # Backwards compat curve key = jwk.JWK.generate(kty='EC', curve='P-256') key.get_op_key('verify', 'P-256') # New param key = jwk.JWK.generate(kty='EC', crv='P-521') key.get_op_key('verify', 'P-521') # New param prevails key = jwk.JWK.generate(kty='EC', curve='P-256', crv='P-521') key.get_op_key('verify', 'P-521') # New secp256k curve key = jwk.JWK.generate(kty='EC', curve='secp256k1') key.get_op_key('verify', 'secp256k1') # Brainpool256R1 curve key = jwk.JWK.generate(kty='EC', crv='BP-256') key.get_op_key('verify', 'BP-256') # Brainpool384R1 curve key = jwk.JWK.generate(kty='EC', crv='BP-384') key.get_op_key('verify', 'BP-384') # Brainpool256R1 curve key = jwk.JWK.generate(kty='EC', crv='BP-512') key.get_op_key('verify', 'BP-512') def test_generate_OKP_keys(self): for crv in jwk.ImplementedOkpCurves: key = jwk.JWK.generate(kty='OKP', crv=crv) self.assertEqual(key['crv'], crv) def test_import_pyca_keys(self): rsa1 = rsa.generate_private_key(65537, 1024, default_backend()) krsa1 = jwk.JWK.from_pyca(rsa1) self.assertEqual(krsa1['kty'], 'RSA') krsa2 = jwk.JWK.from_pyca(rsa1.public_key()) self.assertEqual(krsa1.get_op_key('verify').public_numbers().n, krsa2.get_op_key('verify').public_numbers().n) ec1 = ec.generate_private_key(ec.SECP256R1(), default_backend()) kec1 = jwk.JWK.from_pyca(ec1) self.assertEqual(kec1['kty'], 'EC') kec2 = jwk.JWK.from_pyca(ec1.public_key()) self.assertEqual(kec1.get_op_key('verify').public_numbers().x, kec2.get_op_key('verify').public_numbers().x) self.assertRaises(jwk.InvalidJWKValue, jwk.JWK.from_pyca, {}) def test_jwk_from_json(self): k = jwk.JWK.generate(kty='oct', size=256) y = jwk.JWK.from_json(k.export()) self.assertEqual(k.export(), y.export()) def test_jwkset(self): k = jwk.JWK(**RSAPrivateKey) ks = jwk.JWKSet() ks.add(k) ks2 = jwk.JWKSet() ks2.import_keyset(ks.export()) self.assertEqual(len(ks), len(ks2)) self.assertEqual(len(ks), 1) k1 = ks.get_key(RSAPrivateKey['kid']) k2 = ks2.get_key(RSAPrivateKey['kid']) self.assertEqual(k1, k2) self.assertEqual(k1['d'], RSAPrivateKey['d']) # test class method import too ks3 = jwk.JWKSet.from_json(ks.export()) self.assertEqual(len(ks), len(ks3)) # Test key set with multiple keys ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys)) num = 0 for item in ksm: self.assertTrue(isinstance(item, jwk.JWK)) self.assertTrue(item in ksm) num += 1 self.assertEqual(num, len(PrivateKeys['keys'])) def test_jwkset_get_keys(self): # Test key set with multiple keys ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys)) k1 = jwk.JWK.from_json(json_encode(PrivateKeys['keys'][0])) kwargs = RSAPrivateKey.copy() kwargs['kid'] = '1' k2 = jwk.JWK(**kwargs) self.assertEqual(k1, ksm.get_key('1')) self.assertIsNone(ksm.get_key('not-there')) ksm.add(k2) self.assertEqual({k1, k2}, ksm.get_keys('1')) self.assertEqual(3, len(ksm['keys'])) # Expect that duplicate kids will # raise an exception when we use get_key with self.assertRaises(jwk.InvalidJWKValue): ksm.get_key('1') def test_jwkset_issue_208(self): ks = jwk.JWKSet() key1 = RSAPrivateKey.copy() key1['kid'] = 'kid_1' ks.add(jwk.JWK(**key1)) key2 = RSAPrivateKey.copy() key2['kid'] = 'kid_2' ks.add(jwk.JWK(**key2)) ks2 = jwk.JWKSet() ks2.import_keyset(ks.export()) self.assertEqual(len(ks['keys']), 2) self.assertEqual(len(ks['keys']), len(ks2['keys'])) def test_thumbprint(self): for i in range(0, len(PublicKeys['keys'])): k = jwk.JWK(**PublicKeys['keys'][i]) self.assertEqual( k.thumbprint(), PublicKeys['thumbprints'][i]) def test_import_from_pem(self): pubk = jwk.JWK.from_pem(RSAPublicPEM) self.assertEqual(pubk.export_to_pem(), RSAPublicPEM) rsapub = pubk.get_op_key('verify') prik = jwk.JWK.from_pem(RSAPrivatePEM, [PASSWORD-REMOVED]) rsapri = prik.get_op_key('sign') self.assertEqual(rsapri.public_key().public_numbers().n, rsapub.public_numbers().n) pubc = jwk.JWK.from_pem(PublicCert) self.assertEqual(pubc['kid'], PublicCertThumbprint) def test_import_ec_from_pem(self): pub_ec = jwk.JWK.from_pem(ECPublicPEM) self.assertEqual(pub_ec.export_to_pem(), ECPublicPEM) self.assertEqual(json_decode(pub_ec.export()), ECPublicJWK) def test_import_x25519_from_pem(self): pub_x25519 = jwk.JWK.from_pem(X25519PublicPEM) self.assertEqual(pub_x25519.export_to_pem(), X25519PublicPEM) self.assertEqual(json_decode(pub_x25519.export()), X25519PublicJWK) def test_export_symmetric(self): key = jwk.JWK(**SymmetricKeys['keys'][0]) self.assertTrue(key.is_symmetric) self.assertFalse(key.has_public) self.assertFalse(key.has_private) self.assertEqual(json_encode(SymmetricKeys['keys'][0]), key.export_symmetric()) def test_export_public(self): key = jwk.JWK.from_pem(PublicCert) self.assertFalse(key.is_symmetric) self.assertTrue(key.has_public) self.assertFalse(key.has_private) pubc = key.export_public() self.assertEqual(json_decode(pubc)["kid"], PublicCertThumbprint) def test_export_private(self): key = jwk.JWK.from_pem(RSAPrivatePEM, [PASSWORD-REMOVED]) self.assertFalse(key.is_symmetric) self.assertTrue(key.has_public) self.assertTrue(key.has_private) pri = key.export_private() prikey = jwk.JWK(**json_decode(pri)) self.assertTrue(prikey.has_private) pub = key.export_public() pubkey = jwk.JWK(**json_decode(pub)) self.assertFalse(pubkey.has_private) self.assertEqual(prikey['kid'], pubkey['kid']) def test_export_as_dict(self): key = jwk.JWK(**SymmetricKeys['keys'][1]) k = key.export_symmetric(as_dict=True) self.assertEqual(k['kid'], SymmetricKeys['keys'][1]['kid']) key = jwk.JWK.from_pem(PublicCert) k = key.export_public(as_dict=True) self.assertEqual(k['kid'], PublicCertThumbprint) key = jwk.JWK.from_pem(RSAPrivatePEM, [PASSWORD-REMOVED]) k = key.export_private(as_dict=True) self.assertEqual(k['kid'], 'x31vrbZceU2qOPLtrUwPkLa3PNakMn9tOsq_ntFVrJc') keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys)) ks = keyset.export(as_dict=True) self.assertTrue('keys' in ks) def test_public(self): key = jwk.JWK.from_pem(RSAPrivatePEM, [PASSWORD-REMOVED]) self.assertTrue(key.has_public) self.assertTrue(key.has_private) pubkey = key.public() self.assertTrue(pubkey.has_public) self.assertFalse(pubkey.has_private) # finally check public works e = jwe.JWE('plaintext', '{"alg":"RSA-OAEP","enc":"A256GCM"}') e.add_recipient(pubkey) enc = e.serialize() d = jwe.JWE() d.deserialize(enc, key) self.assertEqual(d.payload, b'plaintext') def test_invalid_value(self): with self.assertRaises(jwk.InvalidJWKValue): jwk.JWK(kty='oct', k=b'\x01') def test_create_pubKeys_eddsa(self): keylist = PublicKeys_EdDsa['keys'] for key in keylist: jwk.JWK(**key) def test_create_priKeys_eddsa(self): keylist = PrivateKeys_EdDsa['keys'] for key in keylist: jwk.JWK(**key) def test_create_pubKeys_secp256k1(self): keylist = PublicKeys_secp256k1['keys'] for key in keylist: jwk.JWK(**key) def test_create_priKeys_secp256k1(self): keylist = PrivateKeys_secp256k1['keys'] for key in keylist: jwk.JWK(**key) def test_create_pubKeys_brainpool(self): keylist = PublicKeys_brainpool['keys'] for key in keylist: jwk.JWK(**key) def test_create_priKeys_brainpool(self): keylist = PrivateKeys_brainpool['keys'] for key in keylist: jwk.JWK(**key) def test_thumbprint_eddsa(self): for i in range(0, len(PublicKeys_EdDsa['keys'])): k = jwk.JWK(**PublicKeys_EdDsa['keys'][i]) self.assertEqual( k.thumbprint(), PublicKeys_EdDsa['thumbprints'][i]) def test_pem_okp(self): payload = b'Imported private Ed25519' prikey = jwk.JWK.from_pem(Ed25519PrivatePEM) self.assertTrue(prikey.has_private) self.assertTrue(prikey.has_public) s = jws.JWS(payload) s.add_signature(prikey, None, {'alg': 'EdDSA'}, None) sig = s.serialize() pubkey = jwk.JWK.from_pem(Ed25519PublicPEM) self.assertTrue(pubkey.has_public) self.assertFalse(pubkey.has_private) jws_token = jws.JWS() jws_token.deserialize(sig, pubkey, alg="EdDSA") self.assertTrue(jws_token.objects['valid']) self.assertEqual(jws_token.payload, payload) def test_jwk_as_dict(self): key = jwk.JWK(**PublicKeys['keys'][0]) self.assertEqual(key['kty'], 'EC') self.assertEqual(key.kty, 'EC') self.assertEqual(key.x, key['x']) self.assertEqual(key.kid, '1') key = jwk.JWK(**PublicKeys['keys'][1]) self.assertEqual(key['kty'], 'RSA') self.assertEqual(key.n, key['n']) with self.assertRaises(AttributeError): # pylint: disable=pointless-statement key.d with self.assertRaises(AttributeError): key.x = 'xyz' with self.assertRaises(jwk.InvalidJWKValue): key['n'] = '!!!' with self.assertRaises(jwk.InvalidJWKValue): key.e = '3' key.unknown = '1' key['unknown'] = 2 self.assertFalse(key.unknown == key['unknown']) def test_jwk_from_password(self): key = jwk.JWK.from_password('test password') self.assertEqual(key['kty'], 'oct') self.assertEqual(key['k'], 'dGVzdCBwYXNzd29yZA') def test_p256k_alias(self): key = jwk.JWK.generate(kty='EC', curve='P-256K') key.get_op_key('verify', 'secp256k1') pub_k = jwk.JWK(**PrivateKeys_secp256k1['keys'][0]) pri_k = jwk.JWK(**PrivateKeys_secp256k1['keys'][1]) payload = bytes(bytearray(A1_payload)) test = jws.JWS(payload) test.add_signature(pri_k, None, json_encode({"alg": "ES256K"}), None) test_serialization_compact = test.serialize(compact=True) verify = jws.JWS() verify.deserialize(test_serialization_compact) verify.verify(pub_k.public()) self.assertEqual(verify.payload, payload) def test_thumbprint_uri(self): k = jwk.JWK(**PublicKeys['keys'][1]) self.assertEqual( k.thumbprint_uri(), "urn:ietf:params:oauth:jwk-thumbprint:sha-256:{}".format( PublicKeys['thumbprints'][1])) # RFC 7515 - A.1 A1_protected = \ [123, 34, 116, 121, 112, 34, 58, 34, 74, 87, 84, 34, 44, 13, 10, 32, 34, 97, 108, 103, 34, 58, 34, 72, 83, 50, 53, 54, 34, 125] A1_payload = \ [123, 34, 105, 115, 115, 34, 58, 34, 106, 111, 101, 34, 44, 13, 10, 32, 34, 101, 120, 112, 34, 58, 49, 51, 48, 48, 56, 49, 57, 51, 56, 48, 44, 13, 10, 32, 34, 104, 116, 116, 112, 58, 47, 47, 101, 120, 97, 109, 112, 108, 101, 46, 99, 111, 109, 47, 105, 115, 95, 114, 111, 111, 116, 34, 58, 116, 114, 117, 101, 125] A1_signature = \ [116, 24, 223, 180, 151, 153, 224, 37, 79, 250, 96, 125, 216, 173, 187, 186, 22, 212, 37, 77, 105, 214, 191, 240, 91, 88, 5, 88, 83, 132, 141, 121] A1_example = {'key': SymmetricKeys['keys'][1], 'alg': 'HS256', 'protected': bytes(bytearray(A1_protected)).decode('utf-8'), 'payload': bytes(bytearray(A1_payload)), 'signature': bytes(bytearray(A1_signature))} # RFC 7515 - A.2 A2_protected = \ [123, 34, 97, 108, 103, 34, 58, 34, 82, 83, 50, 53, 54, 34, 125] A2_payload = A1_payload A2_key = \ {"kty": "RSA", "n": "ofgWCuLjybRlzo0tZWJjNiuSfb4p4fAkd_wWJcyQoTbji9k0l8W26mPddx" "HmfHQp-Vaw-4qPCJrcS2mJPMEzP1Pt0Bm4d4QlL-yRT-SFd2lZS-pCgNMs" "D1W_YpRPEwOWvG6b32690r2jZ47soMZo9wGzjb_7OMg0LOL-bSf63kpaSH" "SXndS5z5rexMdbBYUsLA9e-KXBdQOS-UTo7WTBEMa2R2CapHg665xsmtdV" "MTBQY4uDZlxvb3qCo5ZwKh9kG4LT6_I5IhlJH7aGhyxXFvUK-DWNmoudF8" "NAco9_[AWS-SECRET-REMOVED]yuXpoQ", "e": "AQAB", "d": "[AWS-SECRET-REMOVED]pXyWPEAeP88vLNO97I" "jlA7_GQ5sLKMgvfTeXZx9SE-7YwVol2NXOoAJe46sui395IW_GO-pWJ1O0" "BkTGoVEn2bKVRUCgu-[AWS-SECRET-REMOVED]" "439X0M_V51gfpRLI9JYanrC4D4qAdGcopV_0ZHHzQlBjudU2QvXt4ehNYT" "CBr6XCLQUShb1juUO1ZdiYoFaFQT5Tw8bGUl_x_jTj3ccPDVZFD9pIuhLh" "BOneufuBiB4cS98l2SR_RQyGWSeWjnczT0QU91p1DhOVRuOopznQ", "p": "4BzEEOtIpmVdVEZNCqS7baC4crd0pqnRH_5IB3jw3bcxGn6QLvnEtfdUdi" "YrqBdss1l58BQ3KhooKeQTa9AB0Hw_Py5PJdTJNPY8cQn7ouZ2KKDcmnPG" "BY5t7yLc1QlQ5xHdwW1VhvKn-nXqhJTBgIPgtldC-KDV5z-y2XDwGUc", "q": "uQPEfgmVtjL0Uyyx88GZFF1fOunH3-7cepKmtH4pxhtCoHqpWmT8YAmZxa" "ewHgHAjLYsp1ZSe7zFYHj7C6ul7TjeLQeZD_YwD66t62wDmpe_HlB-TnBA" "-[AWS-SECRET-REMOVED]9rjcJIDEz9RWdc", "dp": "BwKfV3Akq5_MFZDFZCnW-wzl-CCo83WoZvnLQwCTeDv8uzluRSnm71I3Q" "CLdhrqE2e9YkxvuxdBfpT_PI7Yz-FOKnu1R6HsJeDCjn12Sk3vmAktV2zb" "34MCdy7cpdTh_[AWS-SECRET-REMOVED]Yo0", "dq": "h_96-mK1R_[AWS-SECRET-REMOVED]OlODsWa" "7I9xXDoRwbKgB719rrmI2oKr6N3Do9U0ajaHF-NKJnwgjMd2w9cjz3_-ky" "NlxAr2v4IKhGNpmM5iIgOS1VZnOZ68m6_pbLBSp3nssTdlqvd0tIiTHU", "qi": "IYd7DHOhrWvxkwPQsRM2tOgrjbcrfvtQJipd-DlcxyVuuM9sQLdgjVk2o" "y26F0EmpScGLq2MowX7fhd_QJQ3ydy5cY7YIBi87w93IKLEdfnbJtoOPLU" "W0ITrJReOgo1cq9SbsxYawBgfp_gh6A5603k2-ZQwVK0JKSHuLFkuQ3U"} A2_signature = \ [112, 46, 33, 137, 67, 232, 143, 209, 30, 181, 216, 45, 191, 120, 69, 243, 65, 6, 174, 27, 129, 255, 247, 115, 17, 22, 173, 209, 113, 125, 131, 101, 109, 66, 10, 253, 60, 150, 238, 221, 115, 162, 102, 62, 81, 102, 104, 123, 0, 11, 135, 34, 110, 1, 135, 237, 16, 115, 249, 69, 229, 130, 173, 252, 239, 22, 216, 90, 121, 142, 232, 198, 109, 219, 61, 184, 151, 91, 23, 208, 148, 2, 190, 237, 213, 217, 217, 112, 7, 16, 141, 178, 129, 96, 213, 248, 4, 12, 167, 68, 87, 98, 184, 31, 190, 127, 249, 217, 46, 10, 231, 111, 36, 242, 91, 51, 187, 230, 244, 74, 230, 30, 177, 4, 10, 203, 32, 4, 77, 62, 249, 18, 142, 212, 1, 48, 121, 91, 212, 189, 59, 65, 238, 202, 208, 102, 171, 101, 25, 129, 253, 228, 141, 247, 127, 55, 45, 195, 139, 159, 175, 221, 59, 239, 177, 139, 93, 163, 204, 60, 46, 176, 47, 158, 58, 65, 214, 18, 202, 173, 21, 145, 18, 115, 160, 95, 35, 185, 232, 56, 250, 175, 132, 157, 105, 132, 41, 239, 90, 30, 136, 121, 130, 54, 195, 212, 14, 96, 69, 34, 165, 68, 200, 242, 122, 122, 45, 184, 6, 99, 209, 108, 247, 202, 234, 86, 222, 64, 92, 178, 33, 90, 69, 178, 194, 85, 102, 181, 90, 193, 167, 72, 160, 112, 223, 200, 163, 42, 70, 149, 67, 208, 25, 238, 251, 71] A2_example = {'key': A2_key, 'alg': 'RS256', 'protected': bytes(bytearray(A2_protected)).decode('utf-8'), 'payload': bytes(bytearray(A2_payload)), 'signature': bytes(bytearray(A2_signature))} # RFC 7515 - A.3 A3_protected = \ [123, 34, 97, 108, 103, 34, 58, 34, 69, 83, 50, 53, 54, 34, 125] A3_payload = A2_payload A3_key = \ {"kty": "EC", "crv": "P-256", "x": "[AWS-SECRET-REMOVED]VEU", "y": "x_FEzRu9m36HLN_tue659LNpXW6pCyStikYjKIWI5a0", "d": "jpsQnnGQmL-YBIffH1136cspYG6-0iY7X1fCE9-E9LI"} A3_signature = \ [14, 209, 33, 83, 121, 99, 108, 72, 60, 47, 127, 21, 88, 7, 212, 2, 163, 178, 40, 3, 58, 249, 124, 126, 23, 129, 154, 195, 22, 158, 166, 101] + \ [197, 10, 7, 211, 140, 60, 112, 229, 216, 241, 45, 175, 8, 74, 84, 128, 166, 101, 144, 197, 242, 147, 80, 154, 143, 63, 127, 138, 131, 163, 84, 213] A3_example = {'key': A3_key, 'alg': 'ES256', 'protected': bytes(bytearray(A3_protected)).decode('utf-8'), 'payload': bytes(bytearray(A3_payload)), 'signature': bytes(bytearray(A3_signature))} # RFC 7515 - A.4 A4_protected = \ [123, 34, 97, 108, 103, 34, 58, 34, 69, 83, 53, 49, 50, 34, 125] A4_payload = [80, 97, 121, 108, 111, 97, 100] A4_key = \ {"kty": "EC", "crv": "P-521", "x": "[AWS-SECRET-REMOVED]xrrjcBpXp5EOnYG_" "NjFZ6OvLFV1jSfS9tsz4qUxcWceqwQGk", "y": "ADSmRA43Z1DSNx_RvcLI87cdL07l6jQyyBXMoxVg_l2Th-x3S1WDhjDl" "y79ajL4Kkd0AZMaZmh9ubmf63e3kyMj2", "d": "AY5pb7A0UFiB3RELSD64fTLOSV_jazdF7fLYyuTw8lOfRhWg6Y6rUrPA" "xerEzgdRhajnu0ferB0d53vM9mE15j2C"} A4_signature = \ [1, 220, 12, 129, 231, 171, 194, 209, 232, 135, 233, 117, 247, 105, 122, 210, 26, 125, 192, 1, 217, 21, 82, 91, 45, 240, 255, 83, 19, 34, 239, 71, 48, 157, 147, 152, 105, 18, 53, 108, 163, 214, 68, 231, 62, 153, 150, 106, 194, 164, 246, 72, 143, 138, 24, 50, 129, 223, 133, 206, 209, 172, 63, 237, 119, 109] + \ [0, 111, 6, 105, 44, 5, 41, 208, 128, 61, 152, 40, 92, 61, 152, 4, 150, 66, 60, 69, 247, 196, 170, 81, 193, 199, 78, 59, 194, 169, 16, 124, 9, 143, 42, 142, 131, 48, 206, 238, 34, 175, 83, 203, 220, 159, 3, 107, 155, 22, 27, 73, 111, 68, 68, 21, 238, 144, 229, 232, 148, 188, 222, 59, 242, 103] A4_example = {'key': A4_key, 'alg': 'ES512', 'protected': bytes(bytearray(A4_protected)).decode('utf-8'), 'payload': bytes(bytearray(A4_payload)), 'signature': bytes(bytearray(A4_signature))} # RFC 7515 - A.4 A5_protected = 'eyJhbGciOiJub25lIn0' A5_payload = A2_payload A5_key = \ {"kty": "oct", "k": ""} A5_signature = b'' A5_example = {'key': A5_key, 'alg': 'none', 'protected': base64url_decode(A5_protected).decode('utf-8'), 'payload': bytes(bytearray(A5_payload)), 'signature': A5_signature} A6_serialized = \ '{' + \ '"payload":' + \ '"[AWS-SECRET-REMOVED]ODAsDQogImh0dHA6Ly9leGF' + \ 'tcGxlLmNvbS9pc19yb290Ijp0cnVlfQ",' + \ '"signatures":[' + \ '{"protected":"eyJhbGciOiJSUzI1NiJ9",' + \ '"header":' + \ '{"kid":"2010-12-29"},' + \ '"signature":' + \ '"cC4hiUPoj9Eetdgtv3hF80EGrhuB__dzERat0XF9g2VtQgr9PJbu3XOiZj5RZ' + \ 'mh7AAuHIm4Bh-0Qc_lF5YKt_O8W2Fp5jujGbds9uJdbF9CUAr7t1dnZcAcQjb' + \ 'KBYNX4BAynRFdiuB--f_[AWS-SECRET-REMOVED]l' + \ 'b1L07Qe7K0GarZRmB_eSN9383LcOLn6_dO--xi12jzDwusC-eOkHWEsqtFZES' + \ '[AWS-SECRET-REMOVED]8np6LbgGY9Fs98rqVt5AX' + \ 'LIhWkWywlVmtVrBp0igcN_IoypGlUPQGe77Rw"},' + \ '{"protected":"eyJhbGciOiJFUzI1NiJ9",' + \ '"header":' + \ '{"kid":"[HEROKU-API-KEY-REMOVED]"},' + \ '"signature":' + \ '"DtEhU3ljbEg8L38VWAfUAqOyKAM6-Xx-F4GawxaepmXFCgfTjDxw5djxLa8IS' + \ 'lSApmWQxfKTUJqPP3-Kg6NU1Q"}]' + \ '}' A6_example = { 'payload': bytes(bytearray(A2_payload)), 'key1': jwk.JWK(**A2_key), 'protected1': bytes(bytearray(A2_protected)).decode('utf-8'), 'header1': json_encode({"kid": "2010-12-29"}), 'key2': jwk.JWK(**A3_key), 'protected2': bytes(bytearray(A3_protected)).decode('utf-8'), 'header2': json_encode({"kid": "[HEROKU-API-KEY-REMOVED]"}), 'serialized': A6_serialized, 'jose_header': [{"kid": "2010-12-29", "alg": "RS256"}, {"kid": "[HEROKU-API-KEY-REMOVED]", "alg": "ES256"}]} A7_example = \ '{' + \ '"payload":' + \ '"[AWS-SECRET-REMOVED]ODAsDQogImh0dHA6Ly9leGF' + \ 'tcGxlLmNvbS9pc19yb290Ijp0cnVlfQ",' + \ '"protected":"eyJhbGciOiJFUzI1NiJ9",' + \ '"header":' + \ '{"kid":"[HEROKU-API-KEY-REMOVED]"},' + \ '"signature":' + \ '"DtEhU3ljbEg8L38VWAfUAqOyKAM6-Xx-F4GawxaepmXFCgfTjDxw5djxLa8IS' + \ 'lSApmWQxfKTUJqPP3-Kg6NU1Q"' + \ '}' E_negative = \ '[AWS-SECRET-REMOVED]Oi8vZXhhbXBsZS5jb20vVU5ERU' + \ '[AWS-SECRET-REMOVED]9VTkRFRklORUQiOnRydWUNCn0.' + \ 'RkFJTA.' customhdr_jws_example = \ '{' + \ '"payload":' + \ '"[AWS-SECRET-REMOVED]ODAsDQogImh0dHA6Ly9leGF' + \ 'tcGxlLmNvbS9pc19yb290Ijp0cnVlfQ",' + \ '"protected":"eyJhbGciOiJFUzI1NiJ9",' + \ '"header":' + \ '{"kid":"[HEROKU-API-KEY-REMOVED]", ' + \ '"custom1":"custom_val"},' + \ '"signature":' + \ '"DtEhU3ljbEg8L38VWAfUAqOyKAM6-Xx-F4GawxaepmXFCgfTjDxw5djxLa8IS' + \ 'lSApmWQxfKTUJqPP3-Kg6NU1Q"' + \ '}' class TestJWS(unittest.TestCase): def check_sign(self, test): s = jws.JWSCore(test['alg'], jwk.JWK(**test['key']), test['protected'], test['payload'], test.get('allowed_algs', None)) sig = s.sign() decsig = base64url_decode(sig['signature']) s.verify(decsig) # ECDSA signatures are always different every time # they are generated unlike RSA or symmetric ones if test['key']['kty'] != 'EC': self.assertEqual(decsig, test['signature']) else: # Check we can verify the test signature independently # this is so that we can test the ECDSA against a known # good signature s.verify(test['signature']) def test_A1(self): self.check_sign(A1_example) def test_A2(self): self.check_sign(A2_example) def test_A3(self): self.check_sign(A3_example) def test_A4(self): self.check_sign(A4_example) def test_A5(self): self.assertRaises(jws.InvalidJWSOperation, self.check_sign, A5_example) a5_bis = {'allowed_algs': ['none']} a5_bis.update(A5_example) self.check_sign(a5_bis) def test_A6(self): s = jws.JWS(A6_example['payload']) s.add_signature(A6_example['key1'], None, A6_example['protected1'], A6_example['header1']) s.add_signature(A6_example['key2'], None, A6_example['protected2'], A6_example['header2']) s.verify(A6_example['key1']) s.verify(A6_example['key2']) sig = s.serialize() s.deserialize(sig, A6_example['key1']) s.deserialize(A6_serialized, A6_example['key2']) self.assertEqual(A6_example['jose_header'], s.jose_header) def test_A7(self): s = jws.JWS(A6_example['payload']) s.deserialize(A7_example, A6_example['key2']) def test_E(self): s = jws.JWS(A6_example['payload']) with self.assertRaises(jws.InvalidJWSSignature): s.deserialize(E_negative) s.verify(None) def test_customhdr_jws(self): # Test pass header check def jws_chk1(jwobj): return jwobj.jose_header['custom1'] == 'custom_val' newhdr = JWSEHeaderParameter('Custom header 1', False, True, jws_chk1) newreg = {'custom1': newhdr} s = jws.JWS(A6_example['payload'], header_registry=newreg) s.deserialize(customhdr_jws_example, A6_example['key2']) # Test fail header check def jws_chk2(jwobj): return jwobj.jose_header['custom1'] == 'custom_not' newhdr = JWSEHeaderParameter('Custom header 1', False, True, jws_chk2) newreg = {'custom1': newhdr} s = jws.JWS(A6_example['payload'], header_registry=newreg) with self.assertRaises(jws.InvalidJWSSignature): s.deserialize(customhdr_jws_example, A6_example['key2']) def test_customhdr_jws_exists(self): newhdr = JWSEHeaderParameter('Custom header 1', False, True, None) newreg = {'alg': newhdr} with self.assertRaises(InvalidJWSERegOperation): jws.JWS(A6_example['payload'], header_registry=newreg) def test_EdDsa_signing_and_verification(self): examples = [] if 'Ed25519' in jwk.ImplementedOkpCurves: examples = [E_Ed25519] for curve_example in examples: key = jwk.JWK.from_json(curve_example['key_json']) payload = curve_example['payload'] protected_header = curve_example['protected_header'] jws_test = jws.JWS(payload) jws_test.add_signature(key, None, json_encode(protected_header), None) jws_test_serialization_compact = \ jws_test.serialize(compact=True) self.assertEqual(jws_test_serialization_compact, curve_example['jws_serialization_compact']) jws_verify = jws.JWS() jws_verify.deserialize(jws_test_serialization_compact) jws_verify.verify(key.public()) self.assertEqual(jws_verify.payload.decode('utf-8'), curve_example['payload']) def test_secp256k1_signing_and_verification(self): key = jwk.JWK(**PrivateKeys_secp256k1['keys'][0]) payload = bytes(bytearray(A1_payload)) jws_test = jws.JWS(payload) jws_test.add_signature(key, None, json_encode({"alg": "ES256K"}), None) jws_test_serialization_compact = jws_test.serialize(compact=True) jws_verify = jws.JWS() jws_verify.deserialize(jws_test_serialization_compact) jws_verify.verify(key.public()) self.assertEqual(jws_verify.payload, payload) def test_brainpool_signing_and_verification(self): for key_data in PrivateKeys_brainpool['keys']: key = jwk.JWK(**key_data) payload = bytes(bytearray(A1_payload)) jws_test = jws.JWS(payload) curve_name = key.get('crv') if curve_name == "BP-256": alg = "BP256R1" elif curve_name == "BP-384": alg = "BP384R1" else: alg = "BP512R1" jws_test.allowed_algs = [alg] jws_test.add_signature(key, None, json_encode({"alg": alg}), None) jws_test_serialization_compact = jws_test.serialize(compact=True) jws_verify = jws.JWS() jws_verify.allowed_algs = [alg] jws_verify.deserialize(jws_test_serialization_compact) jws_verify.verify(key.public()) self.assertEqual(jws_verify.payload, payload) def test_jws_issue_224(self): key = jwk.JWK().generate(kty='oct') # Test Empty payload is supported for creating and verifying signatures s = jws.JWS(payload='') s.add_signature(key, None, json_encode({"alg": "HS256"})) o1 = s.serialize(compact=True) self.assertTrue('..' in o1) o2 = json_decode(s.serialize()) self.assertEqual(o2['payload'], '') t = jws.JWS() t.deserialize(o1) t.verify(key) def test_jws_issue_281(self): header = {"alg": "HS256"} header_copy = copy.deepcopy(header) key = jwk.JWK().generate(kty='oct') s = jws.JWS(payload='test') s.add_signature(key, protected=header, header={"kid": key.thumbprint()}) self.assertEqual(header, header_copy) def test_decrypt_keyset(self): ks = jwk.JWKSet() key1 = jwk.JWK.generate(kty='oct', alg='HS256', kid='key1') key2 = jwk.JWK.generate(kty='oct', alg='HS384', kid='key2') key3 = jwk.JWK.generate(kty='oct', alg='HS512', kid='key3') ks.add(key1) ks.add(key2) s1 = jws.JWS(payload=b'secret') s1.add_signature(key1, protected='{"alg":"HS256"}') s2 = jws.JWS() s2.deserialize(s1.serialize(), ks) self.assertEqual(s2.payload, b'secret') s3 = jws.JWS(payload=b'secret') s3.add_signature(key3, protected='{"alg":"HS256"}') s4 = jws.JWS() with self.assertRaises(JWKeyNotFound): s4.deserialize(s3.serialize(), ks) E_A1_plaintext = \ [84, 104, 101, 32, 116, 114, 117, 101, 32, 115, 105, 103, 110, 32, 111, 102, 32, 105, 110, 116, 101, 108, 108, 105, 103, 101, 110, 99, 101, 32, 105, 115, 32, 110, 111, 116, 32, 107, 110, 111, 119, 108, 101, 100, 103, 101, 32, 98, 117, 116, 32, 105, 109, 97, 103, 105, 110, 97, 116, 105, 111, 110, 46] E_A1_protected = "[AWS-SECRET-REMOVED]Q00ifQ" E_A1_key = \ {"kty": "RSA", "n": "oahUIoWw0K0usKNuOR6H4wkf4oBUXHTxRvgb48E-BVvxkeDNjbC4he8rUW" "[AWS-SECRET-REMOVED]gjT9LewND8MW2Krf3S" "psk_ZkoFnilakGygTwpZ3uesH-PFABNIUYpOiN15dsQRkgr0vEhxN92i2a" "sbOenSZeyaxziK72UwxrrKoExv6kc5twXTq4h-QChLOln0_mtUZwfsRaMS" "tPs6mS6XrgxnxbWhojf663tuEQueGC-FCMfra36C9knDFGzKsNa7LZK2dj" "YgyD3JR_MB_4NUJW_TqOQtwHYbxevoJArm-L5StowjzGy-_bq6Gw", "e": "AQAB", "d": "kLdtIj6GbDks_ApCSTYQtelcNttlKiOyPzMrXHeI-yk1F7-kpDxY4-WY5N" "WV5KntaEeXS1j82E375xxhWMHXyvjYecPT9fpwR_M9gV8n9Hrh2anTpTD9" "[AWS-SECRET-REMOVED]A2xAwnm2X7uGR1hghk" "qDp0Vqj3kbSCz1XyfCs6_LehBwtxHIyh8Ripy40p24moOAbgxVw3rxT_vl" "t3UVe4WO3JkJOzlpUf-KTVI2Ptgm-dARxTEtE-id-4OJr0h-K-VFs3VSnd" "VTIznSxfyrj8ILL6MG_Uv8YAu7VILSB3lOW085-4qE3DzgrTjgyQ", "p": "1r52Xk46c-LsfB5P442p7atdPUrxQSy4mti_tZI3Mgf2EuFVbUoDBvaRQ-" "[AWS-SECRET-REMOVED]_hI2_1eTSPVZfzL0lf" "fNn03IXqWF5MDFuoUYE0hzb2vhrlN_rKrbfDIwUbTrjjgieRbwC6Cl0", "q": "wLb35x7hmQWZsWJmB_[AWS-SECRET-REMOVED]" "UDdaDYVTSNx_8Fyw1YYa9XGrGnDew00J28cRUoeBB_jKI1oma0Orv1T9aX" "[AWS-SECRET-REMOVED]AfwTs0g68UZHvtc", "dp": "ZK-YwE7diUh0qR1tR7w8WHtolDx3MZ_OTowiFvgfeQ3SiresXjm9gZ5KL" "hMXvo-uz-KUJWDxS5pFQ_M0evdo1dKiRTjVw_x4NyqyXPM5nULPkcpU827" "rnpZzAJKpdhWAgqrXGKAECQH0Xt4taznjnd_zVpAmZZq60WPMBMfKcuE", "dq": "[AWS-SECRET-REMOVED]ZYdBiMRAy86x0vHCj" "[AWS-SECRET-REMOVED]9Tt0cj8qqyFpz9vNDB" "UfCAiJVeESOjJDZPYHdHY8v1b-o-Z2X5tvLx-TCekf7oxyeKDUqKWjis", "qi": "VIMpMYbPf47dT1w_zDUXfPimsSegnMOA1zTaX7aGk_8urY6R8-ZW1FxU7" "[AWS-SECRET-REMOVED]anOsXGOAOv82cHq0E3" "eL4HrtZkUuKvnPrMnsUUFlfUdybVzxyjz9JF_XyaY14ardLSjf4L_FNY"} E_A1_vector = \ "[AWS-SECRET-REMOVED]Q00ifQ." \ "[AWS-SECRET-REMOVED]UmKoTCVJRgckCL9kiMT03JGe" \ "ipsEdY3mx_[AWS-SECRET-REMOVED]c9d-StnImGyFDb" \ "[AWS-SECRET-REMOVED]m_lDp5XnZAYpQdb76FdIKLaV" \ "mqgfwX7XWRxv2322i-vDxRfqNzo_tETKzpVLzfiwQyeyPGLBIO56YJ7eObdv0je8" \ "[AWS-SECRET-REMOVED]WXRcZ_ZT2LawVCWTIy3brGPi" \ "6UklfCpIMfIjf7iGdXKHzg." \ "48V1_ALb6US04U3b." \ "5eym8TW_[AWS-SECRET-REMOVED]tX_EFShS8iB7j6ji" \ "SdiwkIr3ajwQzaBtQD_A." \ "XFBoMYUZodetZdvTiFvSkQ" E_A1_ex = {'key': jwk.JWK(**E_A1_key), 'protected': base64url_decode(E_A1_protected), 'plaintext': bytes(bytearray(E_A1_plaintext)), 'vector': E_A1_vector} E_A2_plaintext = "Live long and prosper." E_A2_protected = "[AWS-SECRET-REMOVED]LUhTMjU2In0" E_A2_key = \ {"kty": "RSA", "n": "[AWS-SECRET-REMOVED]ueri23bOdgWp4Dy1Wl" "[AWS-SECRET-REMOVED]jU1hGzHHyXss8UDpre" "[AWS-SECRET-REMOVED]VRdhS8exSZEy_c4gs_" "7svlJJQ4H9_NxsiIoLwAEk7-Q3UXERGYw_75IDrGA84-lA_-Ct4eTlXHBI" "[AWS-SECRET-REMOVED]LuF_RJLqHpM2kgWFLU" "7-[AWS-SECRET-REMOVED]N-aEAw3vRw", "e": "AQAB", "d": "[AWS-SECRET-REMOVED]pTBMwhprMzWzpR8Sxq" "1OPThh_[AWS-SECRET-REMOVED]VOku0aZb-ry" "nq8cxjDTLZQ6Fz7jSjR1Klop-YKaUHc9GsEofQqYruPhzSA-QgajZGPbE_" "[AWS-SECRET-REMOVED]zWEEceUjNnTNo_CVSj" "-[AWS-SECRET-REMOVED]7reG_DUwg9Ch-Kyvj" "[AWS-SECRET-REMOVED]NdizK2zF2CWQ", "p": "[AWS-SECRET-REMOVED]uk4kLwBKcoe1x4HG68" "[AWS-SECRET-REMOVED]wBYnbTp87X-xcPWlEP" "krdoUKW60tgs1aNd_Nnc9LEVVPMS390zbFxt8TN_biaBgelNgbC95sM", "q": "uKlCKvKv_ZJMVcdIs5vVSU_6cPtYI1ljWytExV_skstvRSNi9r66jdd9-y" "[AWS-SECRET-REMOVED]iYhQXw1jIABfw-MVsN" "-[AWS-SECRET-REMOVED]UwNOjYelmkiNS0", "dp": "[AWS-SECRET-REMOVED]hG4EcL6FWbX5h9yuv" "ngs4iLEFk6eALoUS4vIWEwcL4txw9LsWH_zKI-hwoReoP77cOdSL4AVcra" "[AWS-SECRET-REMOVED]jZxbvk-RR3pEhnCs", "dq": "o_8V14SezckO6CNLKs_btPdFiO9_kC1DsuUTd2LAfIIVeMZ7jn1Gus_Ff" "7B7IVx3p5KuBGOVF8L-qifLb6nQnLysgHDh132NDioZkhH7mI7hPG-PYE_" "odApKdnqECHWw0J-F0JWnUd6D2B_1TvF9mXA2Qx-iGYn8OVV1Bsmp6qU", "qi": "eNho5yRBEBxhGBtQRww9QirZsB66TrfFReG_CcteI1aCneT0ELGhYlRlC" "tUkTRclIfuEPmNsNDPbLoLqqCVznFbvdB7x-Tl-m0l_eFTj2KiqwGqE9PZ" "B9nNTwMVvH3VRRSLWACvPnSiwP8N5Usy-WRXS-V7TbpxIhvepTfE0NNo"} E_A2_vector = \ "[AWS-SECRET-REMOVED]LUhTMjU2In0." \ "UGhIOguC7IuEvf_[AWS-SECRET-REMOVED]w7Zx0-kFm" \ "1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgNZ__deLKxGHZ7Pc" \ "HALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRirb6Y5Cl_p-ko3YvkkysZIF" \ "[AWS-SECRET-REMOVED]lV7elprCbuPhcCdZ6XDP0_F8" \ "rkXds2vE4X-ncOIM8hAYHHi29NX0mcKiRaD0-D-ljQTP-cFPgwCp6X-nZZd9OHBv" \ "-B3oWh2TbqmScqXMR4gp_A." \ "AxY8DCtDaGlsbGljb3RoZQ." \ "[AWS-SECRET-REMOVED]OGY." \ "9hH0vgRfYgPnAHOd8stkvw" E_A2_ex = {'key': jwk.JWK(**E_A2_key), 'protected': base64url_decode(E_A2_protected), 'plaintext': E_A2_plaintext, 'vector': E_A2_vector} E_A3_plaintext = "Live long and prosper." E_A3_protected = "[AWS-SECRET-REMOVED]LUhTMjU2In0" E_A3_key = {"kty": "oct", "k": "GawgguFyGrWKav7AX4VKUg"} E_A3_vector = \ "[AWS-SECRET-REMOVED]LUhTMjU2In0." \ "[AWS-SECRET-REMOVED]THzBC2IlrT1oOQ." \ "AxY8DCtDaGlsbGljb3RoZQ." \ "[AWS-SECRET-REMOVED]OGY." \ "U0m_YmjN04DJvceFICbCVQ" E_A3_ex = {'key': jwk.JWK(**E_A3_key), 'protected': base64url_decode(E_A3_protected).decode('utf-8'), 'plaintext': E_A3_plaintext, 'vector': E_A3_vector} E_A4_protected = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0" E_A4_unprotected = {"jku": "https://server.example.com/keys.jwks"} E_A4_vector = \ '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ '"recipients":[' \ '{"header":{"alg":"RSA1_5","kid":"2011-04-29"},' \ '"encrypted_key":'\ '"UGhIOguC7IuEvf_[AWS-SECRET-REMOVED]w7Zx0-' \ 'kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgNZ__deLKx' \ 'GHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRirb6Y5Cl_p-ko3' \ '[AWS-SECRET-REMOVED]I5ng8OtvzlV7elprCbuPh' \ 'cCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0mcKiRaD0-D-ljQTP-cFPg' \ 'wCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A"},' \ '{"header":{"alg":"A128KW","kid":"7"},' \ '"encrypted_key":' \ '"[AWS-SECRET-REMOVED]THzBC2IlrT1oOQ"}],' \ '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ '"ciphertext":"[AWS-SECRET-REMOVED]OGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' E_A4_ex = {'key1': jwk.JWK(**E_A2_key), 'header1': '{"alg":"RSA1_5","kid":"2011-04-29"}', 'key2': jwk.JWK(**E_A3_key), 'header2': '{"alg":"A128KW","kid":"7"}', 'protected': base64url_decode(E_A4_protected), 'unprotected': json_encode(E_A4_unprotected), 'plaintext': E_A3_plaintext, 'vector': E_A4_vector} E_A5_ex = \ '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ '"header":{"alg":"A128KW","kid":"7"},' \ '"encrypted_key":' \ '"[AWS-SECRET-REMOVED]THzBC2IlrT1oOQ",' \ '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ '"ciphertext":"[AWS-SECRET-REMOVED]OGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' customhdr_jwe_ex = \ '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ '"header":{"alg":"A128KW","kid":"7", "custom1":"custom_val"},' \ '"encrypted_key":' \ '"[AWS-SECRET-REMOVED]THzBC2IlrT1oOQ",' \ '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ '"ciphertext":"[AWS-SECRET-REMOVED]OGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' Issue_136_Protected_Header_no_epk = { "alg": "ECDH-ES+A256KW", "enc": "A256CBC-HS512"} Issue_136_Contributed_JWE = \ "[AWS-SECRET-REMOVED]IkEyNTZDQkMtSFM1MTIiLCJr" \ "[AWS-SECRET-REMOVED]Y3J2IjoiUC0yNTYiLCJ4Ijoi" \ "[AWS-SECRET-REMOVED]WDhvTE9EOXFrN2NhTSIsInki" \ "[AWS-SECRET-REMOVED]MEEtU0JicEQ5NFBOb3NNIn19" \ ".wG51hYE_Vma8tvFKVyeZs4lsHhXiarEw3-59eWHPmhRflDAKrMvnBw1urezo_Bz" \ "ZyPJ76m42ORQPbhEu5NvbJk3vgdgcp03j" \ ".lRttW8r6P6zM0uYDQt0EjQ.qnOnz7biCbqdLEdUH3acMamFm-cBRCSTFb83tNPrgDU" \ ".[AWS-SECRET-REMOVED]2hg" Issue_136_Contributed_Key = { "alg": "ECDH-ES+A128KW", "crv": "P-256", "d": "[AWS-SECRET-REMOVED]0l8", "kid": "key1", "kty": "EC", "x": "FPrb_xwxe8SBP3kO-e-WsofFp7n5-yc_tGgfAvqAP8g", "y": "lM3HuyKMYUVsYdGqiWlkwTZbGO3Fh-hyadq8lfkTgBc"} # RFC 8037 A.1 E_Ed25519 = { 'key_json': '{"kty": "OKP",' '"crv": "Ed25519", ' '"d": "nWGxne_9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A", ' '"x": "11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo"}', 'payload': 'Example of Ed25519 signing', 'protected_header': {"alg": "EdDSA"}, 'jws_serialization_compact': 'eyJhbGciOiJFZERTQSJ9.RXhhbXBsZSBvZiBF' 'ZDI1NTE5IHNpZ25pbmc.hgyY0il_MGCjP0Jzl' 'nLWG1PPOt7-09PGcvMg3AIbQR6dWbhijcNR4ki' '4iylGjg5BhVsPt9g7sVvpAr_MuM0KAg'} X25519_Protected_Header_no_epk = { "alg": "ECDH-ES+A128KW", "enc": "A128GCM"} class TestJWE(unittest.TestCase): def check_enc(self, plaintext, protected, key, vector): e = jwe.JWE(plaintext, protected, algs=jwe_algs_and_rsa1_5) e.add_recipient(key) # Encrypt and serialize using compact enc = e.serialize() # And test that we can decrypt our own e.deserialize(enc, key) # Now test the Spec Test Vector e.deserialize(vector, key) def test_A1(self): self.check_enc(E_A1_ex['plaintext'], E_A1_ex['protected'], E_A1_ex['key'], E_A1_ex['vector']) def test_A2(self): self.check_enc(E_A2_ex['plaintext'], E_A2_ex['protected'], E_A2_ex['key'], E_A2_ex['vector']) def test_A3(self): self.check_enc(E_A3_ex['plaintext'], E_A3_ex['protected'], E_A3_ex['key'], E_A3_ex['vector']) def test_A4(self): e = jwe.JWE(E_A4_ex['plaintext'], E_A4_ex['protected'], algs=jwe_algs_and_rsa1_5) e.add_recipient(E_A4_ex['key1'], E_A4_ex['header1']) e.add_recipient(E_A4_ex['key2'], E_A4_ex['header2']) enc = e.serialize() e.deserialize(enc, E_A4_ex['key1']) e.deserialize(enc, E_A4_ex['key2']) # Now test the Spec Test Vector e.deserialize(E_A4_ex['vector'], E_A4_ex['key1']) e.deserialize(E_A4_ex['vector'], E_A4_ex['key2']) def test_A5(self): e = jwe.JWE(algs=jwe_algs_and_rsa1_5) e.deserialize(E_A5_ex, E_A4_ex['key2']) with self.assertRaises(jwe.InvalidJWEData): e = jwe.JWE(algs=['A256KW']) e.deserialize(E_A5_ex, E_A4_ex['key2']) def test_compact_protected_header(self): """Compact representation requires a protected header""" e = jwe.JWE(E_A1_ex['plaintext']) e.add_recipient(E_A1_ex['key'], E_A1_ex['protected']) with self.assertRaises(jwe.InvalidJWEOperation): e.serialize(compact=True) def test_compact_invalid_header(self): with self.assertRaises(jwe.InvalidJWEOperation): e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'], aad='XYZ', recipient=E_A1_ex['key']) e.serialize(compact=True) with self.assertRaises(jwe.InvalidJWEOperation): e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'], unprotected='{"jku":"https://example.com/keys.jwks"}', recipient=E_A1_ex['key']) e.serialize(compact=True) def test_JWE_Issue_136(self): plaintext = "plain" protected = json_encode(Issue_136_Protected_Header_no_epk) key = jwk.JWK.generate(kty='EC', crv='P-521') e = jwe.JWE(plaintext, protected) e.add_recipient(key) enc = e.serialize() e.deserialize(enc, key) self.assertEqual(e.payload, plaintext.encode('utf-8')) e = jwe.JWE() e.deserialize(Issue_136_Contributed_JWE, jwk.JWK(**Issue_136_Contributed_Key)) def test_customhdr_jwe(self): def jwe_chk1(jwobj): return jwobj.jose_header['custom1'] == 'custom_val' newhdr = JWSEHeaderParameter('Custom header 1', False, True, jwe_chk1) newreg = {'custom1': newhdr} e = jwe.JWE(header_registry=newreg) e.deserialize(customhdr_jwe_ex, E_A4_ex['key2']) def jwe_chk2(jwobj): return jwobj.jose_header['custom1'] == 'custom_not' newhdr = JWSEHeaderParameter('Custom header 1', False, True, jwe_chk2) newreg = {'custom1': newhdr} e = jwe.JWE(header_registry=newreg) with self.assertRaises(jwe.InvalidJWEData): e.deserialize(customhdr_jwe_ex, E_A4_ex['key2']) def test_customhdr_jwe_exists(self): newhdr = JWSEHeaderParameter('Custom header 1', False, True, None) newreg = {'alg': newhdr} with self.assertRaises(InvalidJWSERegOperation): jwe.JWE(header_registry=newreg) def test_X25519_ECDH(self): plaintext = b"plain" protected = json_encode(X25519_Protected_Header_no_epk) if 'X25519' in jwk.ImplementedOkpCurves: x25519key = jwk.JWK.generate(kty='OKP', crv='X25519') e1 = jwe.JWE(plaintext, protected) e1.add_recipient(x25519key) enc = e1.serialize() e2 = jwe.JWE() e2.deserialize(enc, x25519key) self.assertEqual(e2.payload, plaintext) def test_decrypt_keyset(self): ks = jwk.JWKSet() key1 = jwk.JWK.generate(kty='oct', alg='A128KW', kid='key1') key2 = jwk.JWK.generate(kty='oct', alg='A192KW', kid='key2') key3 = jwk.JWK.generate(kty='oct', alg='A256KW', kid='key3') ks.add(key1) ks.add(key2) e1 = jwe.JWE(plaintext=b'secret') e1.add_recipient(key1, '{"alg":"A128KW","enc":"A128GCM"}') e2 = jwe.JWE() e2.deserialize(e1.serialize(), ks) self.assertEqual(e2.payload, b'secret') e3 = jwe.JWE(plaintext=b'secret') e3.add_recipient(key3, '{"alg":"A256KW","enc":"A256GCM"}') e4 = jwe.JWE() with self.assertRaises(JWKeyNotFound): e4.deserialize(e3.serialize(), ks) MMA_vector_key = jwk.JWK(**E_A2_key) MMA_vector_ok_cek = \ '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ '"recipients":[' \ '{"header":{"alg":"RSA1_5","kid":"2011-04-29"},' \ '"encrypted_key":'\ '"UGhIOguC7IuEvf_[AWS-SECRET-REMOVED]w7Zx0-' \ 'kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgNZ__deLKx' \ 'GHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRirb6Y5Cl_p-ko3' \ '[AWS-SECRET-REMOVED]I5ng8OtvzlV7elprCbuPh' \ 'cCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0mcKiRaD0-D-ljQTP-cFPg' \ 'wCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A"}],' \ '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ '"ciphertext":"[AWS-SECRET-REMOVED]OGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' MMA_vector_ko_cek = \ '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ '"recipients":[' \ '{"header":{"alg":"RSA1_5","kid":"2011-04-29"},' \ '"encrypted_key":'\ '"UGhIOguC7IuEvf_[AWS-SECRET-REMOVED]w7Zx0-' \ 'kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgNZ__deLKx' \ 'GHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRirb6Y5Cl_p-ko3' \ '[AWS-SECRET-REMOVED]I5ng8OtvzlV7elprCbuPh' \ 'cCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0mcKiRaD0-D-ljQTP-cFPg' \ 'wCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A"}],' \ '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ '"ciphertext":"[AWS-SECRET-REMOVED]OGY",' \ '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' class TestMMA(unittest.TestCase): @classmethod def setUpClass(cls): import os cls.enableMMA = os***REMOVED***iron.get('JWCRYPTO_TESTS_ENABLE_MMA', False) cls.iterations = 500 cls.sub_iterations = 100 def test_MMA(self): if self.enableMMA: print('Testing MMA timing attacks') ok_cek = 0 ok_e = jwe.JWE(algs=jwe_algs_and_rsa1_5) ok_e.deserialize(MMA_vector_ok_cek) ko_cek = 0 ko_e = jwe.JWE(algs=jwe_algs_and_rsa1_5) ko_e.deserialize(MMA_vector_ko_cek) import time counter = getattr(time, 'perf_counter', time.time) for _ in range(self.iterations): start = counter() for _ in range(self.sub_iterations): with self.assertRaises(jwe.InvalidJWEData): ok_e.decrypt(MMA_vector_key) stop = counter() ok_cek += (stop - start) / self.sub_iterations start = counter() for _ in range(self.sub_iterations): with self.assertRaises(jwe.InvalidJWEData): ko_e.decrypt(MMA_vector_key) stop = counter() ko_cek += (stop - start) / self.sub_iterations ok_cek /= self.iterations ko_cek /= self.iterations deviation = ((ok_cek - ko_cek) / ok_cek) * 100 print('MMA ok cek: {}'.format(ok_cek)) print('MMA ko cek: {}'.format(ko_cek)) print('MMA deviation: {}% ({})'.format(int(deviation), deviation)) self.assertLess(deviation, 2) # RFC 7519 A1_header = { "alg": "RSA1_5", "enc": "A128CBC-HS256"} A1_claims = { "iss": "joe", "exp": 1300819380, "http://example.com/is_root": True} A1_token = \ "[AWS-SECRET-REMOVED]LUhTMjU2In0." + \ "[AWS-SECRET-REMOVED]JflSdv1iNqhWXaKH4MqAkQtM" + \ "oNfABIPJaZm0HaA415sv3aeuBWnD8J-Ui7Ah6cWafs3ZwwFKDFUUsWHSK-IPKxLG" + \ "TkND09XyjORj_CHAgOPJ-Sd8ONQRnJvWn_hXV1BNMHzUjPyYwEsRhDhzjAD26ima" + \ "sOTsgruobpYGoQcXUwFDn7moXPRfDE8-NoQX7N7ZYMmpUDkR-Cx9obNGwJQ3nM52" + \ "[AWS-SECRET-REMOVED]8v1K4krB8xgKvRU8kgFrEn_a" + \ "1rZgN5TiysnmzTROF869lQ." + \ "AxY8DCtDaGlsbGljb3RoZQ." + \ "[AWS-SECRET-REMOVED]Z72deHxz3roJDXQyhxx0wKaM" + \ "[AWS-SECRET-REMOVED]3J8." + \ "fiK51VwhsxJ-siBMR-YFiA" A2_token = \ "[AWS-SECRET-REMOVED]LUhTMjU2IiwiY3R5IjoiSldU" + \ "In0." + \ "g_hEwksO1Ax8Qn7HoN-BVeBoa8FXe0kpyk_XdcSmxvcM5_P296JXXtoHISr_DD_M" + \ "qewaQSH4dZOQHoUgKLeFly-9RI11TG-_Ge1bZFazBPwKC5lJ6OLANLMd0QSL4fYE" + \ "b9ERe-epKYE3xb2jfY1AltHqBO-PM6j23Guj2yDKnFv6WO72tteVzm_2n17SBFvh" + \ "DuR9a2nHTE67pe0XGBUS_TK7ecA-iVq5COeVdJR4U4VZGGlxRGPLRHvolVLEHx6D" + \ "YyLpw30Ay9R6d68YCLi9FYTq3hIXPK_-dmPlOUlKvPr1GgJzRoeC9G5qCvdcHWsq" + \ "JGTO_z3Wfo5zsqwkxruxwA." + \ "UmVkbW9uZCBXQSA5ODA1Mg." + \ "VwHERHPvCNcHHpTjkoigx3_ExK0Qc71RMEParpatm0X_qpg-w8kozSjfNIPPXiTB" + \ "BLXR65CIPkFqz4l1Ae9w_uowKiwyi9acgVztAi-pSL8GQSXnaamh9kX1mdh3M_TT" + \ "-FZGQFQsFhu0Z72gJKGdfGE-[AWS-SECRET-REMOVED]" + \ "l56pPfAtrjEYw-7ygeMkwBl6Z_mLS6w6xUgKlvW6ULmkV-uLC4FUiyKECK4e3WZY" + \ "Kw1bpgIqGYsw2v_grHjszJZ-_I5uM-9RA8ycX9KqPRp9gc6pXmoU_-27ATs9XCvr" + \ "ZXUtK2902AUzqpeEUJYjWWxSNsS-r1TJ1I-FMJ4XyAiGrfmo9hQPcNBYxPz3GQb2" + \ "[AWS-SECRET-REMOVED]JaKe_nNJgNliWtWpJ_ebuOpE" + \ "[AWS-SECRET-REMOVED]UN5wuqU955vOBvfkBOh5A11U" + \ "zBuo2WlgZ6hYi9-e3w29bR0C2-pp3jbqxEDw3iWaf2dc5b-LnR0FEYXvI_tYk5rd" + \ "_[AWS-SECRET-REMOVED]bQ." + \ "AVO9iT5AV4CzvDJCdhSFlQ" class TestJWT(unittest.TestCase): def test_A1(self): key = jwk.JWK(**E_A2_key) # first encode/decode ourselves t = jwt.JWT(A1_header, A1_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() t.deserialize(token) # then try the test vector t = jwt.JWT(jwt=A1_token, key=key, check_claims=False, algs=jwe_algs_and_rsa1_5) # then try the test vector with explicit expiration date t = jwt.JWT(jwt=A1_token, key=key, check_claims={'exp': 1300819380}, algs=jwe_algs_and_rsa1_5) # Finally check it raises for expired tokens self.assertRaises(jwt.JWTExpired, jwt.JWT, jwt=A1_token, key=key, algs=jwe_algs_and_rsa1_5) def test_A2(self): sigkey = jwk.JWK(**A2_example['key']) touter = jwt.JWT(jwt=A2_token, key=E_A2_ex['key'], algs=jwe_algs_and_rsa1_5) tinner = jwt.JWT(jwt=touter.claims, key=sigkey, check_claims=False) self.assertEqual(A1_claims, json_decode(tinner.claims)) # Test Exception throwing when token is encrypted with # algorithms not in the allowed set with self.assertRaises(jwe.InvalidJWEData): jwt.JWT(jwt=A2_token, key=E_A2_ex['key'], algs=['A192KW', 'A192CBC-HS384', 'RSA1_5']) def test_decrypt_keyset(self): key = jwk.JWK(kid='testkey', **E_A2_key) keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys)) # encrypt a new JWT with kid header = copy.copy(A1_header) header['kid'] = 'testkey' t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() # try to decrypt without a matching key self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) # now decrypt with key keyset.add(key) jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) # encrypt a new JWT with wrong kid header = copy.copy(A1_header) header['kid'] = '1' t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5) keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys)) # encrypt a new JWT with no kid header = copy.copy(A1_header) t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() # try to decrypt without a matching key self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) # now decrypt with key keyset.add(key) jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) def test_decrypt_keyset_dup_kid(self): keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys)) # add wrong key with duplicate kid key = jwk.JWK(kid='testkey', **E_A3_key) keyset.add(key) # encrypt a new JWT with kid key = jwk.JWK(kid='testkey', **E_A2_key) header = copy.copy(A1_header) header['kid'] = 'testkey' t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() # try to decrypt without a matching key with self.assertRaises(jwt.JWTMissingKey): jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) # add right key keyset.add(key) # now decrypt with key jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5, check_claims={'exp': 1300819380}) def test_invalid_claim_type(self): key = jwk.JWK(**E_A2_key) claims = {"testclaim": "test"} claims.update(A1_claims) t = jwt.JWT(A1_header, claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() # Wrong string self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, algs=jwe_algs_and_rsa1_5, check_claims={"testclaim": "ijgi"}) # Wrong type self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, algs=jwe_algs_and_rsa1_5, check_claims={"testclaim": 123}) # Correct jwt.JWT(jwt=token, key=key, algs=jwe_algs_and_rsa1_5, check_claims={"testclaim": "test"}) def test_claim_params(self): key = jwk.JWK(**E_A2_key) default_claims = {"iss": "test", "exp": None} string_claims = '{"string_claim":"test"}' string_header = '{"alg":"RSA1_5","enc":"A128CBC-HS256"}' t = jwt.JWT(string_header, string_claims, default_claims=default_claims, algs=jwe_algs_and_rsa1_5) t.make_encrypted_token(key) token = t.serialize() # Check default_claims jwt.JWT(jwt=token, key=key, algs=jwe_algs_and_rsa1_5, check_claims={"iss": "test", "exp": None, "string_claim": "test"}) def test_claims_typ(self): key = jwk.JWK().generate(kty='oct') claims = '{"typ":"application/test"}' string_header = '{"alg":"HS256"}' t = jwt.JWT(string_header, claims) t.make_signed_token(key) token = t.serialize() # Same typ w/o application prefix jwt.JWT(jwt=token, key=key, check_claims={"typ": "test"}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, check_claims={"typ": "wrong"}) # Same typ w/ application prefix jwt.JWT(jwt=token, key=key, check_claims={"typ": "application/test"}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, check_claims={"typ": "application/wrong"}) # check that a '/' in the name makes it not be matched with # 'application/' prefix claims = '{"typ":"diffmime/test"}' t = jwt.JWT(string_header, claims) t.make_signed_token(key) token = t.serialize() self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, check_claims={"typ": "application/test"}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token, key=key, check_claims={"typ": "test"}) # finally make sure it doesn't raise if not checked. jwt.JWT(jwt=token, key=key) def test_empty_claims(self): key = jwk.JWK().generate(kty='oct') # empty dict is valid t = jwt.JWT('{"alg":"HS256"}', {}) self.assertEqual('{}', t.claims) t.make_signed_token(key) token = t.serialize() _jwt = jwt.JWT() _jwt.deserialize(token, key) self.assertEqual('{}', _jwt.claims) # empty string is also valid t = jwt.JWT('{"alg":"HS256"}', '') t.make_signed_token(key) token = t.serialize() # also a space is fine t = jwt.JWT('{"alg":"HS256"}', ' ') self.assertEqual(' ', t.claims) t.make_signed_token(key) token = t.serialize() _jwt = jwt.JWT() _jwt.deserialize(token, key) self.assertEqual(' ', _jwt.claims) def test_Issue_209(self): key = jwk.JWK(**A3_key) t = jwt.JWT('{"alg":"ES256"}', {}) t.make_signed_token(key) token = t.serialize() ks = jwk.JWKSet() ks.add(jwk.JWK().generate(kty='oct')) ks.add(key) # Make sure this one does not assert when cycling through # the oct key before hitting the ES one jwt.JWT(jwt=token, key=ks) def test_Issue_277(self): claims = {"aud": ["www.example.com", "www.test.net"]} key = jwk.JWK(generate='oct', size=256) token = jwt.JWT(header={"alg": "HS256"}, claims=claims) token.make_signed_token(key) sertok = token.serialize() jwt.JWT(key=key, jwt=sertok, check_claims={"aud": "www.example.com"}) jwt.JWT(key=key, jwt=sertok, check_claims={"aud": "www.test.net"}) jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com"]}) jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.test.net"]}) jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com", "www.test.net"]}) jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com", "nomatch"]}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key, jwt=sertok, check_claims={"aud": "nomatch"}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key, jwt=sertok, check_claims={"aud": ["nomatch"]}) self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key, jwt=sertok, check_claims={"aud": ["nomatch", "failmatch"]}) def test_unexpected(self): key = jwk.JWK(generate='oct', size=256) claims = {"testclaim": "test"} token = jwt.JWT(header={"alg": "HS256"}, claims=claims) token.make_signed_token(key) sertok = token.serialize() token.validate(key) token.expected_type = "JWS" token.validate(key) token.expected_type = "JWE" with self.assertRaises(TypeError): token.validate(key) jwt.JWT(jwt=sertok, key=key) jwt.JWT(jwt=sertok, key=key, expected_type='JWS') with self.assertRaises(TypeError): jwt.JWT(jwt=sertok, key=key, expected_type='JWE') jwt.JWT(jwt=sertok, algs=['HS256'], key=key) key.use = 'sig' jwt.JWT(jwt=sertok, key=key) key.use = 'enc' with self.assertRaises(TypeError): jwt.JWT(jwt=sertok, key=key) key.use = None key.key_ops = 'verify' jwt.JWT(jwt=sertok, key=key) key.key_ops = ['sign', 'verify'] jwt.JWT(jwt=sertok, key=key) key.key_ops = 'decrypt' with self.assertRaises(TypeError): jwt.JWT(jwt=sertok, key=key) key.key_ops = ['encrypt', 'decrypt'] with self.assertRaises(TypeError): jwt.JWT(jwt=sertok, key=key) key.key_ops = None token = jwt.JWT(header={"alg": "A256KW", "enc": "A256GCM"}, claims=claims) token.make_encrypted_token(key) enctok = token.serialize() # test workaround for older applications jwt.JWT_expect_type = False jwt.JWT(jwt=enctok, key=key) jwt.JWT_expect_type = True token.validate(key) token.expected_type = "JWE" token.validate(key) token.expected_type = "JWS" with self.assertRaises(TypeError): token.validate(key) jwt.JWT(jwt=enctok, key=key, expected_type='JWE') with self.assertRaises(TypeError): jwt.JWT(jwt=enctok, key=key) with self.assertRaises(TypeError): jwt.JWT(jwt=enctok, key=key, expected_type='JWS') jwt.JWT(jwt=enctok, algs=['A256KW', 'A256GCM'], key=key) key.use = 'enc' jwt.JWT(jwt=enctok, key=key) key.use = 'sig' with self.assertRaises(TypeError): jwt.JWT(jwt=enctok, key=key) key.use = None key.key_ops = 'verify' with self.assertRaises(TypeError): jwt.JWT(jwt=enctok, key=key) key.key_ops = ['sign', 'verify'] with self.assertRaises(TypeError): jwt.JWT(jwt=enctok, key=key) key.key_ops = 'decrypt' jwt.JWT(jwt=enctok, key=key) key.key_ops = ['encrypt', 'decrypt'] jwt.JWT(jwt=enctok, key=key) key.key_ops = None class ConformanceTests(unittest.TestCase): def test_unknown_key_params(self): key = jwk.JWK(kty='oct', k='secret', unknown='mystery') self.assertEqual('mystery', key.get('unknown')) def test_key_ops_values(self): self.assertRaises(jwk.InvalidJWKValue, jwk.JWK, kty='RSA', n=1, key_ops=['sign'], use='enc') self.assertRaises(jwk.InvalidJWKValue, jwk.JWK, kty='RSA', n=1, key_ops=['sign', 'sign']) def test_jwe_no_protected_header(self): enc = jwe.JWE(plaintext='plain') enc.add_recipient(jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)), '{"alg":"A128KW","enc":"A128GCM"}') def test_jwe_no_alg_in_jose_headers(self): enc = jwe.JWE(plaintext='plain') self.assertRaises(jwe.InvalidJWEData, enc.add_recipient, jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)), '{"enc":"A128GCM"}') def test_jwe_no_enc_in_jose_headers(self): enc = jwe.JWE(plaintext='plain') self.assertRaises(jwe.InvalidJWEData, enc.add_recipient, jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)), '{"alg":"A128KW"}') def test_aes_128(self): enc = jwe.JWE(plaintext='plain') key128 = jwk.JWK(kty='oct', k=base64url_encode(b'A' * (128 // 8))) enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128CBC-HS256"}') enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128GCM"}') def test_aes_192(self): enc = jwe.JWE(plaintext='plain') key192 = jwk.JWK(kty='oct', k=base64url_encode(b'B' * (192 // 8))) enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192CBC-HS384"}') enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192GCM"}') def test_aes_256(self): enc = jwe.JWE(plaintext='plain') key256 = jwk.JWK(kty='oct', k=base64url_encode(b'C' * (256 // 8))) enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256CBC-HS512"}') enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256GCM"}') def test_jws_loopback(self): sign = jws.JWS(payload='message') sign.add_signature(jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)), alg="HS512") o = sign.serialize() check = jws.JWS() check.deserialize(o, jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)), alg="HS512") self.assertTrue(check.objects['valid']) def test_jws_headers_as_dicts(self): sign = jws.JWS(payload='message') key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)) sign.add_signature(key, protected={'alg': 'HS512'}, header={'kid': key.thumbprint()}) o = sign.serialize() check = jws.JWS() check.deserialize(o, key, alg="HS512") self.assertTrue(check.objects['valid']) self.assertEqual(check.jose_header['kid'], key.thumbprint()) def test_jwe_headers_as_dicts(self): enc = jwe.JWE(plaintext='message', protected={"alg": "A256KW", "enc": "A256CBC-HS512"}) key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * 32)) enc.add_recipient(key, {'kid': key.thumbprint()}) o = enc.serialize() check = jwe.JWE() check.deserialize(o) check.decrypt(key) self.assertEqual(check.payload, b'message') self.assertEqual( json_decode(check.objects['header'])['kid'], key.thumbprint()) def test_jwe_default_recipient(self): key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * (128 // 8))) enc = jwe.JWE(plaintext='plain', protected='{"alg":"A128KW","enc":"A128GCM"}', recipient=key).serialize() check = jwe.JWE() check.deserialize(enc, key) self.assertEqual(b'plain', check.payload) def test_none_key(self): e = "eyJhbGciOiJub25lIn0." + \ "[AWS-SECRET-REMOVED]b20vaXNfcm9vdCI6dHJ1ZX0." token = jwt.JWT(algs=['none']) k = jwk.JWK(generate='oct', size=0) token.deserialize(jwt=e, key=k) self.assertEqual(json_decode(token.claims), {"iss": "joe", "http://example.com/is_root": True}) with self.assertRaises(KeyError): token = jwt.JWT() token.deserialize(jwt=e) json_decode(token.claims) def test_no_default_rsa_1_5(self): s = jws.JWS('test') with self.assertRaisesRegex(jws.InvalidJWSOperation, 'Algorithm not allowed'): s.add_signature(A2_key, alg="RSA1_5") def test_pbes2_hs256_aeskw(self): enc = jwe.JWE(plaintext='plain', protected={"alg": "PBES2-HS256+A128KW", "enc": "A256CBC-HS512"}) key = jwk.JWK.from_password('password') enc.add_recipient(key) o = enc.serialize() check = jwe.JWE() check.deserialize(o) check.decrypt(key) self.assertEqual(check.payload, b'plain') def test_pbes2_hs256_aeskw_custom_params(self): enc = jwe.JWE(plaintext='plain', protected={"alg": "PBES2-HS256+A128KW", "enc": "A256CBC-HS512", "p2c": 4096, "p2s": base64url_encode("A" * 16)}) key = jwk.JWK.from_password('password') enc.add_recipient(key) o = enc.serialize() check = jwe.JWE() check.deserialize(o) check.decrypt(key) self.assertEqual(check.payload, b'plain') enc = jwe.JWE(plaintext='plain', protected={"alg": "PBES2-HS256+A128KW", "enc": "A256CBC-HS512", "p2c": 4096, "p2s": base64url_encode("A" * 7)}) key = jwk.JWK.from_password('password') self.assertRaises(ValueError, enc.add_recipient, key) # Test p2c iteration checks maxiter = jwa.default_max_pbkdf2_iterations p2cenc = jwe.JWE(plaintext='plain', protected={"alg": "PBES2-HS256+A128KW", "enc": "A256CBC-HS512", "p2c": maxiter + 1, "p2s": base64url_encode("A" * 16)}) with self.assertRaisesRegex(ValueError, 'too large'): p2cenc.add_recipient(key) jwa.default_max_pbkdf2_iterations += 2 p2cenc.add_recipient(key) def test_jwe_decompression_max(self): key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * (128 // 8))) payload = '{"u": "' + "u" * 400000000 + '", "uu":"' \ + "u" * 400000000 + '"}' protected_header = { "alg": "A128KW", "enc": "A128GCM", "typ": "JWE", "zip": "DEF", } enc = jwe.JWE(payload.encode('utf-8'), recipient=key, protected=protected_header).serialize(compact=True) with self.assertRaises(jwe.InvalidJWEData): check = jwe.JWE() check.deserialize(enc) check.decrypt(key) defmax = jwe.default_max_compressed_size jwe.default_max_compressed_size = 1000000000 # ensure we can eraise the limit and decrypt check = jwe.JWE() check.deserialize(enc) check.decrypt(key) jwe.default_max_compressed_size = defmax class JWATests(unittest.TestCase): def test_jwa_create(self): for name, cls in jwa.JWA.algorithms_registry.items(): self.assertEqual(cls.name, name) self.assertIn(cls.algorithm_usage_location, {'alg', 'enc'}) if name == 'ECDH-ES': self.assertIs(cls.keysize, None) elif name == 'EdDSA': self.assertIs(cls.keysize, None) else: self.assertIsInstance(cls.keysize, int) self.assertGreaterEqual(cls.keysize, 0) if cls.algorithm_use == 'sig': with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.encryption_alg(name) with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.keymgmt_alg(name) inst = jwa.JWA.signing_alg(name) self.assertIsInstance(inst, jwa.JWAAlgorithm) self.assertEqual(inst.name, name) elif cls.algorithm_use == 'kex': with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.encryption_alg(name) with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.signing_alg(name) inst = jwa.JWA.keymgmt_alg(name) self.assertIsInstance(inst, jwa.JWAAlgorithm) self.assertEqual(inst.name, name) elif cls.algorithm_use == 'enc': with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.signing_alg(name) with self.assertRaises(jwa.InvalidJWAAlgorithm): jwa.JWA.keymgmt_alg(name) inst = jwa.JWA.encryption_alg(name) self.assertIsInstance(inst, jwa.JWAAlgorithm) self.assertEqual(inst.name, name) else: self.fail((name, cls)) # RFC 7797 rfc7797_e_header = '{"alg":"HS256"}' rfc7797_u_header = '{"alg":"HS256","b64":false,"crit":["b64"]}' rfc7797_payload = "$.02" class TestUnencodedPayload(unittest.TestCase): def test_regular(self): result = \ 'eyJhbGciOiJIUzI1NiJ9.JC4wMg.' + \ '5mvfOroL-g7HyqJoozehmsaqmvTYGEq5jTI1gVvoEoQ' s = jws.JWS(rfc7797_payload) s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), protected=rfc7797_e_header) sig = s.serialize(compact=True) self.assertEqual(sig, result) def test_compat_unencoded(self): result = \ '[AWS-SECRET-REMOVED]aXQiOlsiYjY0Il19..' + \ 'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY' s = jws.JWS(rfc7797_payload) s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), protected=rfc7797_u_header) # check unencoded payload is in serialized form sig = s.serialize() self.assertEqual(json_decode(sig)['payload'], rfc7797_payload) # check error raises if we try to get compact serialization with self.assertRaises(jws.InvalidJWSOperation): sig = s.serialize(compact=True) # check compact serialization is allowed with detached payload s.detach_payload() sig = s.serialize(compact=True) self.assertEqual(sig, result) def test_detached_payload_verification(self): token = \ '[AWS-SECRET-REMOVED]aXQiOlsiYjY0Il19..' + \ 'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY' s = jws.JWS() s.deserialize(token) s.verify(jwk.JWK(**SymmetricKeys['keys'][1]), detached_payload=rfc7797_payload) self.assertTrue(s.is_valid) def test_misses_crit(self): s = jws.JWS(rfc7797_payload) with self.assertRaises(jws.InvalidJWSObject): s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), protected={"alg": "HS256", "b64": False}) def test_mismatching_encoding(self): s = jws.JWS(rfc7797_payload) s.add_signature(jwk.JWK(**SymmetricKeys['keys'][0]), protected=rfc7797_e_header) with self.assertRaises(jws.InvalidJWSObject): s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]), protected=rfc7797_u_header) class TestOverloadedOperators(unittest.TestCase): def test_jws_equality(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Integrity protected message" signer_a = jws.JWS(payload.encode('utf-8')) signer_b = jws.JWS(payload.encode('utf-8')) self.assertEqual(signer_a, signer_b) signer_a.add_signature(key, None, json_encode({"alg": "HS256"}), json_encode({"kid": key.thumbprint()})) # One is signed, the other is not self.assertNotEqual(signer_a, signer_b) signer_b.add_signature(key, None, json_encode({"alg": "HS256"}), json_encode({"kid": key.thumbprint()})) # This kind of signature is deterministic so they should be equal self.assertEqual(signer_a, signer_b) signer_c = jws.JWS.from_jose_token(signer_a.serialize()) self.assertNotEqual(signer_a, signer_c) signer_c.verify(key) self.assertEqual(signer_a, signer_c) def test_jws_representations(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Integrity protected message" token = jws.JWS(payload.encode('utf-8')) self.assertEqual(str(token), "JWS(payload=My Integrity protected message)") self.assertEqual(repr(token), "JWS(payload=My Integrity protected message)") token.add_signature(key, None, json_encode({"alg": "HS256"}), json_encode({"kid": key.thumbprint()})) ser = token.serialize() self.assertEqual(str(token), ser) self.assertEqual(repr(token), f'JWS.from_json_token("{ser}")') def test_jwe_equality(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Encrypted message" signer_a = jwe.JWE(payload.encode('utf-8'), json_encode({"alg": "A256KW", "enc": "A256CBC-HS512"})) signer_b = jwe.JWE(payload.encode('utf-8'), json_encode({"alg": "A256KW", "enc": "A256CBC-HS512"})) self.assertEqual(signer_a, signer_b) signer_a.add_recipient(key) # One is encrypted, the other is not self.assertNotEqual(signer_a, signer_b) signer_b.add_recipient(key) # Encryption generates a random CEK so tokens will always differ self.assertNotEqual(signer_a, signer_b) signer_c = jwe.JWE.from_jose_token(signer_a.serialize()) self.assertEqual(signer_a, signer_c) def test_jwe_representations(self): key = jwk.JWK.generate(kty='oct', size=256) payload = "My Encrypted message" token = jwe.JWE(payload.encode('utf-8'), json_encode({"alg": "A256KW", "enc": "A256CBC-HS512"})) strrep = "JWE(plaintext=b\'My Encrypted message\', " + \ "protected={\"alg\":\"A256KW\"," + \ "\"enc\":\"A256CBC-HS512\"}, " + \ "unprotected=None, aad=None, algs=None)" self.assertEqual(str(token), strrep) self.assertEqual(repr(token), strrep) token.add_recipient(key) ser = token.serialize() self.assertEqual(str(token), ser) self.assertEqual(repr(token), f'JWE.from_json_token("{ser}")') def test_jwt_equality(self): key = jwk.JWK.generate(kty='oct', size=256) signer_a = jwt.JWT(header={"alg": "HS256"}, claims={"info": "I'm a signed token"}) signer_b = jwt.JWT(header={"alg": "HS256"}, claims={"info": "I'm a signed token"}) self.assertEqual(signer_a, signer_b) signer_a.make_signed_token(key) # One is signed, the other is not self.assertNotEqual(signer_a, signer_b) signer_b.make_signed_token(key) # This kind of signature is deterministic so they should be equal self.assertEqual(signer_a, signer_b) signer_c = jwt.JWT.from_jose_token(signer_a.serialize()) self.assertNotEqual(signer_a, signer_c) signer_c.validate(key) self.assertEqual(signer_a, signer_c) ea = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"}, claims=signer_a.serialize()) eb = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"}, claims=signer_b.serialize()) self.assertEqual(ea, eb) ea.make_encrypted_token(key) # One is encrypted, the other is not self.assertNotEqual(ea, eb) eb.make_encrypted_token(key) # Encryption generates a random CEK so tokens will always differ self.assertNotEqual(ea, eb) ect = jwt.JWT.from_jose_token(ea.serialize()) self.assertNotEqual(ea, ect) ect.expected_type = "JWE" ect.validate(key) self.assertEqual(ea, ect) def test_jwt_representations(self): key = jwk.JWK.generate(kty='oct', size=256) token = jwt.JWT(header={"alg": "HS256"}, claims={"info": "I'm a signed token"}) strrep = 'JWT(header={"alg":"HS256"}, claims={"info":"I\'m a ' + \ 'signed token"}, jwt=None, key=None, algs=None, ' + \ 'default_claims=None, check_claims=None)' self.assertEqual(str(token), strrep) self.assertEqual(repr(token), strrep) token.make_signed_token(key) ser = token.serialize() self.assertEqual(str(token), ser) ser2 = token.token.serialize() reprrep = 'JWT(header={"alg":"HS256"}, ' + \ 'claims={"info":"I\'m a signed token"}, ' + \ f'jwt=JWS.from_json_token("{ser2}"), key=None, ' + \ 'algs=None, default_claims=None, check_claims=None)' self.assertEqual(repr(token), reprrep)