# Source: root/src/couch/resource.py (blob 364bff43ade9bdb8248e919db09b4120d7689763)
# -*- coding: utf-8 -*-
# Based on py-couchdb (https://github.com/histrio/py-couchdb)


from __future__ import unicode_literals
from typing import Union, Tuple

import json
import requests



from couch import utils
from couch import exceptions


class Resource:
    """HTTP helper bound to a base URL of a CouchDB server.

    A ``Resource`` wraps a ``requests`` session together with a base URL.
    Calling an instance with path segments returns a new ``Resource`` for the
    joined URL that shares the same session (headers, cookies, auth and TLS
    settings).
    """

    def __init__(self, base_url: str, full_commit: bool = True,
                 session: "Union[requests.sessions.Session, None]" = None,
                 credentials: Union[Tuple[str, str], None] = None,
                 authmethod: str = "session",
                 verify: Union[bool, str] = False) -> None:
        """Create a resource rooted at *base_url*.

        :param base_url: URL every request path is joined onto.
        :param full_commit: when false, a newly created session sends the
            ``X-Couch-Full-Commit: false`` header. Ignored when *session* is
            supplied.
        :param session: existing session to reuse. When omitted a new one is
            created, given JSON ``accept``/``content-type`` headers and
            authenticated with *credentials*.
        :param credentials: ``(name, password)`` pair, or ``None`` to skip
            authentication. Only used when a new session is created.
        :param authmethod: ``"session"`` or ``"basic"``.
        :param verify: value assigned to ``session.verify``. NOTE: the default
            is ``False``, which disables TLS certificate verification; pass
            ``True`` (or a CA bundle path) where that matters.
        :raises exceptions.AuthenticationFailed: session login did not
            return HTTP 200.
        :raises RuntimeError: *authmethod* is not a known method.
        """
        self.base_url = base_url

        if not session:
            self.session = requests.session()
            # Apply the TLS setting before authenticating so the login
            # request honours the same ``verify`` value as later requests.
            self.session.verify = verify

            self.session.headers.update({"accept": "application/json",
                                         "content-type": "application/json"})
            self._authenticate(credentials, authmethod)

            if not full_commit:
                self.session.headers.update({'X-Couch-Full-Commit': 'false'})
        else:
            self.session = session
            self.session.verify = verify

    def _authenticate(self, credentials: Union[Tuple[str, str], None], method: str) -> None:
        """Authenticate ``self.session`` with *credentials* using *method*.

        Does nothing when *credentials* is empty. ``"session"`` POSTs the
        name/password as JSON to ``<base_url>/_session``; ``"basic"`` stores
        the pair as the session's ``auth`` attribute.

        :raises exceptions.AuthenticationFailed: the ``_session`` POST did
            not answer with status 200.
        :raises RuntimeError: *method* is neither ``"session"`` nor
            ``"basic"``.
        """
        if not credentials:
            return

        if method == "session":
            payload = {"name": credentials[0], "password": credentials[1]}
            body = utils.force_bytes(json.dumps(payload))

            post_url = utils.urljoin(self.base_url, "_session")
            r = self.session.post(post_url, data=body)
            if r.status_code != 200:
                raise exceptions.AuthenticationFailed()

        elif method == "basic":
            self.session.auth = credentials

        else:
            raise RuntimeError("Invalid authentication method")

    def __call__(self, *path: str):
        """Return a new resource for ``base_url`` joined with *path*.

        The child shares this resource's session. The session's current
        ``verify`` setting is forwarded explicitly; otherwise the child's
        constructor default would overwrite it on the shared session.
        """
        child_url = utils.urljoin(self.base_url, *path)
        return self.__class__(child_url, session=self.session,
                              verify=self.session.verify)

    def _check_result(self, response, result) -> None:
        """Raise the matching exception when *response*/*result* is an error.

        :param response: object exposing ``status_code``.
        :param result: decoded body; anything without a ``.get`` method is
            treated as carrying no error information.
        :raises exceptions.Conflict: ``error`` is ``conflict`` or
            ``file_exists`` (checked regardless of the HTTP status).
        :raises exceptions.NotFound: status above 205 and either status 404
            or ``error`` is ``not_found``.
        :raises exceptions.BadRequest: status above 205 and ``error`` is
            ``bad_request``.
        :raises exceptions.GenericError: any other status above 205.
        """
        try:
            error = result.get('error', None)
            reason = result.get('reason', None)
        except AttributeError:
            error = None
            reason = ''

        # This is here because couchdb can return http 201
        # but containing a list of conflict errors
        if error in ('conflict', 'file_exists'):
            raise exceptions.Conflict(reason or "Conflict")

        if response.status_code > 205:
            if response.status_code == 404 or error == 'not_found':
                raise exceptions.NotFound(reason or 'Not found')
            elif error == 'bad_request':
                raise exceptions.BadRequest(reason or "Bad request")
            raise exceptions.GenericError(result)

    def request(self, method, path: Union[str, list, tuple, None], params=None, data=None,
                headers=None, stream=False, **kwargs):
        """Send a request through the session and validate the answer.

        :param method: HTTP method name handed to ``session.request``.
        :param path: a single segment, a list/tuple of segments, or a falsy
            value to target ``base_url`` itself.
        :param params: forwarded to ``session.request``.
        :param data: forwarded to ``session.request``.
        :param headers: extra headers; ``Accept: application/json`` is added
            when no ``Accept`` key is present. The caller's mapping is
            copied, never modified.
        :param stream: forwarded to ``session.request``. For streamed
            responses with status below 400 the body is not decoded.
        :param kwargs: forwarded to ``session.request``.
        :returns: ``(response, result)`` where ``result`` is whatever
            ``utils.as_json`` produced, or ``None`` for a streamed response.
        :raises exceptions.Conflict, exceptions.NotFound,
            exceptions.BadRequest, exceptions.GenericError: see
            ``_check_result``.
        """
        # Work on a copy so the caller's mapping is left untouched.
        headers = {} if headers is None else headers.copy()
        headers.setdefault('Accept', 'application/json')

        if path:
            segments = path if isinstance(path, (list, tuple)) else [path]
            url = utils.urljoin(self.base_url, *segments)
        else:
            url = self.base_url

        response = self.session.request(method, url, stream=stream,
                                        data=data, params=params,
                                        headers=headers, **kwargs)

        # Streamed successful responses are not decoded; only the status
        # code is validated.
        if stream and response.status_code < 400:
            self._check_result(response, None)
            return response, None

        result = utils.as_json(response)
        if result is None:
            # No decoded body to inspect (presumably a non-JSON response;
            # depends on utils.as_json): returned without validation.
            return response, result

        # A list body is validated item by item, so one failing entry in a
        # multi-item answer raises.
        if isinstance(result, list):
            for item in result:
                self._check_result(response, item)
        else:
            self._check_result(response, result)

        return response, result

    def get(self, path: Union[str, None] = None, **kwargs):
        """Issue a GET request; see :meth:`request` for arguments and result."""
        return self.request("GET", path, **kwargs)

    def put(self, path: Union[str, None] = None, **kwargs):
        """Issue a PUT request; see :meth:`request` for arguments and result."""
        return self.request("PUT", path, **kwargs)

    def post(self, path: Union[str, None] = None, **kwargs):
        """Issue a POST request; see :meth:`request` for arguments and result."""
        return self.request("POST", path, **kwargs)

    def delete(self, path: Union[str, None] = None, **kwargs):
        """Issue a DELETE request; see :meth:`request` for arguments and result."""
        return self.request("DELETE", path, **kwargs)

    def head(self, path: Union[str, None] = None, **kwargs):
        """Issue a HEAD request; see :meth:`request` for arguments and result."""
        return self.request("HEAD", path, **kwargs)