summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--[-rwxr-xr-x]src/collector/db.py185
-rwxr-xr-xsrc/collector/main.py332
-rw-r--r--src/collector/schema.py57
-rw-r--r--src/couch/__init__.py11
-rw-r--r--src/couch/client.py801
-rw-r--r--src/couch/exceptions.py38
-rw-r--r--src/couch/feedreader.py52
-rw-r--r--src/couch/resource.py139
-rw-r--r--src/couch/utils.py144
-rwxr-xr-xsrc/quickstart_test.sh67
10 files changed, 172 insertions, 1654 deletions
diff --git a/src/collector/db.py b/src/collector/db.py
index 0bfa014..3b16ef5 100755..100644
--- a/src/collector/db.py
+++ b/src/collector/db.py
@@ -1,148 +1,39 @@
-# A database storing dictionaries, keyed on a timestamp. value = A
-# dict which will be stored as a JSON object encoded in UTF-8. Note
-# that dict keys of type integer or float will become strings while
-# values will keep their type.
-
-# Note that there's a (slim) chance that you'd stomp on the previous
-# value if you're too quick with generating the timestamps, ie
-# invoking time.time() several times quickly enough.
-
-from typing import Dict, List, Tuple, Union, Any
-import os
-import sys
-import time
-
-from src import couch
-from .schema import as_index_list, validate_collector_data
-
-
-class DictDB:
- def __init__(self) -> None:
- """
- Check if the database exists, otherwise we will create it together
- with the indexes specified in index.py.
- """
-
- print(os.environ)
-
- try:
- self.database = os.environ["COUCHDB_NAME"]
- self.hostname = os.environ["COUCHDB_HOSTNAME"]
- self.username = os.environ["COUCHDB_USER"]
- self.password = os.environ["COUCHDB_PASSWORD"]
- except KeyError:
- print(
- "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME,"
- + " COUCHDB_USER and COUCHDB_PASSWORD must be set."
- )
- sys.exit(-1)
-
- if "COUCHDB_PORT" in os.environ:
- couchdb_port = os.environ["COUCHDB_PORT"]
- else:
- couchdb_port = "5984"
-
- self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/")
-
- try:
- self.couchdb = self.server.database(self.database)
- print("Database already exists")
- except couch.exceptions.NotFound:
- print("Creating database and indexes.")
- self.couchdb = self.server.create(self.database)
-
- for i in as_index_list():
- self.couchdb.index(i)
-
- self._ts = time.time()
-
- def unique_key(self) -> int:
- """
- Create a unique key based on the current time. We will use this as
- the ID for any new documents we store in CouchDB.
- """
-
- ts = time.time()
- while round(ts * 1000) == self._ts:
- ts = time.time()
- self._ts = round(ts * 1000)
-
- return self._ts
-
- # Why batch_write???
- def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]:
- """
- Store a document in CouchDB.
- """
-
- if isinstance(data, List):
- for item in data:
- error = validate_collector_data(item)
- if error != "":
- return error
- item["_id"] = str(self.unique_key())
- ret: Tuple[str, str] = self.couchdb.save_bulk(data)
+"""Our database module"""
+from time import sleep
+from sys import exit as app_exit
+from dataclasses import dataclass
+
+from motor.motor_asyncio import (
+ AsyncIOMotorClient,
+ AsyncIOMotorCollection,
+)
+from bson import ObjectId
+
+
+@dataclass()
+class DBClient:
+ """Class to hold database connections for us."""
+
+ client: AsyncIOMotorClient
+ collection: AsyncIOMotorCollection
+
+ def __init__(self, username: str, password: str, collection: str) -> None:
+ self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000)
+ self.collection = self.client["production"][collection]
+
+ async def check_server(self) -> None:
+ """Try query the DB and exit the program if we fail after 5 times.
+
+ :return: None
+ """
+ for i in range(5):
+ try:
+ await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")})
+ print("Connection to DB - OK")
+ break
+ except: # pylint: disable=bare-except
+ print(f"WARNING failed to connect to DB - {i} / 4", flush=True)
+ sleep(1)
else:
- error = validate_collector_data(data)
- if error != "":
- return error
- data["_id"] = str(self.unique_key())
- ret = self.couchdb.save(data)
-
- return ret
-
- def get(self, key: int) -> Dict[str, Any]:
- """
- Get a document based on its ID, return an empty dict if not found.
- """
-
- try:
- doc: Dict[str, Any] = self.couchdb.get(key)
- except couch.exceptions.NotFound:
- doc = {}
-
- return doc
-
- #
- # def slice(self, key_from=None, key_to=None):
- # pass
-
- def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]:
- """
- Execute a Mango query, ideally we should have an index matching
- the query otherwise things will be slow.
- """
-
- data: List[Dict[str, Any]] = []
- selector: Dict[str, Any] = {}
-
- try:
- limit = int(limit)
- skip = int(skip)
- except ValueError:
- limit = 25
- skip = 0
-
- if kwargs:
- selector = {"limit": limit, "skip": skip, "selector": {}}
-
- for key in kwargs:
- if kwargs[key] and kwargs[key].isnumeric():
- kwargs[key] = int(kwargs[key])
- selector["selector"][key] = {"$eq": kwargs[key]}
-
- for doc in self.couchdb.find(selector, wrapper=None, limit=5):
- data.append(doc)
-
- return data
-
- def delete(self, key: int) -> Union[int, None]:
- """
- Delete a document based on its ID.
- """
- try:
- self.couchdb.delete(key)
- except couch.exceptions.NotFound:
- return None
-
- return key
+ print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production")
+ app_exit(1)
diff --git a/src/collector/main.py b/src/collector/main.py
index c363885..096b788 100755
--- a/src/collector/main.py
+++ b/src/collector/main.py
@@ -1,267 +1,175 @@
-from typing import Dict, Union, List, Callable, Awaitable, Any
-import json
-import os
+"""Our main module"""
+from typing import Dict, Optional, List, Any
+from os import environ
+import asyncio
import sys
-import time
+from json.decoder import JSONDecodeError
-import uvicorn
-from fastapi import Depends, FastAPI, Request, Response
-from fastapi.middleware.cors import CORSMiddleware
+from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
-from fastapi_jwt_auth import AuthJWT
-from fastapi_jwt_auth.auth_config import AuthConfig
-from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel
-
-from .db import DictDB
-from .schema import get_index_keys, validate_collector_data
-
-app = FastAPI()
-
-app.add_middleware(
- CORSMiddleware,
- allow_origins=["http://localhost:8001"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- expose_headers=["X-Total-Count"],
+from pymongo.errors import OperationFailure
+from bson import (
+ ObjectId,
+ json_util,
)
+from dotenv import load_dotenv
-# TODO: X-Total-Count
-
-
-@app.middleware("http")
-async def mock_x_total_count_header(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
-
- print(type(call_next))
-
- response: Response = await call_next(request)
- response.headers["X-Total-Count"] = "100"
- return response
-
-
-for i in range(10):
- try:
- db = DictDB()
- except Exception as e:
- print(f"Database not responding, will try again soon. Attempt {i + 1} of 10.")
- else:
- break
- time.sleep(1)
-else:
- print("Database did not respond after 10 attempts, quitting.")
- sys.exit(-1)
-
-
-def get_pubkey() -> str:
- try:
- if "JWT_PUBKEY_PATH" in os.environ:
- keypath = os.environ["JWT_PUBKEY_PATH"]
- else:
- keypath = "/opt/certs/public.pem"
-
- with open(keypath, "r") as fd:
- pubkey = fd.read()
- except FileNotFoundError:
- print(f"Could not find JWT certificate in {keypath}")
- sys.exit(-1)
-
- return pubkey
-
-
-def get_data(
- key: Union[int, None] = None,
- limit: int = 25,
- skip: int = 0,
- ip: Union[str, None] = None,
- port: Union[int, None] = None,
- asn: Union[str, None] = None,
- domain: Union[str, None] = None,
-) -> List[Dict[str, Any]]:
- if key:
- return [db.get(key)]
+from .db import DBClient
+from .schema import valid_schema
- selectors: Dict[str, Any] = {}
- indexes = get_index_keys()
- selectors["domain"] = domain
- if ip and "ip" in indexes:
- selectors["ip"] = ip
- if port and "port" in indexes:
- selectors["port"] = port
- if asn and "asn" in indexes:
- selectors["asn"] = asn
+load_dotenv()
+# Get credentials
+if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ:
+ print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
+ sys.exit(1)
- data: List[Dict[str, Any]] = db.search(**selectors, limit=limit, skip=skip)
+# Create DB object
+db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"])
- return data
-
-
-class JWTConfig(BaseModel):
- authjwt_algorithm: str = "ES256"
- authjwt_public_key: str = get_pubkey()
+# Check DB
+loop = asyncio.get_running_loop()
+startup_task = loop.create_task(db.check_server())
+app = FastAPI()
-@AuthJWT.load_config # type: ignore
-def jwt_config():
- return JWTConfig()
+# @app.exception_handler(RuntimeError)
+# def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse:
+# print(exc, flush=True)
+# return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400)
+# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400)
-@app.exception_handler(AuthJWTException)
-def authjwt_exception_handler(request: Request, exc: AuthJWTException) -> JSONResponse:
- return JSONResponse(content={"status": "error", "message": exc.message}, status_code=400)
+class SearchInput(BaseModel):
+ """Handle search data for HTTP request"""
-@app.exception_handler(RuntimeError)
-def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse:
- return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400)
+ search: Optional[Dict[str, Any]]
+ limit: int = 25
+ skip: int = 0
-@app.get("/sc/v0/get")
-async def get(
- key: Union[int, None] = None,
- limit: int = 25,
- skip: int = 0,
- ip: Union[str, None] = None,
- port: Union[int, None] = None,
- asn: Union[str, None] = None,
- Authorize: AuthJWT = Depends(),
-) -> JSONResponse:
+@app.post("/sc/v0/search")
+async def search(search_data: SearchInput) -> JSONResponse:
+ """/sc/v0/search, POST method
- Authorize.jwt_required()
+ :param search_data: The search data.
+ :return: JSONResponse
+ """
+ data: List[Dict[str, Any]] = []
- data = []
- raw_jwt = Authorize.get_raw_jwt()
+ cursor = db.collection.find(search_data.search)
+ cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)
- if "read" not in raw_jwt:
+ try:
+ async for document in cursor:
+ data.append(document)
+ except OperationFailure as exc:
+ print(f"DB failed to process: {exc.details}")
return JSONResponse(
content={
"status": "error",
- "message": "Could not find read claim in JWT token",
+ "message": "Probably wrong syntax, note the dictionary for find: "
+ + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
},
status_code=400,
)
- else:
- domains = raw_jwt["read"]
- for domain in domains:
- data.extend(get_data(key, limit, skip, ip, port, asn, domain))
+ if not data:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
- return JSONResponse(content={"status": "success", "docs": data})
+ return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)})
-@app.get("/sc/v0/get/{key}")
-async def get_key(key: Union[int, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse:
+@app.post("/sc/v0")
+async def create(request: Request) -> JSONResponse:
+ """/sc/v0, POST method
- Authorize.jwt_required()
+ :param request: The request where we get the json body.
+ :return: JSONResponse
+ """
- raw_jwt = Authorize.get_raw_jwt()
+ try:
+ json_data = await request.json()
+ except JSONDecodeError:
+ return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400)
- if "read" not in raw_jwt:
- return JSONResponse(
- content={
- "status": "error",
- "message": "Could not find read claim in JWT token",
- },
- status_code=400,
- )
- else:
- allowed_domains = raw_jwt["read"]
+ if not valid_schema(json_data):
+ return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
- data_list = get_data(key)
+ result = await db.collection.insert_one(json_data)
+ return JSONResponse(content={"status": "success", "key": str(result.inserted_id)})
- # Handle if missing
- data = data_list[0]
- if data and data["domain"] not in allowed_domains:
- return JSONResponse(
- content={
- "status": "error",
- "message": "User not authorized to view this object",
- },
- status_code=400,
- )
-
- return JSONResponse(content={"status": "success", "docs": data})
+@app.put("/sc/v0")
+async def update(request: Request) -> JSONResponse:
+ """/sc/v0, PUT method
-
-# WHY IS AUTH OUTCOMMENTED???
-@app.post("/sc/v0/add")
-async def add(data: Request, Authorize: AuthJWT = Depends()) -> JSONResponse:
- # Authorize.jwt_required()
+ :param request: The request where we get the json body.
+ :return: JSONResponse
+ """
try:
- json_data = await data.json()
- except json.decoder.JSONDecodeError:
- return JSONResponse(
- content={
- "status": "error",
- "message": "Invalid JSON.",
- },
- status_code=400,
- )
-
- key = db.add(json_data)
-
- if isinstance(key, str):
- return JSONResponse(
- content={
- "status": "error",
- "message": key,
- },
- status_code=400,
- )
-
- return JSONResponse(content={"status": "success", "docs": key})
-
+ json_data = await request.json()
+ except JSONDecodeError:
+ return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400)
+
+ if "_id" not in json_data:
+ return JSONResponse(content={"status": "error", "message": "Missing key '_id'"}, status_code=400)
+
+ # Get the key
+ if isinstance(json_data["_id"], str):
+ object_id = ObjectId(json_data["_id"])
+ elif (
+ isinstance(json_data["_id"], dict) and "$oid" in json_data["_id"] and isinstance(json_data["_id"]["$oid"], str)
+ ):
+ object_id = ObjectId(json_data["_id"]["$oid"])
+ else:
+ return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400)
-@app.delete("/sc/v0/delete/{key}")
-async def delete(key: int, Authorize: AuthJWT = Depends()) -> JSONResponse:
+ # Ensure the updating key exist
+ document = await db.collection.find_one({"_id": object_id})
+ if document is None:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
- Authorize.jwt_required()
+ # Ensure valid schema
+ del json_data["_id"]
+ if not valid_schema(json_data):
+ return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
- raw_jwt = Authorize.get_raw_jwt()
+ # Replace the data
+ json_data["_id"] = object_id
+ await db.collection.replace_one({"_id": object_id}, json_data)
+ return JSONResponse(content={"status": "success", "key": str(object_id)})
- if "write" not in raw_jwt:
- return JSONResponse(
- content={
- "status": "error",
- "message": "Could not find write claim in JWT token",
- },
- status_code=400,
- )
- else:
- allowed_domains = raw_jwt["write"]
- data_list = get_data(key)
+@app.get("/sc/v0/{key}")
+async def get(key: str) -> JSONResponse:
+ """/sc/v0, POST method
- # Handle if missing
- data = data_list[0]
+ :param key: The document key in the database.
+ :return: JSONResponse
+ """
- if data and data["domain"] not in allowed_domains:
- return JSONResponse(
- content={
- "status": "error",
- "message": "User not authorized to delete this object",
- },
- status_code=400,
- )
+ document = await db.collection.find_one({"_id": ObjectId(key)})
- if db.delete(key) is None:
+ if document is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
- return JSONResponse(content={"status": "success", "docs": data})
+ return JSONResponse(content={"status": "success", "docs": json_util.dumps(document)})
-# def main(standalone: bool = False):
-# print(type(app))
-# if not standalone:
-# return app
+@app.delete("/sc/v0/{key}")
+async def delete(key: str) -> JSONResponse:
+ """/sc/v0, POST method
-# uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
+ :param key: The document key in the database.
+ :return: JSONResponse
+ """
+ result = await db.collection.delete_one({"_id": ObjectId(key)})
+ if result.deleted_count == 0:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
-# if __name__ == "__main__":
-# main(standalone=True)
-# else:
-# app = main()
+ return JSONResponse(content={"status": "success", "key": key})
diff --git a/src/collector/schema.py b/src/collector/schema.py
index e291f10..221990a 100644
--- a/src/collector/schema.py
+++ b/src/collector/schema.py
@@ -1,8 +1,5 @@
-from typing import List, Any, Dict
-import json
-import sys
-import traceback
-
+"""Our schema module"""
+from typing import Any, Dict
import jsonschema
# fmt:off
@@ -64,7 +61,8 @@ schema = {
]
},
{
- "required": [
+ "required":
+ [
"display_name",
"investigation_needed",
# "reliability", # TODO: reliability is required if investigation_needed = true
@@ -93,44 +91,17 @@ schema = {
"result",
],
}
-# fmt:on
-
-
-def get_index_keys() -> List[str]:
- keys: List[str] = []
- for key in schema["properties"]:
- keys.append(key)
- return keys
-def as_index_list() -> List[Dict[str, Any]]:
- index_list: List[Dict[str, Any]] = []
- for key in schema["properties"]:
- name = f"{key}-json-index"
- index = {
- "index": {
- "fields": [
- key,
- ]
- },
- "name": name,
- "type": "json",
- }
- index_list.append(index)
-
- return index_list
+def valid_schema(json_data: Dict[str, Any]) -> bool:
+ """Check if json data follows the schema.
-
-def validate_collector_data(json_blob: Dict[str, Any]) -> str:
+ :param json_data: Json object
+ :return: bool
+ """
try:
- jsonschema.validate(json_blob, schema, format_checker=jsonschema.FormatChecker())
- except jsonschema.exceptions.ValidationError as e:
- return f"Validation failed with error: {e.message}"
- return ""
-
-
-if __name__ == "__main__":
- with open(sys.argv[1]) as fd:
- json_data = json.loads(fd.read())
-
- print(validate_collector_data(json_data))
+ jsonschema.validate(json_data, schema, format_checker=jsonschema.FormatChecker())
+ except jsonschema.exceptions.ValidationError as exc:
+ print(f"Validation failed with error: {exc.message}")
+ return False
+ return True
diff --git a/src/couch/__init__.py b/src/couch/__init__.py
deleted file mode 100644
index 64e0252..0000000
--- a/src/couch/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# -*- coding: utf-8 -*-
-
-__author__ = "Andrey Antukh"
-__license__ = "BSD"
-__version__ = "1.14.1"
-__maintainer__ = "Rinat Sabitov"
-__email__ = "rinat.sabitov@gmail.com"
-__status__ = "Development"
-
-
-from .client import Server # noqa: F401
diff --git a/src/couch/client.py b/src/couch/client.py
deleted file mode 100644
index 96dc78a..0000000
--- a/src/couch/client.py
+++ /dev/null
@@ -1,801 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-
-import os
-import json
-import uuid
-import copy
-import mimetypes
-import warnings
-
-from .utils import (
- force_bytes,
- force_text,
- encode_view_options,
- extract_credentials,
-)
-from .feedreader import (
- SimpleFeedReader,
- BaseFeedReader,
-)
-
-from .exceptions import (
- Conflict,
- NotFound,
- FeedReaderExited,
- UnexpectedError,
-)
-from .resource import Resource
-
-
-DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/')
-
-
-def _id_to_path(_id: str) -> str:
- if _id[:1] == "_":
- return _id.split("/", 1)
- return [_id]
-
-
-def _listen_feed(object, node, feed_reader, **kwargs):
- if not callable(feed_reader):
- raise UnexpectedError("feed_reader must be callable or class")
-
- if isinstance(feed_reader, BaseFeedReader):
- reader = feed_reader(object)
- else:
- reader = SimpleFeedReader()(object, feed_reader)
-
- # Possible options: "continuous", "longpoll"
- kwargs.setdefault("feed", "continuous")
- data = force_bytes(json.dumps(kwargs.pop('data', {})))
-
- (resp, result) = object.resource(node).post(
- params=kwargs, data=data, stream=True)
- try:
- for line in resp.iter_lines():
- # ignore heartbeats
- if not line:
- reader.on_heartbeat()
- else:
- reader.on_message(json.loads(force_text(line)))
- except FeedReaderExited:
- reader.on_close()
-
-
-class _StreamResponse(object):
- """
- Proxy object for python-requests stream response.
-
- See more on:
- http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests
- """
-
- def __init__(self, response):
- self._response = response
-
- def iter_content(self, chunk_size=1, decode_unicode=False):
- return self._response.iter_content(chunk_size=chunk_size,
- decode_unicode=decode_unicode)
-
- def iter_lines(self, chunk_size=512, decode_unicode=None):
- return self._response.iter_lines(chunk_size=chunk_size,
- decode_unicode=decode_unicode)
-
- @property
- def raw(self):
- return self._response.raw
-
- @property
- def url(self):
- return self._response.url
-
-
-class Server(object):
- """
- Class that represents a couchdb connection.
-
- :param verify: setup ssl verification.
- :param base_url: a full url to couchdb (can contain auth data).
- :param full_commit: If ``False``, couchdb not commits all data on a
- request is finished.
- :param authmethod: specify a authentication method. By default "basic"
- method is used but also exists "session" (that requires
- some server configuration changes).
-
- .. versionchanged: 1.4
- Set basic auth method as default instead of session method.
-
- .. versionchanged: 1.5
- Add verify parameter for setup ssl verificaton
-
- """
-
- def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True,
- authmethod="basic", verify=False):
-
- self.base_url, credentials = extract_credentials(base_url)
- self.resource = Resource(self.base_url, full_commit,
- credentials=credentials,
- authmethod=authmethod,
- verify=verify)
-
- def __repr__(self):
- return '<CouchDB Server "{}">'.format(self.base_url)
-
- def __contains__(self, name):
- try:
- self.resource.head(name)
- except NotFound:
- return False
- else:
- return True
-
- def __iter__(self):
- (r, result) = self.resource.get('_all_dbs')
- return iter(result)
-
- def __len__(self):
- (r, result) = self.resource.get('_all_dbs')
- return len(result)
-
- def info(self):
- """
- Get server info.
-
- :returns: dict with all data that couchdb returns.
- :rtype: dict
- """
- (r, result) = self.resource.get()
- return result
-
- def delete(self, name):
- """
- Delete some database.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a database does not exists
- """
-
- self.resource.delete(name)
-
- def database(self, name):
- """
- Get a database instance.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a database does not exists
-
- :returns: a :py:class:`~pycouchdb.client.Database` instance
- """
- (r, result) = self.resource.head(name)
- if r.status_code == 404:
- raise NotFound("Database '{0}' does not exists".format(name))
-
- db = Database(self.resource(name), name)
- return db
-
- # TODO: Config in 2.0 are applicable for nodes only
- # TODO: Reimplement when nodes endpoint will be ready
- # def config(self):
- # pass
-
- def version(self):
- """
- Get the current version of a couchdb server.
- """
- (resp, result) = self.resource.get()
- return result["version"]
-
- # TODO: Stats in 2.0 are applicable for nodes only
- # TODO: Reimplement when nodes endpoint will be ready
- # def stats(self, name=None):
- # pass
-
- def create(self, name):
- """
- Create a database.
-
- :param name: database name
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if a database already exists
- :returns: a :py:class:`~pycouchdb.client.Database` instance
- """
- (resp, result) = self.resource.put(name)
- if resp.status_code in (200, 201):
- return self.database(name)
-
- def replicate(self, source, target, **kwargs):
- """
- Replicate the source database to the target one.
-
- .. versionadded:: 1.3
-
- :param source: full URL to the source database
- :param target: full URL to the target database
- """
-
- data = {'source': source, 'target': target}
- data.update(kwargs)
-
- data = force_bytes(json.dumps(data))
-
- (resp, result) = self.resource.post('_replicate', data=data)
- return result
-
- def changes_feed(self, feed_reader, **kwargs):
- """
- Subscribe to changes feed of the whole CouchDB server.
-
- Note: this method is blocking.
-
-
- :param feed_reader: callable or :py:class:`~BaseFeedReader`
- instance
-
- .. [Ref] http://docs.couchdb.org/en/1.6.1/api/server/common.html#db-updates
- .. versionadded: 1.10
- """
- object = self
- _listen_feed(object, "_db_updates", feed_reader, **kwargs)
-
-
-class Database(object):
- """
- Class that represents a couchdb database.
- """
-
- def __init__(self, resource, name):
- self.resource = resource
- self.name = name
-
- def __repr__(self):
- return '<CouchDB Database "{}">'.format(self.name)
-
- def __contains__(self, doc_id):
- try:
- (resp, result) = self.resource.head(_id_to_path(doc_id))
- return resp.status_code < 206
- except NotFound:
- return False
-
- def config(self):
- """
- Get database status data such as document count, update sequence etc.
- :return: dict
- """
- (resp, result) = self.resource.get()
- return result
-
- def __nonzero__(self):
- """Is the database available"""
- resp, _ = self.resource.head()
- return resp.status_code == 200
-
- def __len__(self):
- return self.config()['doc_count']
-
- def delete(self, doc_or_id):
- """
- Delete document by id.
-
- .. versionchanged:: 1.2
- Accept document or id.
-
- :param doc_or_id: document or id
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
- not exists
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if delete with
- wrong revision.
- """
-
- _id = None
- if isinstance(doc_or_id, dict):
- if "_id" not in doc_or_id:
- raise ValueError("Invalid document, missing _id attr")
- _id = doc_or_id['_id']
- else:
- _id = doc_or_id
-
- resource = self.resource(*_id_to_path(_id))
-
- (r, result) = resource.head()
- (r, result) = resource.delete(
- params={"rev": r.headers["etag"].strip('"')})
-
- def delete_bulk(self, docs, transaction=True):
- """
- Delete a bulk of documents.
-
- .. versionadded:: 1.2
-
- :param docs: list of docs
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if a delete
- is not success
- :returns: raw results from server
- """
-
- _docs = copy.copy(docs)
- for doc in _docs:
- if "_deleted" not in doc:
- doc["_deleted"] = True
-
- data = force_bytes(json.dumps({"docs": _docs}))
- params = {"all_or_nothing": "true" if transaction else "false"}
- (resp, results) = self.resource.post(
- "_bulk_docs", data=data, params=params)
-
- for result, doc in zip(results, _docs):
- if "error" in result:
- raise Conflict("one or more docs are not saved")
-
- return results
-
- def get(self, doc_id, params=None, **kwargs):
- """
- Get a document by id.
-
- .. versionadded: 1.5
- Now the prefered method to pass params is via **kwargs
- instead of params argument. **params** argument is now
- deprecated and will be deleted in future versions.
-
- :param doc_id: document id
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document
- not exists
-
- :returns: document (dict)
- """
-
- if params:
- warnings.warn("params parameter is now deprecated in favor to"
- "**kwargs usage.", DeprecationWarning)
-
- if params is None:
- params = {}
-
- params.update(kwargs)
-
- (resp, result) = self.resource(*_id_to_path(doc_id)).get(params=params)
- return result
-
- def save(self, doc, batch=False):
- """
- Save or update a document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document
- :param batch: allow batch=ok inserts (default False)
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if save with wrong
- revision.
- :returns: doc
- """
-
- _doc = copy.copy(doc)
- if "_id" not in _doc:
- _doc['_id'] = uuid.uuid4().hex
-
- if batch:
- params = {'batch': 'ok'}
- else:
- params = {}
-
- data = force_bytes(json.dumps(_doc))
-
- print("gg1", flush=True)
- print(data, flush=True)
- print("vv1", flush=True)
-
- (resp, result) = self.resource(_doc['_id']).put(
- data=data, params=params)
-
- print("gg3", flush=True)
- print(resp.status_code)
- print(resp.content)
- #print(resp.contents)
-
- print("gg2", flush=True)
- print(data, flush=True)
- print(result, flush=True)
- print("vv2", flush=True)
-
- if resp.status_code == 409:
- raise Conflict(result['reason'])
-
- if "rev" in result and result["rev"] is not None:
- _doc["_rev"] = result["rev"]
-
- return _doc
-
- def save_bulk(self, docs, try_setting_ids=True, transaction=True):
- """
- Save a bulk of documents.
-
- .. versionchanged:: 1.2
- Now returns a new document list instead of modify the original.
-
- :param docs: list of docs
- :param try_setting_ids: if ``True``, we loop through docs and generate/set
- an id in each doc if none exists
- :param transaction: if ``True``, couchdb do a insert in transaction
- model.
- :returns: docs
- """
-
- _docs = copy.deepcopy(docs)
-
- # Insert _id field if it not exists and try_setting_ids is true
- if try_setting_ids:
- for doc in _docs:
- if "_id" not in doc:
- doc["_id"] = uuid.uuid4().hex
-
- data = orce_bytes(json.dumps({"docs": _docs}))
- params = {"all_or_nothing": "true" if transaction else "false"}
-
- (resp, results) = self.resource.post("_bulk_docs", data=data,
- params=params)
-
- for result, doc in zip(results, _docs):
- if "rev" in result:
- doc['_rev'] = result['rev']
-
- return _docs
-
- def all(self, wrapper=None, flat=None, as_list=False, **kwargs):
- """
- Execute a builtin view for get all documents.
-
- :param wrapper: wrap result into a specific class.
- :param as_list: return a list of results instead of a
- default lazy generator.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
- Add as_list parameter.
- Add flat parameter.
-
- :returns: generator object
- """
-
- params = {"include_docs": "true"}
- params.update(kwargs)
-
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop("keys")}
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- if data:
- (resp, result) = self.resource.post(
- "_all_docs", params=params, data=data)
- else:
- (resp, result) = self.resource.get("_all_docs", params=params)
-
- if wrapper is None:
- def wrapper(doc): return doc
-
- if flat is not None:
- def wrapper(doc): return doc[flat]
-
- def _iterate():
- for row in result["rows"]:
- yield wrapper(row)
-
- if as_list:
- return list(_iterate())
- return _iterate()
-
- def cleanup(self):
- """
- Execute a cleanup operation.
- """
- (r, result) = self.resource('_view_cleanup').post()
- return result
-
- def commit(self):
- """
- Send commit message to server.
- """
- (resp, result) = self.resource.post('_ensure_full_commit')
- return result
-
- def compact(self):
- """
- Send compact message to server. Compacting write-heavy databases
- should be avoided, otherwise the process may not catch up with
- the writes. Read load has no effect.
- """
- (r, result) = self.resource("_compact").post()
- return result
-
- def compact_view(self, ddoc):
- """
- Execute compact over design view.
-
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a view does not exists.
- """
- (r, result) = self.resource("_compact", ddoc).post()
- return result
-
- def revisions(self, doc_id, status='available', params=None, **kwargs):
- """
- Get all revisions of one document.
-
- :param doc_id: document id
- :param status: filter of revision status, set empty to list all
- :raises: :py:exc:`~pycouchdb.exceptions.NotFound`
- if a view does not exists.
-
- :returns: generator object
- """
- if params:
- warnings.warn("params parameter is now deprecated in favor to"
- "**kwargs usage.", DeprecationWarning)
-
- if params is None:
- params = {}
-
- params.update(kwargs)
-
- if not params.get('revs_info'):
- params['revs_info'] = 'true'
-
- resource = self.resource(doc_id)
- (resp, result) = resource.get(params=params)
- if resp.status_code == 404:
- raise NotFound("Document id `{0}` not found".format(doc_id))
-
- for rev in result['_revs_info']:
- if status and rev['status'] == status:
- yield self.get(doc_id, rev=rev['rev'])
- elif not status:
- yield self.get(doc_id, rev=rev['rev'])
-
- def delete_attachment(self, doc, filename):
- """
- Delete attachment by filename from document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document dict
- :param filename: name of attachment.
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if save with wrong revision.
- :returns: doc
- """
-
- _doc = copy.deepcopy(doc)
- resource = self.resource(_doc['_id'])
-
- (resp, result) = resource.delete(
- filename, params={'rev': _doc['_rev']})
- if resp.status_code == 404:
- raise NotFound("filename {0} not found".format(filename))
-
- if resp.status_code > 205:
- raise Conflict(result['reason'])
-
- _doc['_rev'] = result['rev']
- try:
- del _doc['_attachments'][filename]
-
- if not _doc['_attachments']:
- del _doc['_attachments']
- except KeyError:
- pass
-
- return _doc
-
- def get_attachment(self, doc, filename, stream=False, **kwargs):
- """
- Get attachment by filename from document.
-
- :param doc: document dict
- :param filename: attachment file name.
- :param stream: setup streaming output (default: False)
-
- .. versionchanged: 1.5
- Add stream parameter for obtain very large attachments
- without load all file to the memory.
-
- :returns: binary data or
- """
-
- params = {"rev": doc["_rev"]}
- params.update(kwargs)
-
- r, result = self.resource(doc['_id']).get(filename, stream=stream,
- params=params)
- if stream:
- return _StreamResponse(r)
-
- return r.content
-
- def put_attachment(self, doc, content, filename=None, content_type=None):
- """
- Put a attachment to a document.
-
- .. versionchanged:: 1.2
- Now returns a new document instead of modify the original.
-
- :param doc: document dict.
- :param content: the content to upload, either a file-like object or
- bytes
- :param filename: the name of the attachment file; if omitted, this
- function tries to get the filename from the file-like
- object passed as the `content` argument value
- :raises: :py:exc:`~pycouchdb.exceptions.Conflict`
- if save with wrong revision.
- :raises: ValueError
- :returns: doc
- """
-
- if filename is None:
- if hasattr(content, 'name'):
- filename = os.path.basename(content.name)
- else:
- raise ValueError('no filename specified for attachment')
-
- if content_type is None:
- content_type = ';'.join(
- filter(None, mimetypes.guess_type(filename)))
-
- headers = {"Content-Type": content_type}
- resource = self.resource(doc['_id'])
-
- (resp, result) = resource.put(
- filename, data=content, params={'rev': doc['_rev']}, headers=headers)
-
- if resp.status_code < 206:
- return self.get(doc["_id"])
-
- raise Conflict(result['reason'])
-
- def one(self, name, flat=None, wrapper=None, **kwargs):
- """
- Execute a design document view query and returns a first
- result.
-
- :param name: name of the view (eg: docidname/viewname).
- :param wrapper: wrap result into a specific class.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
-
- :returns: object or None
- """
-
- params = {"limit": 1}
- params.update(kwargs)
-
- path = _path_from_name(name, '_view')
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop('keys')}
-
- if data:
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- result = list(self._query(self.resource(*path), wrapper=wrapper,
- flat=flat, params=params, data=data))
-
- return result[0] if len(result) > 0 else None
-
- def _query(self, resource, data=None, params=None, headers=None,
- flat=None, wrapper=None):
-
- if data is None:
- (resp, result) = resource.get(params=params, headers=headers)
- else:
- (resp, result) = resource.post(
- data=data, params=params, headers=headers)
-
- if wrapper is None:
- def wrapper(row): return row
-
- if flat is not None:
- def wrapper(row): return row[flat]
-
- for row in result["rows"]:
- yield wrapper(row)
-
- def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs):
- """
- Execute a design document view query.
-
- :param name: name of the view (eg: docidname/viewname).
- :param wrapper: wrap result into a specific class.
- :param as_list: return a list of results instead of a
- default lazy generator.
- :param flat: get a specific field from a object instead
- of a complete object.
-
- .. versionadded: 1.4
- Add as_list parameter.
- Add flat parameter.
-
- :returns: generator object
- """
- params = copy.copy(kwargs)
- path = _path_from_name(name, '_view')
- data = None
-
- if "keys" in params:
- data = {"keys": params.pop('keys')}
-
- if data:
- data = force_bytes(json.dumps(data))
-
- params = encode_view_options(params)
- result = self._query(self.resource(*path), wrapper=wrapper,
- flat=flat, params=params, data=data)
-
- if as_list:
- return list(result)
- return result
-
- def changes_feed(self, feed_reader, **kwargs):
- """
- Subscribe to changes feed of couchdb database.
-
- Note: this method is blocking.
-
-
- :param feed_reader: callable or :py:class:`~BaseFeedReader`
- instance
-
- .. versionadded: 1.5
- """
-
- object = self
- _listen_feed(object, "_changes", feed_reader, **kwargs)
-
- def changes_list(self, **kwargs):
- """
- Obtain a list of changes from couchdb.
-
- .. versionadded: 1.5
- """
-
- (resp, result) = self.resource("_changes").get(params=kwargs)
- return result['last_seq'], result['results']
-
- def find(self, selector, wrapper=None, **kwargs):
- """
- Execute Mango querys using _find.
-
- :param selector: data to search
- :param wrapper: wrap result into a specific class.
-
- """
- path = '_find'
- data = force_bytes(json.dumps(selector))
-
- (resp, result) = self.resource.post(path, data=data, params=kwargs)
-
- if wrapper is None:
- def wrapper(doc): return doc
-
- for doc in result["docs"]:
- yield wrapper(doc)
-
- def index(self, index_doc, **kwargs):
- path = '_index'
- data = force_bytes(json.dumps(index_doc))
-
- (resp, result) = self.resource.post(path, data=data, params=kwargs)
-
- return result
diff --git a/src/couch/exceptions.py b/src/couch/exceptions.py
deleted file mode 100644
index d7e037b..0000000
--- a/src/couch/exceptions.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-
-
-class Error(Exception):
- pass
-
-
-class UnexpectedError(Error):
- pass
-
-
-class FeedReaderExited(Error):
- pass
-
-
-class ApiError(Error):
- pass
-
-
-class GenericError(ApiError):
- pass
-
-
-class Conflict(ApiError):
- pass
-
-
-class NotFound(ApiError):
- pass
-
-
-class BadRequest(ApiError):
- pass
-
-
-class AuthenticationFailed(ApiError):
- pass
diff --git a/src/couch/feedreader.py b/src/couch/feedreader.py
deleted file mode 100644
index aac51d3..0000000
--- a/src/couch/feedreader.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-from __future__ import annotations
-
-class BaseFeedReader:
- """
- Base interface class for changes feed reader.
- """
-
- def __call__(self, db) -> BaseFeedReader:
- self.db = db
- return self
-
- def on_message(self, message: str) -> None:
- """
- Callback method that is called when change
- message is received from couchdb.
-
- :param message: change object
- :returns: None
- """
-
- raise NotImplementedError()
-
- def on_close(self) -> None:
- """
- Callback method that is received when connection
- is closed with a server. By default, does nothing.
- """
- pass
-
- def on_heartbeat(self) -> None:
- """
- Callback method invoked when a hearbeat (empty line) is received
- from the _changes stream. Override this to purge the reader's internal
- buffers (if any) if it waited too long without receiving anything.
- """
- pass
-
-
-class SimpleFeedReader(BaseFeedReader):
- """
- Simple feed reader that encapsule any callable in
- a valid feed reader interface.
- """
-
- def __call__(self, db, callback) -> BaseFeedReader:
- self.callback = callback
- return super(SimpleFeedReader, self).__call__(db)
-
- def on_message(self, message: str) -> None:
- self.callback(message, db=self.db)
diff --git a/src/couch/resource.py b/src/couch/resource.py
deleted file mode 100644
index f110c8d..0000000
--- a/src/couch/resource.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-
-from __future__ import annotations
-from __future__ import unicode_literals
-from typing import Union, Tuple, Dict, Any
-
-import json
-import requests
-
-from .utils import (
- urljoin,
- as_json,
- force_bytes,
-)
-from .exceptions import (
- GenericError,
- NotFound,
- BadRequest,
- Conflict,
- AuthenticationFailed,
-)
-
-
-class Resource:
- def __init__(self, base_url: str, full_commit: bool = True, session: Union[requests.sessions.Session, None] = None,
- credentials: Union[Tuple[str, str], None] = None, authmethod: str = "session", verify: bool = False) -> None:
-
- self.base_url = base_url
-# self.verify = verify
-
- if not session:
- self.session = requests.session()
-
- self.session.headers.update({"accept": "application/json",
- "content-type": "application/json"})
- self._authenticate(credentials, authmethod)
-
- if not full_commit:
- self.session.headers.update({'X-Couch-Full-Commit': 'false'})
- else:
- self.session = session
- self.session.verify = verify
-
- def _authenticate(self, credentials: Union[Tuple[str, str], None], method: str) -> None:
- if not credentials:
- return
-
- if method == "session":
- data = {"name": credentials[0], "password": credentials[1]}
-
- post_url = urljoin(self.base_url, "_session")
- r = self.session.post(post_url, data=force_bytes(json.dumps(data)))
- if r and r.status_code != 200:
- raise AuthenticationFailed()
-
- elif method == "basic":
- self.session.auth = credentials
-
- else:
- raise RuntimeError("Invalid authentication method")
-
- def __call__(self, *path: str) -> Resource:
- base_url = urljoin(self.base_url, *path)
- return self.__class__(base_url, session=self.session)
-
- def _check_result(self, response, result) -> None:
- try:
- error = result.get('error', None)
- reason = result.get('reason', None)
- except AttributeError:
- error = None
- reason = ''
-
- # This is here because couchdb can return http 201
- # but containing a list of conflict errors
- if error == 'conflict' or error == "file_exists":
- raise Conflict(reason or "Conflict")
-
- if response.status_code > 205:
- if response.status_code == 404 or error == 'not_found':
- raise NotFound(reason or 'Not found')
- elif error == 'bad_request':
- raise BadRequest(reason or "Bad request")
- raise GenericError(result)
-
-
- def request(self, method, path: Union[str, None], params=None, data=None,
- headers=None, stream=False, **kwargs) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
-
- if headers is None:
- headers = {}
-
- headers.setdefault('Accept', 'application/json')
-
- if path:
- if not isinstance(path, (list, tuple)):
- path = [path]
- url = urljoin(self.base_url, *path)
- else:
- url = self.base_url
-
- response = self.session.request(method, url, stream=stream,
- data=data, params=params,
- headers=headers, **kwargs)
- # Ignore result validation if
- # request is with stream mode
-
- if stream and response.status_code < 400:
- result = None
- self._check_result(response, result)
- else:
- result = as_json(response)
-
- if result is None:
- return response, result
-
- if isinstance(result, list):
- for res in result:
- self._check_result(response, res)
- else:
- self._check_result(response, result)
-
- return response, result
-
- def get(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
- return self.request("GET", path, **kwargs)
-
- def put(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
- return self.request("PUT", path, **kwargs)
-
- def post(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
- return self.request("POST", path, **kwargs)
-
- def delete(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
- return self.request("DELETE", path, **kwargs)
-
- def head(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]:
- return self.request("HEAD", path, **kwargs)
diff --git a/src/couch/utils.py b/src/couch/utils.py
deleted file mode 100644
index b3e5aa3..0000000
--- a/src/couch/utils.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# -*- coding: utf-8 -*-
-# Based on py-couchdb (https://github.com/histrio/py-couchdb)
-
-from typing import Tuple, Union, Dict, List, Any
-import json
-import sys
-
-import requests
-from urllib.parse import unquote as _unquote
-from urllib.parse import urlunsplit, urlsplit
-
-from functools import reduce
-
-URLSPLITTER = '/'
-
-
-json_encoder = json.JSONEncoder()
-
-
-def extract_credentials(url: str) -> Tuple[str, Union[Tuple[str, str], None]]:
- """
- Extract authentication (user name and password) credentials from the
- given URL.
-
- >>> extract_credentials('http://localhost:5984/_config/')
- ('http://localhost:5984/_config/', None)
- >>> extract_credentials('http://joe:secret@localhost:5984/_config/')
- ('http://localhost:5984/_config/', ('joe', 'secret'))
- >>> extract_credentials('http://joe%40example.com:secret@'
- ... 'localhost:5984/_config/')
- ('http://localhost:5984/_config/', ('joe@example.com', 'secret'))
- """
- parts = urlsplit(url)
- netloc = parts[1]
- if '@' in netloc:
- creds, netloc = netloc.split('@')
- credentials = tuple(_unquote(i) for i in creds.split(':'))
- parts_list = list(parts)
- parts_list[1] = netloc
- return urlunsplit(parts_list), (credentials[0], credentials[1])
-
- parts_list = list(parts)
- return urlunsplit(parts_list), None
-
-
-def _join(head: str, tail: str) -> str:
- parts = [head.rstrip(URLSPLITTER), tail.lstrip(URLSPLITTER)]
- return URLSPLITTER.join(parts)
-
-
-def urljoin(base: str, *path: str) -> str:
- """
- Assemble a uri based on a base, any number of path segments, and query
- string parameters.
-
- >>> urljoin('http://example.org', '_all_dbs')
- 'http://example.org/_all_dbs'
-
- A trailing slash on the uri base is handled gracefully:
-
- >>> urljoin('http://example.org/', '_all_dbs')
- 'http://example.org/_all_dbs'
-
- And multiple positional arguments become path parts:
-
- >>> urljoin('http://example.org/', 'foo', 'bar')
- 'http://example.org/foo/bar'
-
- >>> urljoin('http://example.org/', 'foo/bar')
- 'http://example.org/foo/bar'
-
- >>> urljoin('http://example.org/', 'foo', '/bar/')
- 'http://example.org/foo/bar/'
-
- >>> urljoin('http://example.com', 'org.couchdb.user:username')
- 'http://example.com/org.couchdb.user:username'
- """
- return reduce(_join, path, base)
-
-# Probably bugs here
-def as_json(response: requests.models.Response) -> Union[Dict[str, Any], None, str]:
- if "application/json" in response.headers['content-type']:
- response_src = response.content.decode('utf-8')
- print(response.content)
- if response.content != b'':
- ret: Dict[str, Any] = json.loads(response_src)
- return ret
- else:
- print("fff")
- print("fff")
- print(type(response_src))
- return response_src
- return None
-
-def _path_from_name(name: str, type: str) -> List[str]:
- """
- Expand a 'design/foo' style name to its full path as a list of
- segments.
-
- >>> _path_from_name("_design/test", '_view')
- ['_design', 'test']
- >>> _path_from_name("design/test", '_view')
- ['_design', 'design', '_view', 'test']
- """
- if name.startswith('_'):
- return name.split('/')
- design, name = name.split('/', 1)
- return ['_design', design, type, name]
-
-
-def encode_view_options(options: Dict[str, Any]) -> Dict[str, Any]:
- """
- Encode any items in the options dict that are sent as a JSON string to a
- view/list function.
-
- >>> opts = {'key': 'foo', "notkey":"bar"}
- >>> res = encode_view_options(opts)
- >>> res["key"], res["notkey"]
- ('"foo"', 'bar')
-
- >>> opts = {'startkey': 'foo', "endkey":"bar"}
- >>> res = encode_view_options(opts)
- >>> res['startkey'], res['endkey']
- ('"foo"', '"bar"')
- """
- retval = {}
-
- for name, value in options.items():
- if name in ('key', 'startkey', 'endkey'):
- value = json_encoder.encode(value)
- retval[name] = value
- return retval
-
-
-def force_bytes(data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
- if isinstance(data, str):
- data = data.encode(encoding)
- return data
-
-
-def force_text(data: Union[str, bytes], encoding: str = "utf-8") -> str:
- if isinstance(data, bytes):
- data = data.decode(encoding)
- return data
diff --git a/src/quickstart_test.sh b/src/quickstart_test.sh
deleted file mode 100755
index 9254271..0000000
--- a/src/quickstart_test.sh
+++ /dev/null
@@ -1,67 +0,0 @@
-# Usage: ./quickstart_test.sh [-v] [-c] [-- <args to pytest>]
-
-export COUCHDB_NAME=unittest
-export COUCHDB_HOSTNAME=localhost
-export COUCHDB_USER=test
-export COUCHDB_PASSWORD=test
-
-export DOCKER_JWT_PUBKEY_PATH="`pwd`/test/unittest_cert/"
-export JWT_PUBKEY_PATH="`pwd`/test/unittest_cert/public.pem"
-
-virtualenv=no
-couchdb=no
-
-while getopts ":vc" flag
-do
- case "$flag" in
- v) virtualenv=yes;;
- c) couchdb=yes;;
- esac
-done
-
-if [ -d test/unittest_cert ]; then
- rm -r test/unittest_cert
-fi
-
-if [ $virtualenv == "yes" ]; then
- shift
- if [ -d test/unittest_venv ]; then
- rm -r test/unittest_venv
- fi
-
- virtualenv test/unittest_venv
- source test/unittest_venv/bin/activate
- pip3 install -r ../requirements.txt
-fi
-
-if [ $couchdb == "yes" ]; then
- shift
- docker run -it -p 6123:5984 --rm -d --name unittest_couchdb -e COUCHDB_USER=$COUCHDB_USER -e COUCHDB_PASSWORD=$COUCHDB_PASSWORD couchdb
-
- docker inspect unittest_couchdb > /dev/null
-
- if (( $? != 0 )); then
- echo "Failed to start CouchDB container."
- exit
- fi
-
- export COUCHDB_PORT=6123
-fi
-
-mkdir test/unittest_cert
-
-cat <<EOF > test/unittest_cert/public.pem
------BEGIN PUBLIC KEY-----
-MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGHX8ipqVWtr49TXyX0f/L4GPhEpg
-N0Erzy7hHkXVrkgKpnHSRLYWgbW4rscLoJAJeEv7Be5iH0TM8l09w8Q3wQ==
------END PUBLIC KEY-----
-EOF
-
-shift
-pytest --capture=tee-sys "$@"
-
-rm -r test/unittest_cert
-
-if [ $couchdb == "yes" ]; then
- docker kill unittest_couchdb
-fi