diff options
Diffstat (limited to 'src/couch/client.py')
-rw-r--r-- | src/couch/client.py | 772 |
1 files changed, 0 insertions, 772 deletions
diff --git a/src/couch/client.py b/src/couch/client.py deleted file mode 100644 index 188e0de..0000000 --- a/src/couch/client.py +++ /dev/null @@ -1,772 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) - -import os -import json -import uuid -import copy -import mimetypes -import warnings - -from couch import utils -from couch import feedreader -from couch import exceptions as exp -from couch.resource import Resource - - -DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/') - - -def _id_to_path(_id): - if _id[:1] == "_": - return _id.split("/", 1) - return [_id] - - -def _listen_feed(object, node, feed_reader, **kwargs): - if not callable(feed_reader): - raise exp.UnexpectedError("feed_reader must be callable or class") - - if isinstance(feed_reader, feedreader.BaseFeedReader): - reader = feed_reader(object) - else: - reader = feedreader.SimpleFeedReader()(object, feed_reader) - - # Possible options: "continuous", "longpoll" - kwargs.setdefault("feed", "continuous") - data = utils.force_bytes(json.dumps(kwargs.pop('data', {}))) - - (resp, result) = object.resource(node).post( - params=kwargs, data=data, stream=True) - try: - for line in resp.iter_lines(): - # ignore heartbeats - if not line: - reader.on_heartbeat() - else: - reader.on_message(json.loads(utils.force_text(line))) - except exp.FeedReaderExited: - reader.on_close() - - -class _StreamResponse(object): - """ - Proxy object for python-requests stream response. - - See more on: - http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests - """ - - def __init__(self, response): - self._response = response - - def iter_content(self, chunk_size=1, decode_unicode=False): - return self._response.iter_content(chunk_size=chunk_size, - decode_unicode=decode_unicode) - - def iter_lines(self, chunk_size=512, decode_unicode=None): - return self._response.iter_lines(chunk_size=chunk_size, - decode_unicode=decode_unicode) - - @property - def raw(self): - return self._response.raw - - @property - def url(self): - return self._response.url - - -class Server(object): - """ - Class that represents a couchdb connection. - - :param verify: setup ssl verification. - :param base_url: a full url to couchdb (can contain auth data). - :param full_commit: If ``False``, couchdb not commits all data on a - request is finished. - :param authmethod: specify a authentication method. By default "basic" - method is used but also exists "session" (that requires - some server configuration changes). - - .. versionchanged: 1.4 - Set basic auth method as default instead of session method. - - .. versionchanged: 1.5 - Add verify parameter for setup ssl verificaton - - """ - - def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True, - authmethod="basic", verify=False): - - self.base_url, credentials = utils.extract_credentials(base_url) - self.resource = Resource(self.base_url, full_commit, - credentials=credentials, - authmethod=authmethod, - verify=verify) - - def __repr__(self): - return '<CouchDB Server "{}">'.format(self.base_url) - - def __contains__(self, name): - try: - self.resource.head(name) - except exp.NotFound: - return False - else: - return True - - def __iter__(self): - (r, result) = self.resource.get('_all_dbs') - return iter(result) - - def __len__(self): - (r, result) = self.resource.get('_all_dbs') - return len(result) - - def info(self): - """ - Get server info. - - :returns: dict with all data that couchdb returns. - :rtype: dict - """ - (r, result) = self.resource.get() - return result - - def delete(self, name): - """ - Delete some database. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a database does not exists - """ - - self.resource.delete(name) - - def database(self, name): - """ - Get a database instance. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a database does not exists - - :returns: a :py:class:`~pycouchdb.client.Database` instance - """ - (r, result) = self.resource.head(name) - if r.status_code == 404: - raise exp.NotFound("Database '{0}' does not exists".format(name)) - - db = Database(self.resource(name), name) - return db - - # TODO: Config in 2.0 are applicable for nodes only - # TODO: Reimplement when nodes endpoint will be ready - # def config(self): - # pass - - def version(self): - """ - Get the current version of a couchdb server. - """ - (resp, result) = self.resource.get() - return result["version"] - - # TODO: Stats in 2.0 are applicable for nodes only - # TODO: Reimplement when nodes endpoint will be ready - # def stats(self, name=None): - # pass - - def create(self, name): - """ - Create a database. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if a database already exists - :returns: a :py:class:`~pycouchdb.client.Database` instance - """ - (resp, result) = self.resource.put(name) - if resp.status_code in (200, 201): - return self.database(name) - - def replicate(self, source, target, **kwargs): - """ - Replicate the source database to the target one. - - .. versionadded:: 1.3 - - :param source: full URL to the source database - :param target: full URL to the target database - """ - - data = {'source': source, 'target': target} - data.update(kwargs) - - data = utils.force_bytes(json.dumps(data)) - - (resp, result) = self.resource.post('_replicate', data=data) - return result - - def changes_feed(self, feed_reader, **kwargs): - """ - Subscribe to changes feed of the whole CouchDB server. - - Note: this method is blocking. - - - :param feed_reader: callable or :py:class:`~BaseFeedReader` - instance - - .. [Ref] http://docs.couchdb.org/en/1.6.1/api/server/common.html#db-updates - .. versionadded: 1.10 - """ - object = self - _listen_feed(object, "_db_updates", feed_reader, **kwargs) - - -class Database(object): - """ - Class that represents a couchdb database. - """ - - def __init__(self, resource, name): - self.resource = resource - self.name = name - - def __repr__(self): - return '<CouchDB Database "{}">'.format(self.name) - - def __contains__(self, doc_id): - try: - (resp, result) = self.resource.head(_id_to_path(doc_id)) - return resp.status_code < 206 - except exp.NotFound: - return False - - def config(self): - """ - Get database status data such as document count, update sequence etc. - :return: dict - """ - (resp, result) = self.resource.get() - return result - - def __nonzero__(self): - """Is the database available""" - resp, _ = self.resource.head() - return resp.status_code == 200 - - def __len__(self): - return self.config()['doc_count'] - - def delete(self, doc_or_id): - """ - Delete document by id. - - .. versionchanged:: 1.2 - Accept document or id. - - :param doc_or_id: document or id - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document - not exists - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if delete with - wrong revision. - """ - - _id = None - if isinstance(doc_or_id, dict): - if "_id" not in doc_or_id: - raise ValueError("Invalid document, missing _id attr") - _id = doc_or_id['_id'] - else: - _id = doc_or_id - - resource = self.resource(*_id_to_path(_id)) - - (r, result) = resource.head() - (r, result) = resource.delete( - params={"rev": r.headers["etag"].strip('"')}) - - def delete_bulk(self, docs, transaction=True): - """ - Delete a bulk of documents. - - .. versionadded:: 1.2 - - :param docs: list of docs - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if a delete - is not success - :returns: raw results from server - """ - - _docs = copy.copy(docs) - for doc in _docs: - if "_deleted" not in doc: - doc["_deleted"] = True - - data = utils.force_bytes(json.dumps({"docs": _docs})) - params = {"all_or_nothing": "true" if transaction else "false"} - (resp, results) = self.resource.post( - "_bulk_docs", data=data, params=params) - - for result, doc in zip(results, _docs): - if "error" in result: - raise exp.Conflict("one or more docs are not saved") - - return results - - def get(self, doc_id, params=None, **kwargs): - """ - Get a document by id. - - .. versionadded: 1.5 - Now the prefered method to pass params is via **kwargs - instead of params argument. **params** argument is now - deprecated and will be deleted in future versions. - - :param doc_id: document id - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document - not exists - - :returns: document (dict) - """ - - if params: - warnings.warn("params parameter is now deprecated in favor to" - "**kwargs usage.", DeprecationWarning) - - if params is None: - params = {} - - params.update(kwargs) - - (resp, result) = self.resource(*_id_to_path(doc_id)).get(params=params) - return result - - def save(self, doc, batch=False): - """ - Save or update a document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document - :param batch: allow batch=ok inserts (default False) - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if save with wrong - revision. - :returns: doc - """ - - _doc = copy.copy(doc) - if "_id" not in _doc: - _doc['_id'] = uuid.uuid4().hex - - if batch: - params = {'batch': 'ok'} - else: - params = {} - - data = utils.force_bytes(json.dumps(_doc)) - (resp, result) = self.resource(_doc['_id']).put( - data=data, params=params) - - if resp.status_code == 409: - raise exp.Conflict(result['reason']) - - if "rev" in result and result["rev"] is not None: - _doc["_rev"] = result["rev"] - - return _doc - - def save_bulk(self, docs, try_setting_ids=True, transaction=True): - """ - Save a bulk of documents. - - .. versionchanged:: 1.2 - Now returns a new document list instead of modify the original. - - :param docs: list of docs - :param try_setting_ids: if ``True``, we loop through docs and generate/set - an id in each doc if none exists - :param transaction: if ``True``, couchdb do a insert in transaction - model. - :returns: docs - """ - - _docs = copy.deepcopy(docs) - - # Insert _id field if it not exists and try_setting_ids is true - if try_setting_ids: - for doc in _docs: - if "_id" not in doc: - doc["_id"] = uuid.uuid4().hex - - data = utils.force_bytes(json.dumps({"docs": _docs})) - params = {"all_or_nothing": "true" if transaction else "false"} - - (resp, results) = self.resource.post("_bulk_docs", data=data, - params=params) - - for result, doc in zip(results, _docs): - if "rev" in result: - doc['_rev'] = result['rev'] - - return _docs - - def all(self, wrapper=None, flat=None, as_list=False, **kwargs): - """ - Execute a builtin view for get all documents. - - :param wrapper: wrap result into a specific class. - :param as_list: return a list of results instead of a - default lazy generator. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - Add as_list parameter. - Add flat parameter. - - :returns: generator object - """ - - params = {"include_docs": "true"} - params.update(kwargs) - - data = None - - if "keys" in params: - data = {"keys": params.pop("keys")} - data = utils.force_bytes(json.dumps(data)) - - params = utils.encode_view_options(params) - if data: - (resp, result) = self.resource.post( - "_all_docs", params=params, data=data) - else: - (resp, result) = self.resource.get("_all_docs", params=params) - - if wrapper is None: - def wrapper(doc): return doc - - if flat is not None: - def wrapper(doc): return doc[flat] - - def _iterate(): - for row in result["rows"]: - yield wrapper(row) - - if as_list: - return list(_iterate()) - return _iterate() - - def cleanup(self): - """ - Execute a cleanup operation. - """ - (r, result) = self.resource('_view_cleanup').post() - return result - - def commit(self): - """ - Send commit message to server. - """ - (resp, result) = self.resource.post('_ensure_full_commit') - return result - - def compact(self): - """ - Send compact message to server. Compacting write-heavy databases - should be avoided, otherwise the process may not catch up with - the writes. Read load has no effect. - """ - (r, result) = self.resource("_compact").post() - return result - - def compact_view(self, ddoc): - """ - Execute compact over design view. - - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a view does not exists. - """ - (r, result) = self.resource("_compact", ddoc).post() - return result - - def revisions(self, doc_id, status='available', params=None, **kwargs): - """ - Get all revisions of one document. - - :param doc_id: document id - :param status: filter of revision status, set empty to list all - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a view does not exists. - - :returns: generator object - """ - if params: - warnings.warn("params parameter is now deprecated in favor to" - "**kwargs usage.", DeprecationWarning) - - if params is None: - params = {} - - params.update(kwargs) - - if not params.get('revs_info'): - params['revs_info'] = 'true' - - resource = self.resource(doc_id) - (resp, result) = resource.get(params=params) - if resp.status_code == 404: - raise exp.NotFound("Document id `{0}` not found".format(doc_id)) - - for rev in result['_revs_info']: - if status and rev['status'] == status: - yield self.get(doc_id, rev=rev['rev']) - elif not status: - yield self.get(doc_id, rev=rev['rev']) - - def delete_attachment(self, doc, filename): - """ - Delete attachment by filename from document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document dict - :param filename: name of attachment. - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if save with wrong revision. - :returns: doc - """ - - _doc = copy.deepcopy(doc) - resource = self.resource(_doc['_id']) - - (resp, result) = resource.delete( - filename, params={'rev': _doc['_rev']}) - if resp.status_code == 404: - raise exp.NotFound("filename {0} not found".format(filename)) - - if resp.status_code > 205: - raise exp.Conflict(result['reason']) - - _doc['_rev'] = result['rev'] - try: - del _doc['_attachments'][filename] - - if not _doc['_attachments']: - del _doc['_attachments'] - except KeyError: - pass - - return _doc - - def get_attachment(self, doc, filename, stream=False, **kwargs): - """ - Get attachment by filename from document. - - :param doc: document dict - :param filename: attachment file name. - :param stream: setup streaming output (default: False) - - .. versionchanged: 1.5 - Add stream parameter for obtain very large attachments - without load all file to the memory. - - :returns: binary data or - """ - - params = {"rev": doc["_rev"]} - params.update(kwargs) - - r, result = self.resource(doc['_id']).get(filename, stream=stream, - params=params) - if stream: - return _StreamResponse(r) - - return r.content - - def put_attachment(self, doc, content, filename=None, content_type=None): - """ - Put a attachment to a document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document dict. - :param content: the content to upload, either a file-like object or - bytes - :param filename: the name of the attachment file; if omitted, this - function tries to get the filename from the file-like - object passed as the `content` argument value - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if save with wrong revision. - :raises: ValueError - :returns: doc - """ - - if filename is None: - if hasattr(content, 'name'): - filename = os.path.basename(content.name) - else: - raise ValueError('no filename specified for attachment') - - if content_type is None: - content_type = ';'.join( - filter(None, mimetypes.guess_type(filename))) - - headers = {"Content-Type": content_type} - resource = self.resource(doc['_id']) - - (resp, result) = resource.put( - filename, data=content, params={'rev': doc['_rev']}, headers=headers) - - if resp.status_code < 206: - return self.get(doc["_id"]) - - raise exp.Conflict(result['reason']) - - def one(self, name, flat=None, wrapper=None, **kwargs): - """ - Execute a design document view query and returns a first - result. - - :param name: name of the view (eg: docidname/viewname). - :param wrapper: wrap result into a specific class. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - - :returns: object or None - """ - - params = {"limit": 1} - params.update(kwargs) - - path = utils._path_from_name(name, '_view') - data = None - - if "keys" in params: - data = {"keys": params.pop('keys')} - - if data: - data = utils.force_bytes(json.dumps(data)) - - params = utils.encode_view_options(params) - result = list(self._query(self.resource(*path), wrapper=wrapper, - flat=flat, params=params, data=data)) - - return result[0] if len(result) > 0 else None - - def _query(self, resource, data=None, params=None, headers=None, - flat=None, wrapper=None): - - if data is None: - (resp, result) = resource.get(params=params, headers=headers) - else: - (resp, result) = resource.post( - data=data, params=params, headers=headers) - - if wrapper is None: - def wrapper(row): return row - - if flat is not None: - def wrapper(row): return row[flat] - - for row in result["rows"]: - yield wrapper(row) - - def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs): - """ - Execute a design document view query. - - :param name: name of the view (eg: docidname/viewname). - :param wrapper: wrap result into a specific class. - :param as_list: return a list of results instead of a - default lazy generator. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - Add as_list parameter. - Add flat parameter. - - :returns: generator object - """ - params = copy.copy(kwargs) - path = utils._path_from_name(name, '_view') - data = None - - if "keys" in params: - data = {"keys": params.pop('keys')} - - if data: - data = utils.force_bytes(json.dumps(data)) - - params = utils.encode_view_options(params) - result = self._query(self.resource(*path), wrapper=wrapper, - flat=flat, params=params, data=data) - - if as_list: - return list(result) - return result - - def changes_feed(self, feed_reader, **kwargs): - """ - Subscribe to changes feed of couchdb database. - - Note: this method is blocking. - - - :param feed_reader: callable or :py:class:`~BaseFeedReader` - instance - - .. versionadded: 1.5 - """ - - object = self - _listen_feed(object, "_changes", feed_reader, **kwargs) - - def changes_list(self, **kwargs): - """ - Obtain a list of changes from couchdb. - - .. versionadded: 1.5 - """ - - (resp, result) = self.resource("_changes").get(params=kwargs) - return result['last_seq'], result['results'] - - def find(self, selector, wrapper=None, **kwargs): - """ - Execute Mango querys using _find. - - :param selector: data to search - :param wrapper: wrap result into a specific class. - - """ - path = '_find' - data = utils.force_bytes(json.dumps(selector)) - - (resp, result) = self.resource.post(path, data=data, params=kwargs) - - if wrapper is None: - def wrapper(doc): return doc - - for doc in result["docs"]: - yield wrapper(doc) - - def index(self, index_doc, **kwargs): - path = '_index' - data = utils.force_bytes(json.dumps(index_doc)) - - (resp, result) = self.resource.post(path, data=data, params=kwargs) - - return result |