Random tech thoughts

I'd better write some things down before they're forgotten

Creating Client Certificates for Kubernetes Apiserver With Python

Kubernetes features a complex, multi-user permission system to control access to the cluster. Those permissions are bound to contexts, and users can be given access to those contexts. To use them, a user must authenticate to the API server, and there are various ways of doing that.

The authentication options are documented here

One of those options is authentication through x509 client certificates.

A user who wants to authenticate needs a certificate with their username as the subject’s commonName, and the certificate must be signed by the cluster’s CA. Although the concept is quite simple, creating certificates with the openssl CLI often proves difficult. So here is a Python script that creates a CA (or loads an existing one) and issues new client certificates for a Kubernetes user.

Be sure apiserver runs with --client-ca-file=yourca.crt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python

from socket import gethostname

import yaml
import random
from OpenSSL import crypto
import base64

# File names (relative to the current working directory) of the PEM files
# this script reads and writes.
# The CA pair is loaded if present and created otherwise.
CA_CERT_FILE = "ca.crt"
CA_KEY_FILE = "ca.key"
# The client pair is (re)written every time a client certificate is issued.
CLIENT_CERT_FILE = "client.crt"
CLIENT_KEY_FILE = "client.key"


def generate_self_signed_ca():
    """Create a self-signed CA certificate and key and write both to disk.

    The certificate is valid for ten years starting now, is marked as a CA
    (``pathlen:0``, i.e. it may only sign end-entity certificates) and is
    written, together with its unencrypted private key, as PEM to
    ``CA_CERT_FILE`` / ``CA_KEY_FILE``. Existing files are overwritten.

    Returns:
        A ``(certificate, private_key)`` tuple of ``crypto.X509`` and
        ``crypto.PKey``.
    """
    ca_key = crypto.PKey()
    # 2048 bits, matching the client key size used elsewhere in this file;
    # 1024-bit RSA is below current minimum recommendations.
    ca_key.generate_key(crypto.TYPE_RSA, 2048)

    ca = crypto.X509()
    # Value 2 encodes X.509 v3, which is required for certificates that
    # carry extensions.
    ca.set_version(2)

    subject = ca.get_subject()
    subject.C = "DE"
    subject.ST = "Duesseldorf"
    subject.L = "Duesseldorf"
    subject.O = "Dummy GmbH"
    subject.OU = "Dummy GmbH"
    subject.CN = gethostname()

    ca.set_serial_number(1000)
    ca.gmtime_adj_notBefore(0)
    ca.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    # Self-signed: issuer and subject are the same name.
    ca.set_issuer(subject)
    # The public key must be set before the subjectKeyIdentifier extension
    # is created, because "hash" is computed over that key.
    ca.set_pubkey(ca_key)

    ca.add_extensions([
        crypto.X509Extension(b"basicConstraints", True,
                             b"CA:TRUE, pathlen:0"),
        crypto.X509Extension(b"keyUsage", True,
                             b"keyCertSign, cRLSign"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=ca),
    ])
    # authorityKeyIdentifier is derived from the issuer's
    # subjectKeyIdentifier. The CA is its own issuer, so that extension has
    # to be on the certificate already -- hence the second call.
    ca.add_extensions([
        crypto.X509Extension(b"authorityKeyIdentifier", False,
                             b"keyid:always", issuer=ca),
    ])

    # SHA-256 instead of the deprecated SHA-1 digest.
    ca.sign(ca_key, 'sha256')

    # Context managers make sure both files are flushed and closed.
    with open(CA_CERT_FILE, "wb") as cert_file:
        cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, ca))
    with open(CA_KEY_FILE, "wb") as key_file:
        key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, ca_key))

    return ca, ca_key


def load_cert():
    """Load the CA certificate and its private key from their PEM files.

    Reads ``CA_CERT_FILE`` and ``CA_KEY_FILE`` and parses them; the key is
    loaded with no passphrase.

    Returns:
        A ``(certificate, private_key)`` tuple.
    """
    pem_blobs = []
    # Certificate first, then key -- both are read before either is parsed.
    for path in (CA_CERT_FILE, CA_KEY_FILE):
        with open(path, "rb") as handle:
            pem_blobs.append(handle.read())

    cert_pem, key_pem = pem_blobs
    ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
    ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem, None)
    return ca_cert, ca_key


