diff options
author | Linus Nordberg <linus@nordberg.se> | 2021-09-15 15:29:10 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordberg.se> | 2021-09-15 15:29:10 +0200 |
commit | 838d36e7cab2f11322d4c3c211407a73ebc712b9 (patch) | |
tree | cfbb667b8232221b1d00cdbb1ab5428b4647ef8f /src/authn.py | |
parent | 6cccfe8cb27fc58f373c069e53f1c8773427d173 (diff) |
add simple authentication based on a local yaml file
This allows for mapping username/password pairs to sets of
organisations with 'r' or 'rw' permissions.
To be replaced with an external service providing a JWT in an HTTP
header.
Diffstat (limited to 'src/authn.py')
-rwxr-xr-x | src/authn.py | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/src/authn.py b/src/authn.py new file mode 100755 index 0000000..8227e2c --- /dev/null +++ b/src/authn.py @@ -0,0 +1,109 @@ +#! /usr/bin/env python3 + +import yaml + + +class Authz: + def __init__(self, org, perms): + self._org = org + self._perms = perms + + def dump(self): + return "{}: {}".format(self._org, self._perms) + + def read_p(self): + return 'r' in self._perms + + def write_p(self): + return 'w' in self._perms + +class User: + def __init__(self, username, pw, authz): + self._username = username + self._password = pw + self._authz = {} + for org, perms in authz.items(): + self._authz[org] = Authz(org, perms) + + def dump(self): + return ["{}/{}: {}".format(self._username, self._password, auth.dump()) + for auth in self._authz.values()] + + def authn_p(self, pw): + return pw == self._password + + def orgnames(self): + return [x for x in self._authz.keys()] + + def read_perms(self): + acc = [] + for k,v in self._authz.items(): + if v.read_p(): + acc.append(k) + return acc + + def write_perms(self): + acc = [] + for k,v in self._authz.items(): + if v.write_p(): + acc.append(k) + return acc + +class UserDB: + def __init__(self, yamlfile): + self._users = {} + for u, d in yaml.load(open(yamlfile)).items(): + self._users[u] = User(u, d['pw'], d['authz']) + + def dump(self): + return [u.dump() for u in self._users.values()] + + def user_authn_p(self, username, password): + user = self._users.get(username) + if not user: + return False + return user.authn_p(password) + + def orgs_for_user(self, username): + return self._users.get(username).orgnames() + + def read_perms(self, username, password): + user = self._users.get(username) + if not user: + return None + if not user.authn_p(password): + return None + return user.read_perms() + + def write_perms(self, username, password): + user = self._users.get(username) + if not user: + return None + if not user.authn_p(password): + return None + return user.write_perms() + + +def self_test(): + db = UserDB('userdb.yaml') + print(db.dump()) + + orgs = db.orgs_for_user('user3') + assert('sunet.se' in orgs) + assert('su.se' in orgs) + assert(len(orgs) == 2) + + assert(db.user_authn_p('user3', 'pw3') == True) + assert(db.user_authn_p('user3', 'wrongpw') == False) + + rp = db.read_perms('user3', 'pw3') + assert(len(rp) == 2) + assert('sunet.se' in rp) + assert('su.se' in rp) + + wp = db.write_perms('user3', 'pw3') + assert(len(wp) == 1) + assert('sunet.se' in wp) + +if __name__ == '__main__': + self_test() |