summaryrefslogtreecommitdiff
path: root/tools/certtools.py
blob: 0a482e35de4a22286d44801879684945074f0384 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import subprocess
import json
import base64
import urllib
import urllib2
import struct
import sys

def get_cert_info(s):
    p = subprocess.Popen(
        ["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    parsed = p.communicate(s)
    if parsed[1]:
        print "error:", parsed[1]
    result = {}
    for line in parsed[0].split("\n"):
        (key, sep, value) = line.partition("=")
        if sep == "=":
            result[key] = value
    return result

def get_certs_from_file(certfile):
    """Return the base64 bodies of all PEM certificates in `certfile`.

    Each line is stripped of surrounding whitespace.  Lines between a
    "-----BEGIN CERTIFICATE-----" and the following
    "-----END CERTIFICATE-----" marker are concatenated (without line
    breaks) into one string per certificate; the strings are returned in
    file order and are NOT base64-decoded.

    Text outside the markers is ignored, as is an END marker that has no
    matching BEGIN.  A certificate that is still open at end of file is
    dropped.
    """
    certs = []
    # List of body lines while inside a certificate, None while outside.
    body_lines = None

    # The with-block closes the file even if reading fails part-way.
    with open(certfile) as pemfile:
        for line in pemfile:
            line = line.strip()
            if line == "-----BEGIN CERTIFICATE-----":
                body_lines = []
            elif line == "-----END CERTIFICATE-----":
                if body_lines is not None:
                    certs.append("".join(body_lines))
                body_lines = None
            elif body_lines is not None:
                body_lines.append(line)
    return certs

def get_root_cert(issuer):
    """Return the DER bytes of the accepted root whose subject is `issuer`.

    Reads the JSON file "googlelog-accepted-certs.txt" from the current
    directory and walks its "certificates" list of base64-encoded
    certificates.  Each one is decoded and its subject (as reported by
    get_cert_info) is compared with `issuer`.

    Prints "found root cert" for every match.  If several certificates
    match, the last one in the list is returned; if none match, None is
    returned.  Certificates for which no subject could be extracted are
    skipped.
    """
    # The with-block makes sure the file handle is released.
    with open("googlelog-accepted-certs.txt") as certsfile:
        accepted_certs = json.load(certsfile)["certificates"]

    root_cert = None

    for accepted_cert in accepted_certs:
        # Decode once and reuse the result for both the lookup and the
        # return value.
        der_cert = base64.decodestring(accepted_cert)
        subject = get_cert_info(der_cert).get("subject")
        if subject is not None and subject == issuer:
            print("found root cert")
            root_cert = der_cert

    return root_cert

def get_sth(baseurl):
    """Fetch `baseurl` + "ct/v1/get-sth" and return the decoded JSON body."""
    response = urllib2.urlopen(baseurl + "ct/v1/get-sth")
    body = response.read()
    return json.loads(body)

def get_proof_by_hash(baseurl, hash, tree_size):
    """Request "ct/v1/get-proof-by-hash" and return the decoded JSON reply.

    `hash` is base64-encoded and sent together with `tree_size` as URL
    query parameters; the encoded query string is printed before the
    request is made.

    On an HTTP error the body of the error response is printed and the
    process exits with status 1.
    """
    query = {"hash": base64.b64encode(hash), "tree_size": tree_size}
    try:
        params = urllib.urlencode(query)
        print(params)
        url = baseurl + "ct/v1/get-proof-by-hash?" + params
        response = urllib2.urlopen(url)
        return json.loads(response.read())
    except urllib2.HTTPError as e:
        print(e.read())
        sys.exit(1)

def tls_array(data, length_len):
    """Return `data` prefixed with its length as a big-endian integer.

    The prefix is exactly `length_len` bytes wide.

    Raises ValueError if `length_len` is not between 1 and 8, or if
    len(data) does not fit in `length_len` bytes.  Without these checks
    the prefix would be silently truncated (or, for a width of 0, come
    out 8 bytes wide) and the result could not be decoded again.
    """
    if not 1 <= length_len <= 8:
        raise ValueError(
            "length_len must be between 1 and 8, got %r" % (length_len,))
    if len(data) >= 1 << (8 * length_len):
        raise ValueError(
            "data of %d bytes does not fit a %d-byte length prefix"
            % (len(data), length_len))
    # Pack as an unsigned 64-bit value and keep only the low-order bytes.
    length_bytes = struct.pack(">Q", len(data))[-length_len:]
    return length_bytes + data

def unpack_tls_array(packed_data, length_len):
    """Split one length-prefixed array off the front of `packed_data`.

    The first `length_len` bytes are read as a big-endian unsigned
    integer giving the payload length.  Returns a tuple
    (payload, rest), where `rest` is whatever follows the payload.

    Raises AssertionError if fewer payload bytes are available than the
    prefix announces.  The check is an explicit raise rather than an
    `assert` statement so that it is still performed under `python -O`.
    Raises struct.error if the length prefix itself is incomplete or
    `length_len` is greater than 8.
    """
    header = packed_data[:length_len]
    # Left-pad the prefix with zero bytes so it can be decoded as an
    # unsigned 64-bit big-endian integer.
    (length,) = struct.unpack(">Q", b"\x00" * (8 - length_len) + header)
    payload_end = length_len + length
    unpacked_data = packed_data[length_len:payload_end]
    if len(unpacked_data) != length:
        raise AssertionError(
            "truncated array: expected %d bytes, got %d"
            % (length, len(unpacked_data)))
    rest_data = packed_data[payload_end:]
    return (unpacked_data, rest_data)

def add_chain(baseurl, submission):
    """POST `submission` as JSON to "ct/v1/add-chain"; return the JSON reply.

    On an HTTP error the body of the error response is printed and the
    process exits with status 1.
    """
    url = baseurl + "ct/v1/add-chain"
    payload = json.dumps(submission)
    try:
        # Passing a data argument makes urlopen issue a POST.
        response = urllib2.urlopen(url, payload)
        return json.loads(response.read())
    except urllib2.HTTPError as e:
        print(e.read())
        sys.exit(1)

def get_entries(baseurl, start, end):
    """Request "ct/v1/get-entries" and return the decoded JSON reply.

    `start` and `end` are sent as URL query parameters of the same name.

    On an HTTP error the body of the error response is printed and the
    process exits with status 1.
    """
    query = {"start": start, "end": end}
    try:
        url = baseurl + "ct/v1/get-entries?" + urllib.urlencode(query)
        response = urllib2.urlopen(url)
        return json.loads(response.read())
    except urllib2.HTTPError as e:
        print(e.read())
        sys.exit(1)

def decode_certificate_chain(packed_certchain):
    """Decode a packed certificate chain into a list of certificates.

    `packed_certchain` must consist of exactly one array with a 3-byte
    length prefix (trailing data fails an assertion).  Its payload is a
    sequence of certificates, each again carrying a 3-byte length prefix;
    they are returned in order.
    """
    (remaining, trailing) = unpack_tls_array(packed_certchain, 3)
    assert not trailing
    chain = []
    while len(remaining) > 0:
        # Peel one certificate off the front and continue with the rest.
        (cert, remaining) = unpack_tls_array(remaining, 3)
        chain.append(cert)
    return chain