diff options
author | Ernst Widerberg <ernst@sunet.se> | 2022-01-13 18:10:22 +0100 |
---|---|---|
committer | Ernst Widerberg <ernst@sunet.se> | 2022-01-13 18:10:22 +0100 |
commit | bfe891000c2d6bb2c73bdc635d22640a3e89e729 (patch) | |
tree | 7d56b8af24102823f4976319641d8a977ffdc8ff /auth-server-poc/src/authn.py | |
parent | 386f3bd73383368facd9807f737e26478b0302f3 (diff) |
Add read/write permissions to JWTs based on YAML
- Uses Linus's YAML code, except with password stuff removed since
auth-server-poc uses htpasswd.
- The collector checks JWT on API endpoints get, get/{key}, and
delete/{key}, but not on add.
Diffstat (limited to 'auth-server-poc/src/authn.py')
-rwxr-xr-x | auth-server-poc/src/authn.py | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/auth-server-poc/src/authn.py b/auth-server-poc/src/authn.py new file mode 100755 index 0000000..8b32cdc --- /dev/null +++ b/auth-server-poc/src/authn.py @@ -0,0 +1,97 @@ +#! /usr/bin/env python3 + +import yaml + + +class Authz: + def __init__(self, org, perms): + self._org = org + self._perms = perms + + def dump(self): + return "{}: {}".format(self._org, self._perms) + + def read_p(self): + return "r" in self._perms + + def write_p(self): + return "w" in self._perms + + +class User: + def __init__(self, username, authz): + self._username = username + self._authz = {} + for org, perms in authz.items(): + self._authz[org] = Authz(org, perms) + + def dump(self): + return [ + "{}: {}".format(self._username, auth.dump()) + for auth in self._authz.values() + ] + + def orgnames(self): + return [x for x in self._authz.keys()] + + def read_perms(self): + acc = [] + for k, v in self._authz.items(): + if v.read_p(): + acc.append(k) + return acc + + def write_perms(self): + acc = [] + for k, v in self._authz.items(): + if v.write_p(): + acc.append(k) + return acc + + +class UserDB: + def __init__(self, yamlfile): + self._users = {} + for u, d in yaml.safe_load(open(yamlfile)).items(): + self._users[u] = User(u, d["authz"]) + + def dump(self): + return [u.dump() for u in self._users.values()] + + def orgs_for_user(self, username): + return self._users.get(username).orgnames() + + def read_perms(self, username): + user = self._users.get(username) + if not user: + return None + return user.read_perms() + + def write_perms(self, username): + user = self._users.get(username) + if not user: + return None + return user.write_perms() + + +def self_test(): + db = UserDB("userdb.yaml") + print(db.dump()) + + orgs = db.orgs_for_user("user3") + assert "sunet.se" in orgs + assert "su.se" in orgs + assert len(orgs) == 2 + + rp = db.read_perms("user3", "pw3") + assert len(rp) == 2 + assert "sunet.se" in rp + assert "su.se" in rp + + wp = db.write_perms("user3", "pw3") + assert len(wp) == 1 + assert "sunet.se" in wp + + +if __name__ == "__main__": + self_test() |