#!/usr/bin/env python

# Authors:
#   Trevor Perrin
#   Kees Bos - Added tests for XML-RPC
#   Dimitris Moraitis - Anon ciphersuites
#   Marcelo Fernandez - Added test for NPN
#   Martin von Loewis - python 3 port
#
# See the LICENSE file for legal information regarding use of this file.
"""Client/server loopback test suite for tlslite.

Run one process with the ``server`` command and another with the ``client``
command against the same HOST:PORT and DIRECTORY.  Both sides walk through
the same numbered tests in lockstep, so the order of the tests in
clientTestCmd() and serverTestCmd() must stay in sync.
"""
from __future__ import print_function
import sys
import os
import os.path
import socket
import time
import getopt
try:
    from BaseHTTPServer import HTTPServer
    from SimpleHTTPServer import SimpleHTTPRequestHandler
except ImportError:
    from http.server import HTTPServer, SimpleHTTPRequestHandler

from tlslite import TLSConnection, Fault, HandshakeSettings, \
    X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
    parsePEMKey, constants, \
    AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
    POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
    Checker, __version__

from tlslite.errors import *
from tlslite.utils.cryptomath import prngName
try:
    import xmlrpclib
except ImportError:
    # Python 3
    from xmlrpc import client as xmlrpclib
from tlslite import *

try:
    from tack.structures.Tack import Tack
except ImportError:
    pass

# time.clock() was removed in Python 3.8; use perf_counter() where it exists
# and only fall back to clock() on interpreters that predate it.
_timer = getattr(time, "perf_counter", None) or time.clock

# The "U" (universal newlines) open() flag raises ValueError on Python 3.11+,
# while Python 3 text mode already translates newlines.  Keep "rU" on Python 2.
_UNIVERSAL_READ_MODE = "rU" if sys.version_info[0] < 3 else "r"


def printUsage(s=None):
    """Print an optional error plus the command summary, then exit with -1."""
    if m2cryptoLoaded:
        crypto = "M2Crypto/OpenSSL"
    else:
        crypto = "Python crypto"
    if s:
        print("ERROR: %s" % s)
    print("""\ntls.py version %s (using %s)

Commands:
  server HOST:PORT DIRECTORY

  client HOST:PORT DIRECTORY
""" % (__version__, crypto))
    sys.exit(-1)


def _readFile(path, mode="r"):
    """Return the whole content of *path*, closing the file afterwards."""
    with open(path, mode) as f:
        return f.read()


def _parseTestArgs(argv):
    """Split ``[HOST:PORT, DIRECTORY]`` into ``((host, port), directory)``.

    Calls printUsage() (which exits) when an argument is missing or the
    address is not of the form HOST:PORT with a numeric port.
    """
    if len(argv) < 2:
        printUsage("Missing HOST:PORT or DIRECTORY argument")
    parts = argv[0].split(":")
    if len(parts) < 2:
        printUsage("Address must be given as HOST:PORT: %s" % argv[0])
    try:
        port = int(parts[1])
    except ValueError:
        printUsage("Bad port in address: %s" % argv[0])
    return (parts[0], port), argv[1]


def testConnClient(conn):
    """Send 1+10+100+1000 random bytes and check that they are echoed back."""
    chunks = [os.urandom(size) for size in (1, 10, 100, 1000)]
    # Write everything first, then read, exactly like the peer expects.
    for chunk in chunks:
        conn.write(chunk)
    for chunk in chunks:
        assert conn.read(min=len(chunk), max=len(chunk)) == chunk


def _runClientFaults(connect, faults, handshake):
    """Run *handshake* once per fault in *faults* on a fresh connection.

    connect   -- zero-argument callable returning a new TLSConnection
    faults    -- iterable of Fault constants to inject on the client side
    handshake -- one-argument callable performing the client handshake

    Returns True if any handshake raised TLSFaultError ("BAD FAULT").
    """
    bad = False
    for fault in faults:
        connection = connect()
        connection.fault = fault
        try:
            handshake(connection)
            print(" Good Fault %s" % (Fault.faultNames[fault]))
        except TLSFaultError as e:
            print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
            bad = True
    return bad


