1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
#! /usr/bin/env python3
# A database storing dictionaries, keyed on a timestamp.
# value = A dict which will be stored as a JSON object encoded in
# UTF-8. Note that dict keys of type integer or float will become
# strings while values will keep their type.
# Note that there's a (slim) chance that you'd stomp on a previous
# value if timestamps are generated too quickly, i.e. if two keys
# end up rounding to the same millisecond.
import struct
import time
from urllib.parse import quote

import couchdb
class DictDB():
    """Store dictionaries in a CouchDB database, keyed on a timestamp.

    Document keys are millisecond timestamps rendered as strings. Keys
    handed out by a single instance are strictly increasing, so one
    instance never overwrites a document it has written earlier.
    """

    def __init__(self, database, hostname, username, password):
        """Connect to the CouchDB server and open (or create) *database*.

        Args:
            database: Name of the database to use; created if missing.
            hostname: Host name of the CouchDB server (port 5984 is used).
            username: User name for the connection URL.
            password: Password for the connection URL.
        """
        # Percent-encode the credentials so that characters such as '@',
        # ':' or '/' cannot corrupt the structure of the URL.
        credentials = (
            f"{quote(str(username), safe='')}:{quote(str(password), safe='')}"
        )
        self.server = couchdb.Server(f"http://{credentials}@{hostname}:5984/")
        if database not in self.server:
            self.server.create(database)
        self.couchdb = self.server[database]
        # Last key handed out, in milliseconds; 0 means "none yet".
        self._ts = 0

    def unique_key(self):
        """Return a millisecond timestamp never returned by this instance.

        When the clock has not advanced past the previously issued key
        (several calls within one millisecond, or a clock that stepped
        backwards), the previous key plus one is returned instead.

        Returns:
            An int, strictly greater than any key issued before by this
            instance.
        """
        now_ms = round(time.time() * 1000)
        if now_ms <= self._ts:
            now_ms = self._ts + 1
        self._ts = now_ms
        return now_ms

    def index_add(self, path):
        """Not implemented; currently does nothing and returns None."""
        pass

    def add(self, data, batch_write=False):
        """Store one document, or each document of a list.

        Args:
            data: A single document, or a list of documents.
            batch_write: Currently ignored.

        Returns:
            A list with the string keys of the stored documents, in the
            order they were written.
        """
        items = data if isinstance(data, list) else [data]
        keys = []
        for item in items:
            key = str(self.unique_key())
            self.couchdb[key] = item
            keys.append(key)
        return keys

    def get(self, key):
        """Return the document stored under *key*."""
        return self.couchdb[key]

    def slice(self, key_from=None, key_to=None):
        """Not implemented; currently does nothing and returns None."""
        pass

    def search(self, **kwargs):
        """Return the documents whose fields equal the given values.

        Each keyword argument becomes a ``{"$eq": value}`` condition on
        the field of that name. String values made up only of decimal
        digits are converted to int first; every other value is used
        unchanged.

        Returns:
            A list with the documents yielded by the database's ``find``.
        """
        criteria = {}
        for field, value in kwargs.items():
            # isdecimal() (rather than isnumeric()) only accepts strings
            # that int() is able to parse; non-strings are left alone.
            if isinstance(value, str) and value.isdecimal():
                value = int(value)
            criteria[field] = {"$eq": value}
        # Always send a "selector" key, even when no criteria were given.
        return list(self.couchdb.find({"selector": criteria}))
|