From bfe891000c2d6bb2c73bdc635d22640a3e89e729 Mon Sep 17 00:00:00 2001 From: Ernst Widerberg Date: Thu, 13 Jan 2022 18:10:22 +0100 Subject: Add read/write permissions to JWTs based on YAML - Uses Linus's YAML code, except with password stuff removed since auth-server-poc uses htpasswd. - The collector checks JWT on API endpoints get, get/{key}, and delete/{key}, but not on add. --- auth-server-poc/src/authn.py | 97 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100755 auth-server-poc/src/authn.py (limited to 'auth-server-poc/src/authn.py') diff --git a/auth-server-poc/src/authn.py b/auth-server-poc/src/authn.py new file mode 100755 index 0000000..8b32cdc --- /dev/null +++ b/auth-server-poc/src/authn.py @@ -0,0 +1,97 @@ +#! /usr/bin/env python3 + +import yaml + + +class Authz: + def __init__(self, org, perms): + self._org = org + self._perms = perms + + def dump(self): + return "{}: {}".format(self._org, self._perms) + + def read_p(self): + return "r" in self._perms + + def write_p(self): + return "w" in self._perms + + +class User: + def __init__(self, username, authz): + self._username = username + self._authz = {} + for org, perms in authz.items(): + self._authz[org] = Authz(org, perms) + + def dump(self): + return [ + "{}: {}".format(self._username, auth.dump()) + for auth in self._authz.values() + ] + + def orgnames(self): + return [x for x in self._authz.keys()] + + def read_perms(self): + acc = [] + for k, v in self._authz.items(): + if v.read_p(): + acc.append(k) + return acc + + def write_perms(self): + acc = [] + for k, v in self._authz.items(): + if v.write_p(): + acc.append(k) + return acc + + +class UserDB: + def __init__(self, yamlfile): + self._users = {} + for u, d in yaml.safe_load(open(yamlfile)).items(): + self._users[u] = User(u, d["authz"]) + + def dump(self): + return [u.dump() for u in self._users.values()] + + def orgs_for_user(self, username): + return self._users.get(username).orgnames() + + def read_perms(self, username): + user = self._users.get(username) + if not user: + return None + return user.read_perms() + + def write_perms(self, username): + user = self._users.get(username) + if not user: + return None + return user.write_perms() + + +def self_test(): + db = UserDB("userdb.yaml") + print(db.dump()) + + orgs = db.orgs_for_user("user3") + assert "sunet.se" in orgs + assert "su.se" in orgs + assert len(orgs) == 2 + + rp = db.read_perms("user3", "pw3") + assert len(rp) == 2 + assert "sunet.se" in rp + assert "su.se" in rp + + wp = db.write_perms("user3", "pw3") + assert len(wp) == 1 + assert "sunet.se" in wp + + +if __name__ == "__main__": + self_test() -- cgit v1.1