def generate_client_cert(ca_cert, ca_key, username):
    """Issue a client-authentication certificate for ``username``.

    A fresh 2048-bit RSA key is generated, a certificate with
    ``commonName = username`` is signed by the given CA (SHA-256, valid for
    ten years from now), and both are written as PEM to
    ``CLIENT_CERT_FILE`` / ``CLIENT_KEY_FILE`` (overwriting existing files).

    Args:
        ca_cert: The signing CA's ``crypto.X509`` certificate. Because the
            authorityKeyIdentifier is requested with ``keyid:always``, the
            CA certificate needs a subjectKeyIdentifier extension.
        ca_key: The CA's ``crypto.PKey`` private key.
        username: Value for the certificate subject's commonName.

    Returns:
        A ``(client_cert, client_key)`` tuple.
    """
    client_key = crypto.PKey()
    client_key.generate_key(crypto.TYPE_RSA, 2048)

    client_cert = crypto.X509()
    client_cert.set_version(2)
    # Serial numbers must be unique per issuer; draw 64 bits from the OS
    # CSPRNG instead of a small pseudo-random range. ``or 1`` avoids the
    # (astronomically unlikely) serial 0.
    client_cert.set_serial_number(random.SystemRandom().getrandbits(64) or 1)

    # get_subject() returns a live view, so assigning to it updates the
    # certificate directly.
    client_subj = client_cert.get_subject()
    client_subj.commonName = username
    # Optional group membership could be expressed via the organization:
    # client_subj.organizationName = "user-group"

    # The issuer of the new certificate is the CA's *subject*. (Using the
    # CA's issuer only works by accident for self-signed CAs.)
    client_cert.set_issuer(ca_cert.get_subject())

    # The public key has to be in place before the subjectKeyIdentifier is
    # built; otherwise "hash" is computed over an empty key.
    client_cert.set_pubkey(client_key)

    client_cert.gmtime_adj_notBefore(0)
    client_cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)

    client_cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", False, b"CA:FALSE"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=client_cert),
        crypto.X509Extension(b"authorityKeyIdentifier", False,
                             b"keyid:always", issuer=ca_cert),
        crypto.X509Extension(b"extendedKeyUsage", False, b"clientAuth"),
        crypto.X509Extension(b"keyUsage", False, b"digitalSignature"),
    ])

    client_cert.sign(ca_key, 'sha256')

    with open(CLIENT_CERT_FILE, "wb") as f:
        f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, client_cert))

    with open(CLIENT_KEY_FILE, "wb") as f:
        f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, client_key))

    return client_cert, client_key


def new_client_cert(username):
    """Issue a client certificate for ``username``, creating a CA if needed.

    The CA is loaded from disk via ``load_cert()``. Only when a CA file is
    missing is a new self-signed CA generated. Any other failure (for
    example an unparsable or passphrase-protected key) is propagated rather
    than silently replacing the existing CA files.

    Note that if just one of the two CA files is missing, a complete new CA
    is generated and both files are rewritten.

    Args:
        username: Value for the client certificate's commonName.

    Returns:
        A ``(client_cert, client_key, username)`` tuple.
    """
    try:
        ca_cert, ca_key = load_cert()
    except FileNotFoundError as e:
        # No CA on disk yet: report which file was missing and bootstrap one.
        print(e)
        ca_cert, ca_key = generate_self_signed_ca()

    client_cert, client_key = generate_client_cert(ca_cert, ca_key, username)

    return client_cert, client_key, username


def create_user_config(client_cert, client_key, username):
    """Write a kubeconfig for ``username`` to ``user.conf``.

    ``admin.conf`` is used as a template: its first user entry is renamed
    and given the new client certificate and key (PEM, base64-encoded), and
    its first context is pointed at that user and made the current context
    under the name ``<username>@kubernetes``. The base64 certificate data is
    also printed to stdout.

    The resulting ``user.conf`` embeds the unencrypted private key.

    Args:
        client_cert: The user's ``crypto.X509`` certificate.
        client_key: The user's ``crypto.PKey`` private key.
        username: Name used for the user entry and the context.

    Raises:
        FileNotFoundError: If ``admin.conf`` does not exist.
    """
    with open("admin.conf") as adminconfigtext:
        # safe_load: a kubeconfig is plain data, and yaml.load() without an
        # explicit Loader is unsafe and rejected by current PyYAML releases.
        config = yaml.safe_load(adminconfigtext)

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, client_cert)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, client_key)

    user_entry = config["users"][0]
    user_entry["name"] = username
    user_entry["user"]["client-certificate-data"] = (
        base64.b64encode(cert_pem).decode())
    user_entry["user"]["client-key-data"] = (
        base64.b64encode(key_pem).decode())

    context_name = username + "@kubernetes"
    config["contexts"][0]["context"]["user"] = username
    config["contexts"][0]["name"] = context_name
    config["current-context"] = context_name

    print(user_entry["user"]["client-certificate-data"])

    with open("user.conf", "w") as userconfigtext:
        yaml.dump(config, userconfigtext)


if __name__ == "__main__":
    # Issue a certificate for the example user; the returned triple is
    # exactly the argument list create_user_config() expects.
    issued = new_client_cert("myusername")
    try:
        create_user_config(*issued)
    except FileNotFoundError:
        # The kubeconfig is optional: without a template only the
        # certificate and key files are produced.
        print("admin.conf needed to create user.conf")

If you have an admin.conf file, the script also generates a kubectl config file for the user.

1
kubectl --kubeconfig user.conf

Easy peasy. Hope it’ll be useful to someone.

Comments