summaryrefslogtreecommitdiff
path: root/auth-server-poc/src/authn.py
diff options
context:
space:
mode:
authorErnst Widerberg <ernst@sunet.se>2022-01-13 18:10:22 +0100
committerErnst Widerberg <ernst@sunet.se>2022-01-13 18:10:22 +0100
commitbfe891000c2d6bb2c73bdc635d22640a3e89e729 (patch)
tree7d56b8af24102823f4976319641d8a977ffdc8ff /auth-server-poc/src/authn.py
parent386f3bd73383368facd9807f737e26478b0302f3 (diff)
Add read/write permissions to JWTs based on YAML
- Uses Linus's YAML code, except with password stuff removed since auth-server-poc uses htpasswd. - The collector checks JWT on API endpoints get, get/{key}, and delete/{key}, but not on add.
Diffstat (limited to 'auth-server-poc/src/authn.py')
-rwxr-xr-xauth-server-poc/src/authn.py97
1 files changed, 97 insertions, 0 deletions
diff --git a/auth-server-poc/src/authn.py b/auth-server-poc/src/authn.py
new file mode 100755
index 0000000..8b32cdc
--- /dev/null
+++ b/auth-server-poc/src/authn.py
@@ -0,0 +1,97 @@
+#! /usr/bin/env python3
+
+import yaml
+
+
+class Authz:
+ def __init__(self, org, perms):
+ self._org = org
+ self._perms = perms
+
+ def dump(self):
+ return "{}: {}".format(self._org, self._perms)
+
+ def read_p(self):
+ return "r" in self._perms
+
+ def write_p(self):
+ return "w" in self._perms
+
+
+class User:
+ def __init__(self, username, authz):
+ self._username = username
+ self._authz = {}
+ for org, perms in authz.items():
+ self._authz[org] = Authz(org, perms)
+
+ def dump(self):
+ return [
+ "{}: {}".format(self._username, auth.dump())
+ for auth in self._authz.values()
+ ]
+
+ def orgnames(self):
+ return [x for x in self._authz.keys()]
+
+ def read_perms(self):
+ acc = []
+ for k, v in self._authz.items():
+ if v.read_p():
+ acc.append(k)
+ return acc
+
+ def write_perms(self):
+ acc = []
+ for k, v in self._authz.items():
+ if v.write_p():
+ acc.append(k)
+ return acc
+
+
+class UserDB:
+ def __init__(self, yamlfile):
+ self._users = {}
+ for u, d in yaml.safe_load(open(yamlfile)).items():
+ self._users[u] = User(u, d["authz"])
+
+ def dump(self):
+ return [u.dump() for u in self._users.values()]
+
+ def orgs_for_user(self, username):
+ return self._users.get(username).orgnames()
+
+ def read_perms(self, username):
+ user = self._users.get(username)
+ if not user:
+ return None
+ return user.read_perms()
+
+ def write_perms(self, username):
+ user = self._users.get(username)
+ if not user:
+ return None
+ return user.write_perms()
+
+
+def self_test():
+ db = UserDB("userdb.yaml")
+ print(db.dump())
+
+ orgs = db.orgs_for_user("user3")
+ assert "sunet.se" in orgs
+ assert "su.se" in orgs
+ assert len(orgs) == 2
+
+ rp = db.read_perms("user3", "pw3")
+ assert len(rp) == 2
+ assert "sunet.se" in rp
+ assert "su.se" in rp
+
+ wp = db.write_perms("user3", "pw3")
+ assert len(wp) == 1
+ assert "sunet.se" in wp
+
+
+if __name__ == "__main__":
+ self_test()