summaryrefslogtreecommitdiff
path: root/src/authn.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2021-09-15 15:29:10 +0200
committerLinus Nordberg <linus@nordberg.se>2021-09-15 15:29:10 +0200
commit838d36e7cab2f11322d4c3c211407a73ebc712b9 (patch)
treecfbb667b8232221b1d00cdbb1ab5428b4647ef8f /src/authn.py
parent6cccfe8cb27fc58f373c069e53f1c8773427d173 (diff)
add simple authentication based on a local yaml file
This allows for mapping username/password pairs to sets of organisations with 'r' or 'rw' permissions. To be replaced with an external service providing a JWT in an HTTP header.
Diffstat (limited to 'src/authn.py')
-rwxr-xr-xsrc/authn.py109
1 files changed, 109 insertions, 0 deletions
diff --git a/src/authn.py b/src/authn.py
new file mode 100755
index 0000000..8227e2c
--- /dev/null
+++ b/src/authn.py
@@ -0,0 +1,109 @@
+#! /usr/bin/env python3
+
+import yaml
+
+
+class Authz:
+ def __init__(self, org, perms):
+ self._org = org
+ self._perms = perms
+
+ def dump(self):
+ return "{}: {}".format(self._org, self._perms)
+
+ def read_p(self):
+ return 'r' in self._perms
+
+ def write_p(self):
+ return 'w' in self._perms
+
+class User:
+ def __init__(self, username, pw, authz):
+ self._username = username
+ self._password = pw
+ self._authz = {}
+ for org, perms in authz.items():
+ self._authz[org] = Authz(org, perms)
+
+ def dump(self):
+ return ["{}/{}: {}".format(self._username, self._password, auth.dump())
+ for auth in self._authz.values()]
+
+ def authn_p(self, pw):
+ return pw == self._password
+
+ def orgnames(self):
+ return [x for x in self._authz.keys()]
+
+ def read_perms(self):
+ acc = []
+ for k,v in self._authz.items():
+ if v.read_p():
+ acc.append(k)
+ return acc
+
+ def write_perms(self):
+ acc = []
+ for k,v in self._authz.items():
+ if v.write_p():
+ acc.append(k)
+ return acc
+
+class UserDB:
+ def __init__(self, yamlfile):
+ self._users = {}
+ for u, d in yaml.load(open(yamlfile)).items():
+ self._users[u] = User(u, d['pw'], d['authz'])
+
+ def dump(self):
+ return [u.dump() for u in self._users.values()]
+
+ def user_authn_p(self, username, password):
+ user = self._users.get(username)
+ if not user:
+ return False
+ return user.authn_p(password)
+
+ def orgs_for_user(self, username):
+ return self._users.get(username).orgnames()
+
+ def read_perms(self, username, password):
+ user = self._users.get(username)
+ if not user:
+ return None
+ if not user.authn_p(password):
+ return None
+ return user.read_perms()
+
+ def write_perms(self, username, password):
+ user = self._users.get(username)
+ if not user:
+ return None
+ if not user.authn_p(password):
+ return None
+ return user.write_perms()
+
+
+def self_test():
+ db = UserDB('userdb.yaml')
+ print(db.dump())
+
+ orgs = db.orgs_for_user('user3')
+ assert('sunet.se' in orgs)
+ assert('su.se' in orgs)
+ assert(len(orgs) == 2)
+
+ assert(db.user_authn_p('user3', 'pw3') == True)
+ assert(db.user_authn_p('user3', 'wrongpw') == False)
+
+ rp = db.read_perms('user3', 'pw3')
+ assert(len(rp) == 2)
+ assert('sunet.se' in rp)
+ assert('su.se' in rp)
+
+ wp = db.write_perms('user3', 'pw3')
+ assert(len(wp) == 1)
+ assert('sunet.se' in wp)
+
+if __name__ == '__main__':
+ self_test()