summaryrefslogtreecommitdiff
path: root/src/couch/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/client.py')
-rw-r--r--src/couch/client.py801
1 files changed, 0 insertions, 801 deletions
diff --git a/src/couch/client.py b/src/couch/client.py
deleted file mode 100644
index 96dc78a..0000000
--- a/src/couch/client.py
+++ /dev/null
@@ -1,801 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-
-import os
-import json
-import uuid
-import copy
-import mimetypes
-import warnings
-
-from .utils import (
- force_bytes,
- force_text,
- encode_view_options,
- extract_credentials,
-)
-from .feedreader import (
- SimpleFeedReader,
- BaseFeedReader,
-)
-
-from .exceptions import (
- Conflict,
- NotFound,
- FeedReaderExited,
- UnexpectedError,
-)
-from .resource import Resource
-
-
-DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/')
-
-
-def _id_to_path(_id: str) -> str:
- if _id[:1] == "_":
- return _id.split("/", 1)
- return [_id]
-
-
-def _listen_feed(object, node, feed_reader, **kwargs):
- if not callable(feed_reader):
- raise UnexpectedError("feed_reader must be callable or class")
-
- if isinstance(feed_reader, BaseFeedReader):
- reader = feed_reader(object)
- else:
- reader = SimpleFeedReader()(object, feed_reader)
-
- # Possible options: "continuous", "longpoll"
- kwargs.setdefault("feed", "continuous")
- data = force_bytes(json.dumps(kwargs.pop('data', {})))
-
- (resp, result) = object.resource(node).post(
- params=kwargs, data=data, stream=True)
- try:
- for line in resp.iter_lines():
- # ignore heartbeats
- if not line:
- reader.on_heartbeat()
- else:
- reader.on_message(json.loads(force_text(line)))
- except FeedReaderExited:
- reader.on_close()
-
-
-class _StreamResponse(object):
- """
- Proxy object for python-requests stream response.
-
- See more on:
- http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests
- """
-
- def __init__(self, response):
- self._response = response
-
- def iter_content(self, chunk_size=1, decode_unicode=False):
- return self._response.iter_content(chunk_size=chunk_size,
- decode_unicode=decode_unicode)
-
- def iter_lines(self, chunk_size=512, decode_unicode=None):
- return self._response.iter_lines(chunk_size=chunk_size,
- decode_unicode=decode_unicode)
-
- @property
- def raw(self):
- return self._response.raw
-
- @property
- def url(self):
- return self._response.url
-
-
-class Server(object):
- """
- Class that represents a couchdb connection.
-
- :param verify: setup ssl verification.
- :param base_url: a full url to couchdb (can contain auth data).
- :param full_commit: If ``False``, couchdb not commits all data on a
- request is finished.
- :param authmethod: specify a authentication method. By default "basic"
- method is used but also exists "session" (that requires
- some server configuration changes).
-
- .. versionchanged: 1.4
- Set basic auth method as default instead of session method.
-
- .. versionchanged: 1.5
- Add verify parameter for setup ssl verificaton
-
- """
-
- def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True,
- authmethod="basic", verify=False):
-
- self.base_url, credentials = extract_credentials(base_url)
- self.resource = Resource(self.base_url, full_commit,
- credentials=credentials,
- authmethod=authmethod,
- verify=verify)
-
- def __repr__(self):
- return '<CouchDB Server "{}">'.format(self.base_url)
-
- def __contains__(self, name):
- try:
- self.resource.head(name)
- except NotFound:
- return False
- else:
- return True
-
- def __iter__(self):
- (r, result) = self.resource.get('_all_dbs')
- return iter(result)
-
- def __len__(self):
- (r, result) = self.resource.get('_all_dbs')
- return len(result)
-
- def info(self):
- """
- Get server info.
-
- :returns: dict with all data that couchdb returns.
- :rtype: dict
- """
- (r, result) = self.resource.get()
- return result
-
- def delete(self, name):
- """
- Delete some database.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a database does not exists
- """
-
- self.resource.delete(name)
-
- def database(self, name):
- """
- Get a database instance.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a database does not exists
-
- :returns: a :py:class:`~pycouchdb.client.Database` instance
- """
- (r, result) = self.resource.head(name)
- if r.status_code == 404:
- raise NotFound("Database '{0}' does not exists".format(name))
-
- db = Database(self.resource(name), name)
- return db
-
- # TODO: Config in 2.0 are applicable for nodes only
- # TODO: Reimplement when nodes endpoint will be ready
- # def config(self):
- # pass
-
- def version(self):
- """
- Get the current version of a couchdb server.
- """
- (resp, result) = self.resource.get()
- return result["version"]
-
- # TODO: Stats in 2.0 are applicable for nodes only
- # TODO: Reimplement when nodes endpoint will be ready
- # def stats(self, name=None):
- # pass
-
- def create(self, name):
- """
- Create a database.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if a database already exists
- :returns: a :py:class:`~pycouchdb.client.Database` instance
- """
- (resp, result) = self.resource.put(name)
- if resp.status_code in (200, 201):
- return self.database(name)
-
- def replicate(self, source, target, **kwargs):
- """
- Replicate the source database to the target one.
-
- .. versionadded:: 1.3
-
- :param source: full URL to the source database
- :param target: full URL to the target database
- """
-
- data = {'source': source, 'target': target}
- data.update(kwargs)
-
- data = force_bytes(json.dumps(data))
-
- (resp, result) = self.resource.post('_replicate', data=data)
- return result
-
- def changes_feed(self, feed_reader, **kwargs):
- """
- Subscribe to changes feed of the whole CouchDB server.
-
- Note: this method is blocking.
-
-
- :param feed_reader: callable or :py:class:`~BaseFeedReader`
- instance
-
- .. [Ref] http://docs.couchdb.org/en/1.6.1/api/server/common.html#db-updates
- .. versionadded: 1.10
- """
- object = self
- _listen_feed(object, "_db_updates", feed_reader, **kwargs)
-
-
-class Database(object):
- """
- Class that represents a couchdb database.
- """
-
- def __init__(self, resource, name):
- self.resource = resource
- self.name = name
-
- def __repr__(self):
- return '<CouchDB Database "{}">'.format(self.name)
-
- def __contains__(self, doc_id):
- try:
- (resp, result) = self.resource.head(_id_to_path(doc_id))
- return resp.status_code < 206
- except NotFound:
- return False
-
- def config(self):
- """
- Get database status data such as document count, update sequence etc.
- :return: dict
- """
- (resp, result) = self.resource.get()
- return result
-
- def __nonzero__(self):
- """Is the database available"""
- resp, _ = self.resource.head()
- return resp.status_code == 200
-
- def __len__(self):
- return self.config()['doc_count']
-
- def delete(self, doc_or_id):
- """
- Delete document by id.
-
- .. versionchanged:: 1.2
- Accept document or id.
-
- :param doc_or_id: document or id
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
- not exists
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if delete with
- wrong revision.
- """
-
- _id = None
- if isinstance(doc_or_id, dict):
- if "_id" not in doc_or_id:
- raise ValueError("Invalid document, missing _id attr")
- _id = doc_or_id['_id']
- else:
- _id = doc_or_id
-
- resource = self.resource(*_id_to_path(_id))
-
- (r, result) = resource.head()
- (r, result) = resource.delete(
- params={"rev": r.headers["etag"].strip('"')})
-
- def delete_bulk(self, docs, transaction=True):
- """
- Delete a bulk of documents.
-
- .. versionadded:: 1.2
-
- :param docs: list of docs
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if a delete
- is not success
- :returns: raw results from server
- """
-
- _docs = copy.copy(docs)
- for doc in _docs:
- if "_deleted" not in doc:
- doc["_deleted"] = True
-
- data = force_bytes(json.dumps({"docs": _docs}))
- params = {"all_or_nothing": "true" if transaction else "false"}
- (resp, results) = self.resource.post(
- "_bulk_docs", data=data, params=params)
-
- for result, doc in zip(results, _docs):
- if "error" in result:
- raise Conflict("one or more docs are not saved")
-
- return results
-
- def get(self, doc_id, params=None, **kwargs):
- """
- Get a document by id.
-
- .. versionadded: 1.5
- Now the prefered method to pass params is via **kwargs
- instead of params argument. **params** argument is now
- deprecated and will be deleted in future versions.
-
- :param doc_id: document id
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
- not exists
-
- :returns: document (dict)
- """
-
- if params:
- warnings.warn("params parameter is now deprecated in favor to"
- "**kwargs usage.", DeprecationWarning)
-
- if params is None:
- params = {}
-
- params.update(kwargs)
-
- (resp, result) = self.resource(*_id_to_path(doc_id)).get(params=params)
- return result
-
- def save(self, doc, batch=False):
- """
- Save or update a document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document
- :param batch: allow batch=ok inserts (default False)
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if save with wrong
- revision.
- :returns: doc
- """
-
- _doc = copy.copy(doc)
- if "_id" not in _doc:
- _doc['_id'] = uuid.uuid4().hex
-
- if batch:
- params = {'batch': 'ok'}
- else:
- params = {}
-
- data = force_bytes(json.dumps(_doc))
-
- print("gg1", flush=True)
- print(data, flush=True)
- print("vv1", flush=True)
-
- (resp, result) = self.resource(_doc['_id']).put(
- data=data, params=params)
-
- print("gg3", flush=True)
- print(resp.status_code)
- print(resp.content)
- #print(resp.contents)
-
- print("gg2", flush=True)
- print(data, flush=True)
- print(result, flush=True)
- print("vv2", flush=True)
-
- if resp.status_code == 409:
- raise Conflict(result['reason'])
-
- if "rev" in result and result["rev"] is not None:
- _doc["_rev"] = result["rev"]
-
- return _doc
-
- def save_bulk(self, docs, try_setting_ids=True, transaction=True):
- """
- Save a bulk of documents.
-
- .. versionchanged:: 1.2
- Now returns a new document list instead of modify the original.
-
- :param docs: list of docs
- :param try_setting_ids: if ``True``, we loop through docs and generate/set
- an id in each doc if none exists
- :param transaction: if ``True``, couchdb do a insert in transaction
- model.
- :returns: docs
- """
-
- _docs = copy.deepcopy(docs)
-
- # Insert _id field if it not exists and try_setting_ids is true
- if try_setting_ids:
- for doc in _docs:
- if "_id" not in doc:
- doc["_id"] = uuid.uuid4().hex
-
- data = orce_bytes(json.dumps({"docs": _docs}))
- params = {"all_or_nothing": "true" if transaction else "false"}
-
- (resp, results) = self.resource.post("_bulk_docs", data=data,
- params=params)
-
- for result, doc in zip(results, _docs):
- if "rev" in result:
- doc['_rev'] = result['rev']
-
- return _docs
-
- def all(self, wrapper=None, flat=None, as_list=False, **kwargs):
- """
- Execute a builtin view for get all documents.
-
- :param wrapper: wrap result into a specific class.
- :param as_list: return a list of results instead of a
- default lazy generator.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
- Add as_list parameter.
- Add flat parameter.
-
- :returns: generator object
- """
-
- params = {"include_docs": "true"}
- params.update(kwargs)
-
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop("keys")}
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- if data:
- (resp, result) = self.resource.post(
- "_all_docs", params=params, data=data)
- else:
- (resp, result) = self.resource.get("_all_docs", params=params)
-
- if wrapper is None:
- def wrapper(doc): return doc
-
- if flat is not None:
- def wrapper(doc): return doc[flat]
-
- def _iterate():
- for row in result["rows"]:
- yield wrapper(row)
-
- if as_list:
- return list(_iterate())
- return _iterate()
-
- def cleanup(self):
- """
- Execute a cleanup operation.
- """
- (r, result) = self.resource('_view_cleanup').post()
- return result
-
- def commit(self):
- """
- Send commit message to server.
- """
- (resp, result) = self.resource.post('_ensure_full_commit')
- return result
-
- def compact(self):
- """
- Send compact message to server. Compacting write-heavy databases
- should be avoided, otherwise the process may not catch up with
- the writes. Read load has no effect.
- """
- (r, result) = self.resource("_compact").post()
- return result
-
- def compact_view(self, ddoc):
- """
- Execute compact over design view.
-
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a view does not exists.
- """
- (r, result) = self.resource("_compact", ddoc).post()
- return result
-
- def revisions(self, doc_id, status='available', params=None, **kwargs):
- """
- Get all revisions of one document.
-
- :param doc_id: document id
- :param status: filter of revision status, set empty to list all
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a view does not exists.
-
- :returns: generator object
- """
- if params:
- warnings.warn("params parameter is now deprecated in favor to"
- "**kwargs usage.", DeprecationWarning)
-
- if params is None:
- params = {}
-
- params.update(kwargs)
-
- if not params.get('revs_info'):
- params['revs_info'] = 'true'
-
- resource = self.resource(doc_id)
- (resp, result) = resource.get(params=params)
- if resp.status_code == 404:
- raise NotFound("Document id `{0}` not found".format(doc_id))
-
- for rev in result['_revs_info']:
- if status and rev['status'] == status:
- yield self.get(doc_id, rev=rev['rev'])
- elif not status:
- yield self.get(doc_id, rev=rev['rev'])
-
- def delete_attachment(self, doc, filename):
- """
- Delete attachment by filename from document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document dict
- :param filename: name of attachment.
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if save with wrong revision.
- :returns: doc
- """
-
- _doc = copy.deepcopy(doc)
- resource = self.resource(_doc['_id'])
-
- (resp, result) = resource.delete(
- filename, params={'rev': _doc['_rev']})
- if resp.status_code == 404:
- raise NotFound("filename {0} not found".format(filename))
-
- if resp.status_code > 205:
- raise Conflict(result['reason'])
-
- _doc['_rev'] = result['rev']
- try:
- del _doc['_attachments'][filename]
-
- if not _doc['_attachments']:
- del _doc['_attachments']
- except KeyError:
- pass
-
- return _doc
-
- def get_attachment(self, doc, filename, stream=False, **kwargs):
- """
- Get attachment by filename from document.
-
- :param doc: document dict
- :param filename: attachment file name.
- :param stream: setup streaming output (default: False)
-
- .. versionchanged: 1.5
- Add stream parameter for obtain very large attachments
- without load all file to the memory.
-
- :returns: binary data or
- """
-
- params = {"rev": doc["_rev"]}
- params.update(kwargs)
-
- r, result = self.resource(doc['_id']).get(filename, stream=stream,
- params=params)
- if stream:
- return _StreamResponse(r)
-
- return r.content
-
- def put_attachment(self, doc, content, filename=None, content_type=None):
- """
- Put a attachment to a document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document dict.
- :param content: the content to upload, either a file-like object or
- bytes
- :param filename: the name of the attachment file; if omitted, this
- function tries to get the filename from the file-like
- object passed as the `content` argument value
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if save with wrong revision.
- :raises: ValueError
- :returns: doc
- """
-
- if filename is None:
- if hasattr(content, 'name'):
- filename = os.path.basename(content.name)
- else:
- raise ValueError('no filename specified for attachment')
-
- if content_type is None:
- content_type = ';'.join(
- filter(None, mimetypes.guess_type(filename)))
-
- headers = {"Content-Type": content_type}
- resource = self.resource(doc['_id'])
-
- (resp, result) = resource.put(
- filename, data=content, params={'rev': doc['_rev']}, headers=headers)
-
- if resp.status_code < 206:
- return self.get(doc["_id"])
-
- raise Conflict(result['reason'])
-
- def one(self, name, flat=None, wrapper=None, **kwargs):
- """
- Execute a design document view query and returns a first
- result.
-
- :param name: name of the view (eg: docidname/viewname).
- :param wrapper: wrap result into a specific class.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
-
- :returns: object or None
- """
-
- params = {"limit": 1}
- params.update(kwargs)
-
- path = _path_from_name(name, '_view')
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop('keys')}
-
- if data:
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- result = list(self._query(self.resource(*path), wrapper=wrapper,
- flat=flat, params=params, data=data))
-
- return result[0] if len(result) > 0 else None
-
- def _query(self, resource, data=None, params=None, headers=None,
- flat=None, wrapper=None):
-
- if data is None:
- (resp, result) = resource.get(params=params, headers=headers)
- else:
- (resp, result) = resource.post(
- data=data, params=params, headers=headers)
-
- if wrapper is None:
- def wrapper(row): return row
-
- if flat is not None:
- def wrapper(row): return row[flat]
-
- for row in result["rows"]:
- yield wrapper(row)
-
- def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
- """
- Execute a design document view query.
-
- :param name: name of the view (eg: docidname/viewname).
- :param wrapper: wrap result into a specific class.
- :param as_list: return a list of results instead of a
- default lazy generator.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
- Add as_list parameter.
- Add flat parameter.
-
- :returns: generator object
- """
- params = copy.copy(kwargs)
- path = _path_from_name(name, '_view')
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop('keys')}
-
- if data:
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- result = self._query(self.resource(*path), wrapper=wrapper,
- flat=flat, params=params, data=data)
-
- if as_list:
- return list(result)
- return result
-
- def changes_feed(self, feed_reader, **kwargs):
- """
- Subscribe to changes feed of couchdb database.
-
- Note: this method is blocking.
-
-
- :param feed_reader: callable or :py:class:`~BaseFeedReader`
- instance
-
- .. versionadded: 1.5
- """
-
- object = self
- _listen_feed(object, "_changes", feed_reader, **kwargs)
-
- def changes_list(self, **kwargs):
- """
- Obtain a list of changes from couchdb.
-
- .. versionadded: 1.5
- """
-
- (resp, result) = self.resource("_changes").get(params=kwargs)
- return result['last_seq'], result['results']
-
- def find(self, selector, wrapper=None, **kwargs):
- """
- Execute Mango querys using _find.
-
- :param selector: data to search
- :param wrapper: wrap result into a specific class.
-
- """
- path = '_find'
- data = force_bytes(json.dumps(selector))
-
- (resp, result) = self.resource.post(path, data=data, params=kwargs)
-
- if wrapper is None:
- def wrapper(doc): return doc
-
- for doc in result["docs"]:
- yield wrapper(doc)
-
- def index(self, index_doc, **kwargs):
- path = '_index'
- data = force_bytes(json.dumps(index_doc))
-
- (resp, result) = self.resource.post(path, data=data, params=kwargs)
-
- return result