% TLS session establishment tests

# More information at http://www.secdev.org/projects/UTscapy/

############
############
+ TLS server automaton tests

### DISCLAIMER: These tests are slow ###

= Load server util functions
~ open_ssl_client crypto

from __future__ import print_function

import sys, os, re, time, multiprocessing, subprocess

sys.path.append(os.path.abspath("./tls"))

from travis_test_server import *

def test_tls_server(suite="", version=""):
    """Check the TLS test server against an ``openssl s_client`` peer.
    ``run_tls_test_server`` is started in a child process; OpenSSL then
    connects, sends a payload followed by ``stop_server`` and must report
    only successful certificate verifications before the server exits.
    :param suite: OpenSSL cipher name given to ``-cipher`` (option omitted
        when empty).
    :param version: ``s_client`` protocol flag such as ``-tls1_2`` (omitted
        when empty).
    :raises RuntimeError: if OpenSSL exits with a non-zero status, prints no
        ``verify return`` code or one different from 1, or if the server is
        still running 30 seconds after the client finished.
    :raises AssertionError: if the server process exit code is not 0.
    """
    # NOTE(review): this body deliberately has no blank lines, like the
    # original -- presumably the UTScapy runner feeds tests to an
    # interactive-style interpreter; confirm before reformatting.
    msg = ("TestS_%s_data" % suite).encode()
    # Run server
    q_ = multiprocessing.Manager().Queue()
    th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
    th_.start()
    try:
        # Synchronise with the server: block until it posts its first item
        q_.get()
        time.sleep(1)
        # Run client; empty suite/version are left out rather than passed
        # to openssl as empty arguments
        CA_f = os.path.abspath("./tls/pki/ca_cert.pem")
        cmd = ["openssl", "s_client", "-debug"]
        if suite:
            cmd += ["-cipher", suite]
        if version:
            cmd.append(version)
        cmd += ["-CAfile", CA_f]
        client = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        out = client.communicate(input=msg + b"\nstop_server\n")[0]
        # Diagnostic output only: never let an undecodable byte abort the test
        print(out.decode("utf-8", "replace"))
        if client.returncode != 0:
            raise RuntimeError("OpenSSL returned with error code")
        # At least one verification result is required and all must be 1
        codes = re.findall(br'verify return:(\d+)', out)
        if not codes or any(code != b"1" for code in codes):
            raise RuntimeError("OpenSSL returned unexpected values")
        # Wait for server
        th_.join(30)
        if th_.is_alive():
            raise RuntimeError("Test timed out")
    finally:
        # Whatever went wrong above (including a failure to launch openssl),
        # do not leave the server process running behind the test
        if th_.is_alive():
            th_.terminate()
    # Analyse values
    print(q_.get())
    assert th_.exitcode == 0


= Testing TLS server with TLS 1.0 and TLS_RSA_WITH_RC4_128_SHA
~ open_ssl_client crypto

# OpenSSL client restricted to cipher RC4-SHA with the -tls1 flag
test_tls_server(suite="RC4-SHA", version="-tls1")

= Testing TLS server with TLS 1.1 and TLS_DHE_RSA_WITH_3DES_EDE_CBC_SHA
~ open_ssl_client crypto

# OpenSSL client restricted to cipher EDH-RSA-DES-CBC3-SHA with the -tls1_1 flag
test_tls_server(suite="EDH-RSA-DES-CBC3-SHA", version="-tls1_1")

= Testing TLS server with TLS 1.2 and TLS_DHE_RSA_WITH_AES_128_CBC_SHA256
~ open_ssl_client crypto

# OpenSSL client restricted to cipher DHE-RSA-AES128-SHA256 with the -tls1_2 flag
test_tls_server(suite="DHE-RSA-AES128-SHA256", version="-tls1_2")

= Testing TLS server with TLS 1.2 and TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
~ open_ssl_client crypto

# OpenSSL client restricted to cipher ECDHE-RSA-AES256-GCM-SHA384 with the -tls1_2 flag
test_tls_server(suite="ECDHE-RSA-AES256-GCM-SHA384", version="-tls1_2")

+ TLS client automaton tests

= Load client util functions
~ crypto

import sys, os, threading

from scapy.modules.six.moves.queue import Queue

sys.path.append(os.path.abspath("./tls"))

from travis_test_client import *

def perform_tls_client_test(suite, version):
    """Run ``test_tls_client`` in a worker thread and check what it reports.
    The worker receives ``suite``, ``version`` and a queue. Once it has
    finished, the first queued item (if any) is printed and the second one
    must be equal to 0.
    :raises AssertionError: if the second item is missing or is not 0.
    """
    # Hand the work to another thread and block until it is done
    results = Queue()
    worker = threading.Thread(target=test_tls_client, args=(suite, version, results))
    worker.start()
    worker.join()
    # First item, when present, is only displayed
    if not results.empty():
        print(results.get())
    # Second item is mandatory and carries the status to check
    if results.empty():
        print("ERROR: Missing one of the return value detected !")
        assert False
    assert results.get() == 0

= Testing TLS server and client with SSLv2 and SSL_CK_DES_192_EDE3_CBC_WITH_MD5
~ crypto

# Suite "0700c0" and version "0002" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="0700c0", version="0002")

= Testing TLS client with SSLv3 and TLS_RSA_EXPORT_WITH_RC4_40_MD5
~ crypto

# Suite "0003" and version "0300" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="0003", version="0300")

= Testing TLS client with TLS 1.0 and TLS_DHE_RSA_WITH_CAMELLIA_256_CBC_SHA
~ crypto

# Suite "0088" and version "0301" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="0088", version="0301")

= Testing TLS client with TLS 1.1 and TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
~ crypto

# Suite "c013" and version "0302" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="c013", version="0302")

= Testing TLS client with TLS 1.2 and TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
~ crypto

# Suite "009e" and version "0303" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="009e", version="0303")

= Testing TLS client with TLS 1.2 and TLS_ECDH_anon_WITH_RC4_128_SHA
~ crypto

# Suite "c016" and version "0303" are forwarded unchanged to test_tls_client
perform_tls_client_test(suite="c016", version="0303")