def clientTestCmd(argv):
    """Run the client half of the test suite.

    argv -- ``[HOST:PORT, DIRECTORY]``; DIRECTORY holds the client PEM files
            and index.html.

    Prints "Test succeeded" or "Test failed" depending on whether any fault
    test reported a BAD FAULT; assertion failures propagate to the caller.
    """
    address, directory = _parseTestArgs(argv)

    def connect():
        # 'address' is looked up at call time on purpose: the port is bumped
        # further down when the server re-creates its listening socket.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if hasattr(sock, 'settimeout'):  # It's a python 2.3 feature
            sock.settimeout(5)
        sock.connect(address)
        return TLSConnection(sock)

    badFault = False

    print("Test 0 - anonymous handshake")
    connection = connect()
    connection.handshakeClientAnonymous()
    testConnClient(connection)
    connection.close()

    print("Test 1 - good X509 (plus SNI)")
    connection = connect()
    connection.handshakeClientCert(serverName=address[0])
    testConnClient(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    assert connection.session.serverName == address[0]
    connection.close()

    print("Test 1.a - good X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    connection.close()

    print("Test 1.b - good X509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["md5"]
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    assert (connection.session.cipherSuite ==
            constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
    connection.close()

    if tackpyLoaded:
        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeClientCert(settings=settings)
        assert (connection.session.tackExt.tacks[0].getTackId() ==
                "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
        assert connection.session.tackExt.activation_flags == 1
        testConnClient(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeClientCert(settings=settings)
            assert False
        except TLSLocalAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise
        connection.close()

    print("Test 3 - good SRP")
    connection = connect()
    connection.handshakeClientSRP("test", "password")
    testConnClient(connection)
    connection.close()

    print("Test 4 - SRP faults")
    badFault = _runClientFaults(
        connect, Fault.clientSrpFaults + Fault.genericFaults,
        lambda c: c.handshakeClientSRP("test", "password")) or badFault

    print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
    settings = HandshakeSettings()
    settings.minVersion = (3,1)
    settings.maxVersion = (3,1)
    connection = connect()
    connection.handshakeClientSRP("test", "password", settings=settings)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    testConnClient(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    badFault = _runClientFaults(
        connect, Fault.clientSrpFaults + Fault.genericFaults,
        lambda c: c.handshakeClientSRP("test", "password")) or badFault

    print("Test 11 - X.509 faults")
    badFault = _runClientFaults(
        connect, Fault.clientNoAuthFaults + Fault.genericFaults,
        lambda c: c.handshakeClientCert()) or badFault

    print("Test 14 - good mutual X509")
    x509Cert = X509().parse(
        _readFile(os.path.join(directory, "clientX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = _readFile(os.path.join(directory, "clientX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    connection = connect()
    connection.handshakeClientCert(x509Chain, x509Key)
    testConnClient(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    connection.close()

    print("Test 14.a - good mutual X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
    testConnClient(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    connection.close()

    print("Test 15 - mutual X.509 faults")
    badFault = _runClientFaults(
        connect, Fault.clientCertFaults + Fault.genericFaults,
        lambda c: c.handshakeClientCert(x509Chain, x509Key)) or badFault

    print("Test 18 - good SRP, prepare to resume... (plus SNI)")
    connection = connect()
    connection.handshakeClientSRP("test", "password", serverName=address[0])
    testConnClient(connection)
    connection.close()
    session = connection.session

    print("Test 19 - resumption (plus SNI)")
    connection = connect()
    # The wrong password is intentional: a resumed session must not need it.
    connection.handshakeClientSRP("test", "garbage", serverName=address[0],
                                  session=session)
    testConnClient(connection)
    #Don't close! -- see below

    print("Test 20 - invalidated resumption (plus SNI)")
    connection.sock.close() #Close the socket without a close_notify!
    connection = connect()
    try:
        connection.handshakeClientSRP("test", "garbage",
                                      serverName=address[0], session=session)
        assert False
    except TLSRemoteAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")
    address = address[0], address[1]+1
    if hasattr(socket, "timeout"):
        timeoutEx = socket.timeout
    else:
        timeoutEx = socket.error
    while 1:
        try:
            time.sleep(2)
            htmlBody = bytearray(
                _readFile(os.path.join(directory, "index.html")), "utf-8")
            fingerprint = None
            # Second pass re-connects while pinning the fingerprint that the
            # first pass learned.
            for y in range(2):
                checker = Checker(x509Fingerprint=fingerprint)
                h = HTTPTLSConnection(
                        address[0], address[1], checker=checker)
                for x in range(3):
                    h.request("GET", "/index.html")
                    r = h.getresponse()
                    assert r.status == 200
                    b = bytearray(r.read())
                    assert b == htmlBody
                fingerprint = h.tlsSession.serverCertChain.getFingerprint()
                assert fingerprint
            time.sleep(2)
            break
        except timeoutEx:
            print("timeout, retrying...")

    address = address[0], address[1]+1

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers, TLSv1.0")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "rc4"]:
            print("Test 22:", end=' ')
            connection = connect()
            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            settings.minVersion = (3,1)
            settings.maxVersion = (3,1)
            connection.handshakeClientCert(settings=settings)
            testConnClient(connection)
            print("%s %s" % (connection.getCipherName(),
                             connection.getCipherImplementation()))
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and \
                    implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()
            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeClientCert(settings=settings)
            print("%s %s:" % (connection.getCipherName(),
                              connection.getCipherImplementation()), end=' ')

            startTime = _timer()
            connection.write(b"hello"*10000)
            h = connection.read(min=50000, max=50000)
            elapsed = _timer() - startTime
            if elapsed:
                print("100K exchanged at rate of %d bytes/sec" %
                      int(100000/elapsed))
            else:
                print("100K exchanged very fast")

            assert h == b"hello"*10000
            connection.close()

    # (label, protocols offered by the client, protocol expected to win).
    # The matching server-side lists live in serverTestCmd().
    npnCases = [
        ("a", [b"http/1.1"], b'http/1.1'),
        ("b", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("c", [b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("d", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/2'),
        ("e", [b"spdy/3", b"spdy/2", b"http/1.1"], b'spdy/3'),
        ("f", [b"http/1.1"], b'http/1.1'),
        ("g", [b"spdy/2", b"http/1.1"], b'spdy/2'),
    ]
    for label, protos, expected in npnCases:
        print("Test 24.%s - Next-Protocol Client Negotiation" % label)
        connection = connect()
        connection.handshakeClientCert(nextProtos=protos)
        assert connection.next_proto == expected
        connection.close()

    print('Test 25 - good standard XMLRPC https client')
    time.sleep(2) # Hack for lack of ability to set timeout here
    address = address[0], address[1]+1
    server = xmlrpclib.Server('https://%s:%s' % address)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 26 - good tlslite XMLRPC client')
    transport = XMLRPCTransport(ignoreAbruptClose=True)
    server = xmlrpclib.Server('https://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 27 - good XMLRPC ignored protocol')
    server = xmlrpclib.Server('http://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print("Test 28 - Internet servers test")
    try:
        i = IMAP4_TLS("cyrus.andrew.cmu.edu")
        i.login("anonymous", "anonymous@anonymous.net")
        i.logout()
        print("Test 28: IMAP4 good")
        p = POP3_TLS("pop.gmail.com")
        p.quit()
        print("Test 29: POP3 good")
    except socket.error as e:
        print("Non-critical error: socket error trying to reach internet server: ", e)

    if not badFault:
        print("Test succeeded")
    else:
        print("Test failed")


def testConnServer(connection):
    """Echo everything read from *connection* back to the peer.

    Stops on an empty read, or once exactly 1111 bytes have been echoed
    (the total that testConnClient() sends).
    """
    echoed = 0
    while True:
        data = connection.read()
        echoed += len(data)
        if not data:
            break
        connection.write(data)
        if echoed == 1111:
            break


def _runServerFaults(connect, faults, **handshakeArgs):
    """Accept one connection per fault and attempt a server handshake.

    connect       -- zero-argument callable returning an accepted
                     TLSConnection
    faults        -- iterable of Fault constants the client will inject
    handshakeArgs -- keyword arguments forwarded to handshakeServer()

    The outcome of each handshake is deliberately ignored: the original code
    asserted after the handshake and then swallowed that assertion together
    with every other error, so the verdict is left to the client side.
    Unlike a bare ``except:``, KeyboardInterrupt/SystemExit still propagate.
    """
    for fault in faults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeServer(**handshakeArgs)
        except Exception:
            pass
        connection.close()


def serverTestCmd(argv):
    """Run the server half of the test suite.

    argv -- ``[HOST:PORT, DIRECTORY]``; DIRECTORY holds the server PEM files
            and is served over HTTPS in Test 21.

    Assertion failures and unexpected TLS errors propagate to the caller.
    """
    address, directory = _parseTestArgs(argv)

    #Create the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(address)
    lsock.listen(5)

    def connect():
        # 'lsock' is looked up at call time on purpose: the listening socket
        # is re-created on a new port after the HTTPS test.
        return TLSConnection(lsock.accept()[0])

    x509Cert = X509().parse(
        _readFile(os.path.join(directory, "serverX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = _readFile(os.path.join(directory, "serverX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    print("Test 0 - Anonymous server handshake")
    connection = connect()
    connection.handshakeServer(anon=True)
    testConnServer(connection)
    connection.close()

    print("Test 1 - good X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
    assert connection.session.serverName == address[0]
    testConnServer(connection)
    connection.close()

    print("Test 1.a - good X.509, SSL v3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings)
    testConnServer(connection)
    connection.close()

    print("Test 1.b - good X.509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["sha", "md5"]
    settings.cipherNames = ["rc4"]
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings)
    testConnServer(connection)
    connection.close()

    if tackpyLoaded:
        # The TACK files are read relative to the current working directory,
        # not DIRECTORY.
        tack = Tack.createFromPem(
            _readFile("./TACK1.pem", _UNIVERSAL_READ_MODE))
        tackUnrelated = Tack.createFromPem(
            _readFile("./TACKunrelated.pem", _UNIVERSAL_READ_MODE))

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                   tacks=[tack], activationFlags=1,
                                   settings=settings)
        testConnServer(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeServer(certChain=x509Chain,
                                       privateKey=x509Key,
                                       tacks=[tackUnrelated],
                                       settings=settings)
            assert False
        except TLSRemoteAlert as alert:
            if alert.description != AlertDescription.illegal_parameter:
                raise

    print("Test 3 - good SRP")
    verifierDB = VerifierDB()
    verifierDB.create()
    entry = VerifierDB.makeVerifier("test", "password", 1536)
    verifierDB["test"] = entry

    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB)
    testConnServer(connection)
    connection.close()

    print("Test 4 - SRP faults")
    _runServerFaults(connect, Fault.clientSrpFaults + Fault.genericFaults,
                     verifierDB=verifierDB)

    print("Test 6 - good SRP: with X.509 cert")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB,
                               certChain=x509Chain, privateKey=x509Key)
    testConnServer(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    _runServerFaults(connect, Fault.clientSrpFaults + Fault.genericFaults,
                     verifierDB=verifierDB,
                     certChain=x509Chain, privateKey=x509Key)

    print("Test 11 - X.509 faults")
    _runServerFaults(connect, Fault.clientNoAuthFaults + Fault.genericFaults,
                     certChain=x509Chain, privateKey=x509Key)

    print("Test 14 - good mutual X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               reqCert=True)
    testConnServer(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    connection.close()

    print("Test 14a - good mutual X.509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               reqCert=True, settings=settings)
    testConnServer(connection)
    assert isinstance(connection.session.serverCertChain, X509CertChain)
    connection.close()

    print("Test 15 - mutual X.509 faults")
    _runServerFaults(connect, Fault.clientCertFaults + Fault.genericFaults,
                     certChain=x509Chain, privateKey=x509Key, reqCert=True)

    print("Test 18 - good SRP, prepare to resume")
    sessionCache = SessionCache()
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB,
                               sessionCache=sessionCache)
    assert connection.session.serverName == address[0]
    testConnServer(connection)
    connection.close()

    print("Test 19 - resumption")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB,
                               sessionCache=sessionCache)
    assert connection.session.serverName == address[0]
    testConnServer(connection)
    #Don't close! -- see next test

    print("Test 20 - invalidated resumption")
    try:
        connection.read(min=1, max=1)
        #Client is going to close the socket without a close_notify, so
        #getting any data back here is a failure.
        assert False
    except TLSAbruptCloseError as e:
        pass
    connection = connect()
    try:
        connection.handshakeServer(verifierDB=verifierDB,
                                   sessionCache=sessionCache)
    except TLSLocalAlert as alert:
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")

    #Close the current listening socket
    lsock.close()

    #Create and run an HTTP Server using TLSSocketServerMixIn
    class MyHTTPServer(TLSSocketServerMixIn, HTTPServer):
        def handshake(self, tlsConnection):
            tlsConnection.handshakeServer(certChain=x509Chain,
                                          privateKey=x509Key)
            return True

    cd = os.getcwd()
    # SimpleHTTPRequestHandler serves files from the working directory.
    os.chdir(directory)
    try:
        address = address[0], address[1]+1
        httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
        try:
            # The client makes 2 connections of 3 requests each.
            for x in range(6):
                httpd.handle_request()
        finally:
            httpd.server_close()
    finally:
        os.chdir(cd)

    #Re-connect the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = address[0], address[1]+1
    lsock.bind(address)
    lsock.listen(5)

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers")
    # NOTE(review): the server always uses "python" here but repeats the loop
    # once per available implementation -- presumably so that the number of
    # accepted connections matches the client's loop; confirm before changing.
    for implementation in ["python"] * len(implementations):
        for cipher in ["aes128", "aes256", "rc4"]:
            print("Test 22:", end=' ')
            connection = connect()
            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeServer(certChain=x509Chain,
                                       privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(),
                  connection.getCipherImplementation())
            testConnServer(connection)
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            if cipher == "3des" and \
                    implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()
            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeServer(certChain=x509Chain,
                                       privateKey=x509Key,
                                       settings=settings)
            print(connection.getCipherName(),
                  connection.getCipherImplementation())
            h = connection.read(min=50000, max=50000)
            assert h == b"hello"*10000
            connection.write(h)
            connection.close()

    # (label, protocols advertised by the server).  The matching client-side
    # offers and expected results live in clientTestCmd().
    npnCases = [
        ("a", [b"http/1.1"]),
        ("b", [b"spdy/2", b"http/1.1"]),
        ("c", [b"http/1.1", b"spdy/2"]),
        ("d", [b"spdy/2", b"http/1.1"]),
        ("e", [b"http/1.1", b"spdy/2", b"spdy/3"]),
        ("f", [b"spdy/3", b"spdy/2"]),
        ("g", []),
    ]
    for label, protos in npnCases:
        print("Test 24.%s - Next-Protocol Server Negotiation" % label)
        connection = connect()
        settings = HandshakeSettings()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                   settings=settings, nextProtos=protos)
        testConnServer(connection)
        connection.close()

    # No raw TLS connections are accepted past this point.
    lsock.close()

    print("Tests 25-27 - XMLRPXC server")
    address = address[0], address[1]+1

    class Server(TLSXMLRPCServer):

        def handshake(self, tlsConnection):
            try:
                tlsConnection.handshakeServer(certChain=x509Chain,
                                              privateKey=x509Key,
                                              sessionCache=sessionCache)
                tlsConnection.ignoreAbruptClose = True
                return True
            except TLSError as error:
                print("Handshake failure:", str(error))
                return False

    class MyFuncs:
        def pow(self, x, y): return pow(x, y)
        def add(self, x, y): return x + y

    server = Server(address)
    try:
        server.register_instance(MyFuncs())
        #sa = server.socket.getsockname()
        #print "Serving HTTPS on", sa[0], "port", sa[1]
        # The client issues 3 tests of 2 calls each.
        for i in range(6):
            server.handle_request()
    finally:
        server.server_close()

    print("Test succeeded")


if __name__ == '__main__':
    if len(sys.argv) < 2:
        printUsage("Missing command")
    # Commands may be abbreviated to any non-empty prefix ("c", "cl", ...).
    elif sys.argv[1] and "client".startswith(sys.argv[1]):
        clientTestCmd(sys.argv[2:])
    elif sys.argv[1] and "server".startswith(sys.argv[1]):
        serverTestCmd(sys.argv[2:])
    else:
        printUsage("Unknown command: %s" % sys.argv[1])