summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristofer Hallin <kristofer@sunet.se>2021-10-01 10:21:54 +0200
committerKristofer Hallin <kristofer@sunet.se>2021-10-01 10:21:54 +0200
commitcb47680777b3fd3bcd955f9e81ddf45c9b69ecfa (patch)
tree1fa8dfae5e9f39c634498f2c60b56a99558cae9a
parent0398e77a809abcaf78c6f7d3e6064a5bee50be23 (diff)
* Use py-couchdb.
* Other minor fixes and tweaks.
-rw-r--r--src/couch/__init__.py11
-rw-r--r--src/couch/client.py772
-rw-r--r--src/couch/exceptions.py38
-rw-r--r--src/couch/feedreader.py52
-rw-r--r--src/couch/resource.py128
-rw-r--r--src/couch/utils.py150
-rwxr-xr-xsrc/db.py34
-rwxr-xr-xsrc/wsgi.py12
8 files changed, 1180 insertions, 17 deletions
diff --git a/src/couch/__init__.py b/src/couch/__init__.py
new file mode 100644
index 0000000..a7537bc
--- /dev/null
+++ b/src/couch/__init__.py
@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+
+__author__ = "Andrey Antukh"
+__license__ = "BSD"
+__version__ = "1.14.1"
+__maintainer__ = "Rinat Sabitov"
+__email__ = "rinat.sabitov@gmail.com"
+__status__ = "Development"
+
+
+from couch.client import Server # noqa: F401
diff --git a/src/couch/client.py b/src/couch/client.py
new file mode 100644
index 0000000..188e0de
--- /dev/null
+++ b/src/couch/client.py
@@ -0,0 +1,772 @@
+# -*- coding: utf-8 -*-
+# Based on py-couchdb (https://github.com/histrio/py-couchdb)
+
+import os
+import json
+import uuid
+import copy
+import mimetypes
+import warnings
+
+from couch import utils
+from couch import feedreader
+from couch import exceptions as exp
+from couch.resource import Resource
+
+
+DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/')
+
+
+def _id_to_path(_id):
+ if _id[:1] == "_":
+ return _id.split("/", 1)
+ return [_id]
+
+
+def _listen_feed(object, node, feed_reader, **kwargs):
+ if not callable(feed_reader):
+ raise exp.UnexpectedError("feed_reader must be callable or class")
+
+ if isinstance(feed_reader, feedreader.BaseFeedReader):
+ reader = feed_reader(object)
+ else:
+ reader = feedreader.SimpleFeedReader()(object, feed_reader)
+
+ # Possible options: "continuous", "longpoll"
+ kwargs.setdefault("feed", "continuous")
+ data = utils.force_bytes(json.dumps(kwargs.pop('data', {})))
+
+ (resp, result) = object.resource(node).post(
+ params=kwargs, data=data, stream=True)
+ try:
+ for line in resp.iter_lines():
+ # ignore heartbeats
+ if not line:
+ reader.on_heartbeat()
+ else:
+ reader.on_message(json.loads(utils.force_text(line)))
+ except exp.FeedReaderExited:
+ reader.on_close()
+
+
+class _StreamResponse(object):
+ """
+ Proxy object for python-requests stream response.
+
+ See more on:
+ http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests
+ """
+
+ def __init__(self, response):
+ self._response = response
+
+ def iter_content(self, chunk_size=1, decode_unicode=False):
+ return self._response.iter_content(chunk_size=chunk_size,
+ decode_unicode=decode_unicode)
+
+ def iter_lines(self, chunk_size=512, decode_unicode=None):
+ return self._response.iter_lines(chunk_size=chunk_size,
+ decode_unicode=decode_unicode)
+
+ @property
+ def raw(self):
+ return self._response.raw
+
+ @property
+ def url(self):
+ return self._response.url
+
+
+class Server(object):
+ """
+ Class that represents a couchdb connection.
+
+ :param verify: setup ssl verification.
+ :param base_url: a full url to couchdb (can contain auth data).
+ :param full_commit: If ``False``, couchdb not commits all data on a
+ request is finished.
+ :param authmethod: specify a authentication method. By default "basic"
+ method is used but also exists "session" (that requires
+ some server configuration changes).
+
+ .. versionchanged: 1.4
+ Set basic auth method as default instead of session method.
+
+ .. versionchanged: 1.5
+ Add verify parameter for setup ssl verificaton
+
+ """
+
+ def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True,
+ authmethod="basic", verify=False):
+
+ self.base_url, credentials = utils.extract_credentials(base_url)
+ self.resource = Resource(self.base_url, full_commit,
+ credentials=credentials,
+ authmethod=authmethod,
+ verify=verify)
+
+ def __repr__(self):
+ return '<CouchDB Server "{}">'.format(self.base_url)
+
+ def __contains__(self, name):
+ try:
+ self.resource.head(name)
+ except exp.NotFound:
+ return False
+ else:
+ return True
+
+ def __iter__(self):
+ (r, result) = self.resource.get('_all_dbs')
+ return iter(result)
+
+ def __len__(self):
+ (r, result) = self.resource.get('_all_dbs')
+ return len(result)
+
+ def info(self):
+ """
+ Get server info.
+
+ :returns: dict with all data that couchdb returns.
+ :rtype: dict
+ """
+ (r, result) = self.resource.get()
+ return result
+
+ def delete(self, name):
+ """
+ Delete some database.
+
+ :param name: database name
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
+ if a database does not exists
+ """
+
+ self.resource.delete(name)
+
+ def database(self, name):
+ """
+ Get a database instance.
+
+ :param name: database name
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
+ if a database does not exists
+
+ :returns: a :py:class:`~pycouchdb.client.Database` instance
+ """
+ (r, result) = self.resource.head(name)
+ if r.status_code == 404:
+ raise exp.NotFound("Database '{0}' does not exists".format(name))
+
+ db = Database(self.resource(name), name)
+ return db
+
+ # TODO: Config in 2.0 are applicable for nodes only
+ # TODO: Reimplement when nodes endpoint will be ready
+ # def config(self):
+ # pass
+
+ def version(self):
+ """
+ Get the current version of a couchdb server.
+ """
+ (resp, result) = self.resource.get()
+ return result["version"]
+
+ # TODO: Stats in 2.0 are applicable for nodes only
+ # TODO: Reimplement when nodes endpoint will be ready
+ # def stats(self, name=None):
+ # pass
+
+ def create(self, name):
+ """
+ Create a database.
+
+ :param name: database name
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
+ if a database already exists
+ :returns: a :py:class:`~pycouchdb.client.Database` instance
+ """
+ (resp, result) = self.resource.put(name)
+ if resp.status_code in (200, 201):
+ return self.database(name)
+
+ def replicate(self, source, target, **kwargs):
+ """
+ Replicate the source database to the target one.
+
+ .. versionadded:: 1.3
+
+ :param source: full URL to the source database
+ :param target: full URL to the target database
+ """
+
+ data = {'source': source, 'target': target}
+ data.update(kwargs)
+
+ data = utils.force_bytes(json.dumps(data))
+
+ (resp, result) = self.resource.post('_replicate', data=data)
+ return result
+
+ def changes_feed(self, feed_reader, **kwargs):
+ """
+ Subscribe to changes feed of the whole CouchDB server.
+
+ Note: this method is blocking.
+
+
+ :param feed_reader: callable or :py:class:`~BaseFeedReader`
+ instance
+
+ .. [Ref] http://docs.couchdb.org/en/1.6.1/api/server/common.html#db-updates
+ .. versionadded: 1.10
+ """
+ object = self
+ _listen_feed(object, "_db_updates", feed_reader, **kwargs)
+
+
+class Database(object):
+ """
+ Class that represents a couchdb database.
+ """
+
+ def __init__(self, resource, name):
+ self.resource = resource
+ self.name = name
+
+ def __repr__(self):
+ return '<CouchDB Database "{}">'.format(self.name)
+
+ def __contains__(self, doc_id):
+ try:
+ (resp, result) = self.resource.head(_id_to_path(doc_id))
+ return resp.status_code < 206
+ except exp.NotFound:
+ return False
+
+ def config(self):
+ """
+ Get database status data such as document count, update sequence etc.
+ :return: dict
+ """
+ (resp, result) = self.resource.get()
+ return result
+
+ def __nonzero__(self):
+ """Is the database available"""
+ resp, _ = self.resource.head()
+ return resp.status_code == 200
+
+ def __len__(self):
+ return self.config()['doc_count']
+
+ def delete(self, doc_or_id):
+ """
+ Delete document by id.
+
+ .. versionchanged:: 1.2
+ Accept document or id.
+
+ :param doc_or_id: document or id
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
+ not exists
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if delete with
+ wrong revision.
+ """
+
+ _id = None
+ if isinstance(doc_or_id, dict):
+ if "_id" not in doc_or_id:
+ raise ValueError("Invalid document, missing _id attr")
+ _id = doc_or_id['_id']
+ else:
+ _id = doc_or_id
+
+ resource = self.resource(*_id_to_path(_id))
+
+ (r, result) = resource.head()
+ (r, result) = resource.delete(
+ params={"rev": r.headers["etag"].strip('"')})
+
+ def delete_bulk(self, docs, transaction=True):
+ """
+ Delete a bulk of documents.
+
+ .. versionadded:: 1.2
+
+ :param docs: list of docs
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if a delete
+ is not success
+ :returns: raw results from server
+ """
+
+ _docs = copy.copy(docs)
+ for doc in _docs:
+ if "_deleted" not in doc:
+ doc["_deleted"] = True
+
+ data = utils.force_bytes(json.dumps({"docs": _docs}))
+ params = {"all_or_nothing": "true" if transaction else "false"}
+ (resp, results) = self.resource.post(
+ "_bulk_docs", data=data, params=params)
+
+ for result, doc in zip(results, _docs):
+ if "error" in result:
+ raise exp.Conflict("one or more docs are not saved")
+
+ return results
+
+ def get(self, doc_id, params=None, **kwargs):
+ """
+ Get a document by id.
+
+ .. versionadded: 1.5
+ Now the prefered method to pass params is via **kwargs
+ instead of params argument. **params** argument is now
+ deprecated and will be deleted in future versions.
+
+ :param doc_id: document id
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
+ not exists
+
+ :returns: document (dict)
+ """
+
+ if params:
+ warnings.warn("params parameter is now deprecated in favor to"
+ "**kwargs usage.", DeprecationWarning)
+
+ if params is None:
+ params = {}
+
+ params.update(kwargs)
+
+ (resp, result) = self.resource(*_id_to_path(doc_id)).get(params=params)
+ return result
+
+ def save(self, doc, batch=False):
+ """
+ Save or update a document.
+
+ .. versionchanged:: 1.2
+ Now returns a new document instead of modify the original.
+
+ :param doc: document
+ :param batch: allow batch=ok inserts (default False)
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if save with wrong
+ revision.
+ :returns: doc
+ """
+
+ _doc = copy.copy(doc)
+ if "_id" not in _doc:
+ _doc['_id'] = uuid.uuid4().hex
+
+ if batch:
+ params = {'batch': 'ok'}
+ else:
+ params = {}
+
+ data = utils.force_bytes(json.dumps(_doc))
+ (resp, result) = self.resource(_doc['_id']).put(
+ data=data, params=params)
+
+ if resp.status_code == 409:
+ raise exp.Conflict(result['reason'])
+
+ if "rev" in result and result["rev"] is not None:
+ _doc["_rev"] = result["rev"]
+
+ return _doc
+
+ def save_bulk(self, docs, try_setting_ids=True, transaction=True):
+ """
+ Save a bulk of documents.
+
+ .. versionchanged:: 1.2
+ Now returns a new document list instead of modify the original.
+
+ :param docs: list of docs
+ :param try_setting_ids: if ``True``, we loop through docs and generate/set
+ an id in each doc if none exists
+ :param transaction: if ``True``, couchdb do a insert in transaction
+ model.
+ :returns: docs
+ """
+
+ _docs = copy.deepcopy(docs)
+
+ # Insert _id field if it not exists and try_setting_ids is true
+ if try_setting_ids:
+ for doc in _docs:
+ if "_id" not in doc:
+ doc["_id"] = uuid.uuid4().hex
+
+ data = utils.force_bytes(json.dumps({"docs": _docs}))
+ params = {"all_or_nothing": "true" if transaction else "false"}
+
+ (resp, results) = self.resource.post("_bulk_docs", data=data,
+ params=params)
+
+ for result, doc in zip(results, _docs):
+ if "rev" in result:
+ doc['_rev'] = result['rev']
+
+ return _docs
+
+ def all(self, wrapper=None, flat=None, as_list=False, **kwargs):
+ """
+ Execute a builtin view for get all documents.
+
+ :param wrapper: wrap result into a specific class.
+ :param as_list: return a list of results instead of a
+ default lazy generator.
+ :param flat: get a specific field from a object instead
+ of a complete object.
+
+ .. versionadded: 1.4
+ Add as_list parameter.
+ Add flat parameter.
+
+ :returns: generator object
+ """
+
+ params = {"include_docs": "true"}
+ params.update(kwargs)
+
+ data = None
+
+ if "keys" in params:
+ data = {"keys": params.pop("keys")}
+ data = utils.force_bytes(json.dumps(data))
+
+ params = utils.encode_view_options(params)
+ if data:
+ (resp, result) = self.resource.post(
+ "_all_docs", params=params, data=data)
+ else:
+ (resp, result) = self.resource.get("_all_docs", params=params)
+
+ if wrapper is None:
+ def wrapper(doc): return doc
+
+ if flat is not None:
+ def wrapper(doc): return doc[flat]
+
+ def _iterate():
+ for row in result["rows"]:
+ yield wrapper(row)
+
+ if as_list:
+ return list(_iterate())
+ return _iterate()
+
+ def cleanup(self):
+ """
+ Execute a cleanup operation.
+ """
+ (r, result) = self.resource('_view_cleanup').post()
+ return result
+
+ def commit(self):
+ """
+ Send commit message to server.
+ """
+ (resp, result) = self.resource.post('_ensure_full_commit')
+ return result
+
+ def compact(self):
+ """
+ Send compact message to server. Compacting write-heavy databases
+ should be avoided, otherwise the process may not catch up with
+ the writes. Read load has no effect.
+ """
+ (r, result) = self.resource("_compact").post()
+ return result
+
+ def compact_view(self, ddoc):
+ """
+ Execute compact over design view.
+
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
+ if a view does not exists.
+ """
+ (r, result) = self.resource("_compact", ddoc).post()
+ return result
+
+ def revisions(self, doc_id, status='available', params=None, **kwargs):
+ """
+ Get all revisions of one document.
+
+ :param doc_id: document id
+ :param status: filter of revision status, set empty to list all
+ :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
+ if a view does not exists.
+
+ :returns: generator object
+ """
+ if params:
+ warnings.warn("params parameter is now deprecated in favor to"
+ "**kwargs usage.", DeprecationWarning)
+
+ if params is None:
+ params = {}
+
+ params.update(kwargs)
+
+ if not params.get('revs_info'):
+ params['revs_info'] = 'true'
+
+ resource = self.resource(doc_id)
+ (resp, result) = resource.get(params=params)
+ if resp.status_code == 404:
+ raise exp.NotFound("Document id `{0}` not found".format(doc_id))
+
+ for rev in result['_revs_info']:
+ if status and rev['status'] == status:
+ yield self.get(doc_id, rev=rev['rev'])
+ elif not status:
+ yield self.get(doc_id, rev=rev['rev'])
+
+ def delete_attachment(self, doc, filename):
+ """
+ Delete attachment by filename from document.
+
+ .. versionchanged:: 1.2
+ Now returns a new document instead of modify the original.
+
+ :param doc: document dict
+ :param filename: name of attachment.
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
+ if save with wrong revision.
+ :returns: doc
+ """
+
+ _doc = copy.deepcopy(doc)
+ resource = self.resource(_doc['_id'])
+
+ (resp, result) = resource.delete(
+ filename, params={'rev': _doc['_rev']})
+ if resp.status_code == 404:
+ raise exp.NotFound("filename {0} not found".format(filename))
+
+ if resp.status_code > 205:
+ raise exp.Conflict(result['reason'])
+
+ _doc['_rev'] = result['rev']
+ try:
+ del _doc['_attachments'][filename]
+
+ if not _doc['_attachments']:
+ del _doc['_attachments']
+ except KeyError:
+ pass
+
+ return _doc
+
+ def get_attachment(self, doc, filename, stream=False, **kwargs):
+ """
+ Get attachment by filename from document.
+
+ :param doc: document dict
+ :param filename: attachment file name.
+ :param stream: setup streaming output (default: False)
+
+ .. versionchanged: 1.5
+ Add stream parameter for obtain very large attachments
+ without load all file to the memory.
+
+ :returns: binary data or
+ """
+
+ params = {"rev": doc["_rev"]}
+ params.update(kwargs)
+
+ r, result = self.resource(doc['_id']).get(filename, stream=stream,
+ params=params)
+ if stream:
+ return _StreamResponse(r)
+
+ return r.content
+
+ def put_attachment(self, doc, content, filename=None, content_type=None):
+ """
+ Put a attachment to a document.
+
+ .. versionchanged:: 1.2
+ Now returns a new document instead of modify the original.
+
+ :param doc: document dict.
+ :param content: the content to upload, either a file-like object or
+ bytes
+ :param filename: the name of the attachment file; if omitted, this
+ function tries to get the filename from the file-like
+ object passed as the `content` argument value
+ :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
+ if save with wrong revision.
+ :raises: ValueError
+ :returns: doc
+ """
+
+ if filename is None:
+ if hasattr(content, 'name'):
+ filename = os.path.basename(content.name)
+ else:
+ raise ValueError('no filename specified for attachment')
+
+ if content_type is None:
+ content_type = ';'.join(
+ filter(None, mimetypes.guess_type(filename)))
+
+ headers = {"Content-Type": content_type}
+ resource = self.resource(doc['_id'])
+
+ (resp, result) = resource.put(
+ filename, data=content, params={'rev': doc['_rev']}, headers=headers)
+
+ if resp.status_code < 206:
+ return self.get(doc["_id"])
+
+ raise exp.Conflict(result['reason'])
+
+ def one(self, name, flat=None, wrapper=None, **kwargs):
+ """
+ Execute a design document view query and returns a first
+ result.
+
+ :param name: name of the view (eg: docidname/viewname).
+ :param wrapper: wrap result into a specific class.
+ :param flat: get a specific field from a object instead
+ of a complete object.
+
+ .. versionadded: 1.4
+
+ :returns: object or None
+ """
+
+ params = {"limit": 1}
+ params.update(kwargs)
+
+ path = utils._path_from_name(name, '_view')
+ data = None
+
+ if "keys" in params:
+ data = {"keys": params.pop('keys')}
+
+ if data:
+ data = utils.force_bytes(json.dumps(data))
+
+ params = utils.encode_view_options(params)
+ result = list(self._query(self.resource(*path), wrapper=wrapper,
+ flat=flat, params=params, data=data))
+
+ return result[0] if len(result) > 0 else None
+
+ def _query(self, resource, data=None, params=None, headers=None,
+ flat=None, wrapper=None):
+
+ if data is None:
+ (resp, result) = resource.get(params=params, headers=headers)
+ else:
+ (resp, result) = resource.post(
+ data=data, params=params, headers=headers)
+
+ if wrapper is None:
+ def wrapper(row): return row
+
+ if flat is not None:
+ def wrapper(row): return row[flat]
+
+ for row in result["rows"]:
+ yield wrapper(row)
+
+ def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
+ """
+ Execute a design document view query.
+
+ :param name: name of the view (eg: docidname/viewname).
+ :param wrapper: wrap result into a specific class.
+ :param as_list: return a list of results instead of a
+ default lazy generator.
+ :param flat: get a specific field from a object instead
+ of a complete object.
+
+ .. versionadded: 1.4
+ Add as_list parameter.
+ Add flat parameter.
+
+ :returns: generator object
+ """
+ params = copy.copy(kwargs)
+ path = utils._path_from_name(name, '_view')
+ data = None
+
+ if "keys" in params:
+ data = {"keys": params.pop('keys')}
+
+ if data:
+ data = utils.force_bytes(json.dumps(data))
+
+ params = utils.encode_view_options(params)
+ result = self._query(self.resource(*path), wrapper=wrapper,
+ flat=flat, params=params, data=data)
+
+ if as_list:
+ return list(result)
+ return result
+
+ def changes_feed(self, feed_reader, **kwargs):
+ """
+ Subscribe to changes feed of couchdb database.
+
+ Note: this method is blocking.
+
+
+ :param feed_reader: callable or :py:class:`~BaseFeedReader`
+ instance
+
+ .. versionadded: 1.5
+ """
+
+ object = self
+ _listen_feed(object, "_changes", feed_reader, **kwargs)
+
+ def changes_list(self, **kwargs):
+ """
+ Obtain a list of changes from couchdb.
+
+ .. versionadded: 1.5
+ """
+
+ (resp, result) = self.resource("_changes").get(params=kwargs)
+ return result['last_seq'], result['results']
+
+ def find(self, selector, wrapper=None, **kwargs):
+ """
+ Execute Mango querys using _find.
+
+ :param selector: data to search
+ :param wrapper: wrap result into a specific class.
+
+ """
+ path = '_find'
+ data = utils.force_bytes(json.dumps(selector))
+
+ (resp, result) = self.resource.post(path, data=data, params=kwargs)
+
+ if wrapper is None:
+ def wrapper(doc): return doc
+
+ for doc in result["docs"]:
+ yield wrapper(doc)
+
+ def index(self, index_doc, **kwargs):
+ path = '_index'
+ data = utils.force_bytes(json.dumps(index_doc))
+
+ (resp, result) = self.resource.post(path, data=data, params=kwargs)
+
+ return result
diff --git a/src/couch/exceptions.py b/src/couch/exceptions.py
new file mode 100644
index 0000000..d7e037b
--- /dev/null
+++ b/src/couch/exceptions.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# Based on py-couchdb (https://github.com/histrio/py-couchdb)
+
+
+class Error(Exception):
+ pass
+
+
+class UnexpectedError(Error):
+ pass
+
+
+class FeedReaderExited(Error):
+ pass
+
+
+class ApiError(Error):
+ pass
+
+
+class GenericError(ApiError):
+ pass
+
+
+class Conflict(ApiError):
+ pass
+
+
+class NotFound(ApiError):
+ pass
+
+
+class BadRequest(ApiError):
+ pass
+
+
+class AuthenticationFailed(ApiError):
+ pass
diff --git a/src/couch/feedreader.py b/src/couch/feedreader.py
new file mode 100644
index 0000000..e293932
--- /dev/null
+++ b/src/couch/feedreader.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Based on py-couchdb (https://github.com/histrio/py-couchdb)
+
+
+class BaseFeedReader(object):
+ """
+ Base interface class for changes feed reader.
+ """
+
+ def __call__(self, db):
+ self.db = db
+ return self
+
+ def on_message(self, message):
+ """
+ Callback method that is called when change
+ message is received from couchdb.
+
+ :param message: change object
+ :returns: None
+ """
+
+ raise NotImplementedError()
+
+ def on_close(self):
+ """
+ Callback method that is received when connection
+ is closed with a server. By default, does nothing.
+ """
+ pass
+
+ def on_heartbeat(self):
+ """
+ Callback method invoked when a hearbeat (empty line) is received
+ from the _changes stream. Override this to purge the reader's internal
+ buffers (if any) if it waited too long without receiving anything.
+ """
+ pass
+
+
+class SimpleFeedReader(BaseFeedReader):
+ """
+ Simple feed reader that encapsule any callable in
+ a valid feed reader interface.
+ """
+
+ def __call__(self, db, callback):
+ self.callback = callback
+ return super(SimpleFeedReader, self).__call__(db)
+
+ def on_message(self, message):
+ self.callback(message, db=self.db)
diff --git a/src/couch/resource.py b/src/couch/resource.py
new file mode 100644
index 0000000..da1e0dd
--- /dev/null
+++ b/src/couch/resource.py
@@ -0,0 +1,128 @@
+# -*- coding: utf-8 -*-
+# Based on py-couchdb (https://github.com/histrio/py-couchdb)
+
+
+from __future__ import unicode_literals
+
+import json
+import requests
+
+from couch import utils
+from couch import exceptions
+
+
+class Resource(object):
+ def __init__(self, base_url, full_commit=True, session=None,
+ credentials=None, authmethod="session", verify=False):
+
+ self.base_url = base_url
+# self.verify = verify
+
+ if not session:
+ self.session = requests.session()
+
+ self.session.headers.update({"accept": "application/json",
+ "content-type": "application/json"})
+ self._authenticate(credentials, authmethod)
+
+ if not full_commit:
+ self.session.headers.update({'X-Couch-Full-Commit': 'false'})
+ else:
+ self.session = session
+ self.session.verify = verify
+
+ def _authenticate(self, credentials, method):
+ if not credentials:
+ return
+
+ if method == "session":
+ data = {"name": credentials[0], "password": credentials[1]}
+ data = utils.force_bytes(json.dumps(data))
+
+ post_url = utils.urljoin(self.base_url, "_session")
+ r = self.session.post(post_url, data=data)
+ if r.status_code != 200:
+ raise exceptions.AuthenticationFailed()
+
+ elif method == "basic":
+ self.session.auth = credentials
+
+ else:
+ raise RuntimeError("Invalid authentication method")
+
+ def __call__(self, *path):
+ base_url = utils.urljoin(self.base_url, *path)
+ return self.__class__(base_url, session=self.session)
+
+ def _check_result(self, response, result):
+ try:
+ error = result.get('error', None)
+ reason = result.get('reason', None)
+ except AttributeError:
+ error = None
+ reason = ''
+
+ # This is here because couchdb can return http 201
+ # but containing a list of conflict errors
+ if error == 'conflict' or error == "file_exists":
+ raise exceptions.Conflict(reason or "Conflict")
+
+ if response.status_code > 205:
+ if response.status_code == 404 or error == 'not_found':
+ raise exceptions.NotFound(reason or 'Not found')
+ elif error == 'bad_request':
+ raise exceptions.BadRequest(reason or "Bad request")
+ raise exceptions.GenericError(result)
+
+ def request(self, method, path, params=None, data=None,
+ headers=None, stream=False, **kwargs):
+
+ if headers is None:
+ headers = {}
+
+ headers.setdefault('Accept', 'application/json')
+
+ if path:
+ if not isinstance(path, (list, tuple)):
+ path = [path]
+ url = utils.urljoin(self.base_url, *path)
+ else:
+ url = self.base_url
+
+ response = self.session.request(method, url, stream=stream,
+ data=data, params=params,
+ headers=headers, **kwargs)
+ # Ignore result validation if
+ # request is with stream mode
+
+ if stream and response.status_code < 400:
+ result = None
+ self._check_result(response, result)
+ else:
+ result = utils.as_json(response)
+
+ if result is None:
+ return response, result
+
+ if isinstance(result, list):
+ for res in result:
+ self._check_result(response, res)
+ else:
+ self._check_result(response, result)
+
+ return response, result
+
+ def get(self, path=None, **kwargs):
+ return self.request("GET", path, **kwargs)
+
+ def put(self, path=None, **kwargs):
+ return self.request("PUT", path, **kwargs)
+
+ def post(self, path=None, **kwargs):
+ return self.request("POST", path, **kwargs)
+
+ def delete(self, path=None, **kwargs):
+ return self.request("DELETE", path, **kwargs)
+
+ def head(self, path=None, **kwargs):
+ return self.request("HEAD", path, **kwargs)
diff --git a/src/couch/utils.py b/src/couch/utils.py
new file mode 100644
index 0000000..1cd21d8
--- /dev/null
+++ b/src/couch/utils.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+# Based on py-couchdb (https://github.com/histrio/py-couchdb)
+
+
+import json
+import sys
+
+
+if sys.version_info[0] == 3:
+ from urllib.parse import unquote as _unquote
+ from urllib.parse import urlunsplit, urlsplit
+
+ string_type = str
+ bytes_type = bytes
+
+ from functools import reduce
+
+else:
+ from urllib import unquote as _unquote
+ from urlparse import urlunsplit, urlsplit
+
+ string_type = unicode # noqa: F821
+ bytes_type = str
+
+URLSPLITTER = '/'
+
+
+json_encoder = json.JSONEncoder()
+
+
+def extract_credentials(url):
+ """
+ Extract authentication (user name and password) credentials from the
+ given URL.
+
+ >>> extract_credentials('http://localhost:5984/_config/')
+ ('http://localhost:5984/_config/', None)
+ >>> extract_credentials('http://joe:secret@localhost:5984/_config/')
+ ('http://localhost:5984/_config/', ('joe', 'secret'))
+ >>> extract_credentials('http://joe%40example.com:secret@'
+ ... 'localhost:5984/_config/')
+ ('http://localhost:5984/_config/', ('joe@example.com', 'secret'))
+ """
+ parts = urlsplit(url)
+ netloc = parts[1]
+ if '@' in netloc:
+ creds, netloc = netloc.split('@')
+ credentials = tuple(_unquote(i) for i in creds.split(':'))
+ parts = list(parts)
+ parts[1] = netloc
+ else:
+ credentials = None
+ return urlunsplit(parts), credentials
+
+
+def _join(head, tail):
+ parts = [head.rstrip(URLSPLITTER), tail.lstrip(URLSPLITTER)]
+ return URLSPLITTER.join(parts)
+
+
+def urljoin(base, *path):
+ """
+ Assemble a uri based on a base, any number of path segments, and query
+ string parameters.
+
+ >>> urljoin('http://example.org', '_all_dbs')
+ 'http://example.org/_all_dbs'
+
+ A trailing slash on the uri base is handled gracefully:
+
+ >>> urljoin('http://example.org/', '_all_dbs')
+ 'http://example.org/_all_dbs'
+
+ And multiple positional arguments become path parts:
+
+ >>> urljoin('http://example.org/', 'foo', 'bar')
+ 'http://example.org/foo/bar'
+
+ >>> urljoin('http://example.org/', 'foo/bar')
+ 'http://example.org/foo/bar'
+
+ >>> urljoin('http://example.org/', 'foo', '/bar/')
+ 'http://example.org/foo/bar/'
+
+ >>> urljoin('http://example.com', 'org.couchdb.user:username')
+ 'http://example.com/org.couchdb.user:username'
+ """
+ return reduce(_join, path, base)
+
+
+def as_json(response):
+ if "application/json" in response.headers['content-type']:
+ response_src = response.content.decode('utf-8')
+ if response.content != b'':
+ return json.loads(response_src)
+ else:
+ return response_src
+ return None
+
+
+def _path_from_name(name, type):
+ """
+ Expand a 'design/foo' style name to its full path as a list of
+ segments.
+
+ >>> _path_from_name("_design/test", '_view')
+ ['_design', 'test']
+ >>> _path_from_name("design/test", '_view')
+ ['_design', 'design', '_view', 'test']
+ """
+ if name.startswith('_'):
+ return name.split('/')
+ design, name = name.split('/', 1)
+ return ['_design', design, type, name]
+
+
+def encode_view_options(options):
+ """
+ Encode any items in the options dict that are sent as a JSON string to a
+ view/list function.
+
+ >>> opts = {'key': 'foo', "notkey":"bar"}
+ >>> res = encode_view_options(opts)
+ >>> res["key"], res["notkey"]
+ ('"foo"', 'bar')
+
+ >>> opts = {'startkey': 'foo', "endkey":"bar"}
+ >>> res = encode_view_options(opts)
+ >>> res['startkey'], res['endkey']
+ ('"foo"', '"bar"')
+ """
+ retval = {}
+
+ for name, value in options.items():
+ if name in ('key', 'startkey', 'endkey'):
+ value = json_encoder.encode(value)
+ retval[name] = value
+ return retval
+
+
+def force_bytes(data, encoding="utf-8"):
+ if isinstance(data, string_type):
+ data = data.encode(encoding)
+ return data
+
+
+def force_text(data, encoding="utf-8"):
+ if isinstance(data, bytes_type):
+ data = data.decode(encoding)
+ return data
diff --git a/src/db.py b/src/db.py
index 0c7e998..86c6f24 100755
--- a/src/db.py
+++ b/src/db.py
@@ -10,19 +10,31 @@
# invoking time.time() several times quickly enough.
import time
-import struct
-import couchdb
+import couch
class DictDB():
def __init__(self, database, hostname, username, password):
- self.server = couchdb.Server(
+ self.server = couch.client.Server(
f"http://{username}:{password}@{hostname}:5984/")
- if database not in self.server:
- self.server.create(database)
+ try:
+ self.couchdb = self.server.database(database)
+ except couch.exceptions.NotFound:
+ print("Creating database and indexes.")
+ index = {
+ "index": {
+ "fields": [
+ "domain"
+ ]
+ },
+ "name": "domain-json-index",
+ "type": "json"
+ }
+
+ self.couchdb = self.server.create(database)
+ print(self.couchdb.index(index))
- self.couchdb = self.server[database]
self._ts = time.time()
def unique_key(self):
@@ -33,17 +45,17 @@ class DictDB():
return self._ts
- def index_add(self, path):
- pass
-
def add(self, data, batch_write=False):
key = str(self.unique_key())
if type(data) is list:
for item in data:
- self.couchdb[key] = item
+ item['_id'] = str(self._ts)
+
+ self.couchdb.save(item)
else:
- self.couchdb[key] = data
+ data['_id'] = str(self._ts)
+ self.couchdb.save(data)
return key
diff --git a/src/wsgi.py b/src/wsgi.py
index 49c1c17..15f9224 100755
--- a/src/wsgi.py
+++ b/src/wsgi.py
@@ -2,13 +2,13 @@
import os
import sys
-from wsgiref.simple_server import make_server
-import falcon
import json
+import authn
+import falcon
+
from db import DictDB
from base64 import b64decode
-
-import authn
+from wsgiref.simple_server import make_server
class CollectorResource():
@@ -36,9 +36,9 @@ class CollectorResource():
class EPGet(CollectorResource):
def on_get(self, req, resp):
+ out = []
resp.status = falcon.HTTP_200
resp.content_type = falcon.MEDIA_JSON
- out = []
orgs = self.user_auth(req.auth, self._users.read_perms)
if not orgs:
@@ -46,7 +46,7 @@ class EPGet(CollectorResource):
resp.text = 'Invalid username or password\n'
return
- # We really should rely on req.params in its pure form since
+ # We really shouldn't rely on req.params in its pure form since
# it might contain garbage.
selectors = req.params