diff options
Diffstat (limited to 'src')
-rw-r--r--[-rwxr-xr-x] | src/collector/db.py | 185 | ||||
-rwxr-xr-x | src/collector/main.py | 332 | ||||
-rw-r--r-- | src/collector/schema.py | 57 | ||||
-rw-r--r-- | src/couch/__init__.py | 11 | ||||
-rw-r--r-- | src/couch/client.py | 801 | ||||
-rw-r--r-- | src/couch/exceptions.py | 38 | ||||
-rw-r--r-- | src/couch/feedreader.py | 52 | ||||
-rw-r--r-- | src/couch/resource.py | 139 | ||||
-rw-r--r-- | src/couch/utils.py | 144 | ||||
-rwxr-xr-x | src/quickstart_test.sh | 67 |
10 files changed, 172 insertions, 1654 deletions
diff --git a/src/collector/db.py b/src/collector/db.py index 0bfa014..3b16ef5 100755..100644 --- a/src/collector/db.py +++ b/src/collector/db.py @@ -1,148 +1,39 @@ -# A database storing dictionaries, keyed on a timestamp. value = A -# dict which will be stored as a JSON object encoded in UTF-8. Note -# that dict keys of type integer or float will become strings while -# values will keep their type. - -# Note that there's a (slim) chance that you'd stomp on the previous -# value if you're too quick with generating the timestamps, ie -# invoking time.time() several times quickly enough. - -from typing import Dict, List, Tuple, Union, Any -import os -import sys -import time - -from src import couch -from .schema import as_index_list, validate_collector_data - - -class DictDB: - def __init__(self) -> None: - """ - Check if the database exists, otherwise we will create it together - with the indexes specified in index.py. - """ - - print(os.environ) - - try: - self.database = os.environ["COUCHDB_NAME"] - self.hostname = os.environ["COUCHDB_HOSTNAME"] - self.username = os.environ["COUCHDB_USER"] - self.password = os.environ["COUCHDB_PASSWORD"] - except KeyError: - print( - "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME," - + " COUCHDB_USER and COUCHDB_PASSWORD must be set." - ) - sys.exit(-1) - - if "COUCHDB_PORT" in os.environ: - couchdb_port = os.environ["COUCHDB_PORT"] - else: - couchdb_port = "5984" - - self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/") - - try: - self.couchdb = self.server.database(self.database) - print("Database already exists") - except couch.exceptions.NotFound: - print("Creating database and indexes.") - self.couchdb = self.server.create(self.database) - - for i in as_index_list(): - self.couchdb.index(i) - - self._ts = time.time() - - def unique_key(self) -> int: - """ - Create a unique key based on the current time. We will use this as - the ID for any new documents we store in CouchDB. - """ - - ts = time.time() - while round(ts * 1000) == self._ts: - ts = time.time() - self._ts = round(ts * 1000) - - return self._ts - - # Why batch_write??? - def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]: - """ - Store a document in CouchDB. - """ - - if isinstance(data, List): - for item in data: - error = validate_collector_data(item) - if error != "": - return error - item["_id"] = str(self.unique_key()) - ret: Tuple[str, str] = self.couchdb.save_bulk(data) +"""Our database module""" +from time import sleep +from sys import exit as app_exit +from dataclasses import dataclass + +from motor.motor_asyncio import ( + AsyncIOMotorClient, + AsyncIOMotorCollection, +) +from bson import ObjectId + + +@dataclass() +class DBClient: + """Class to hold database connections for us.""" + + client: AsyncIOMotorClient + collection: AsyncIOMotorCollection + + def __init__(self, username: str, password: str, collection: str) -> None: + self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000) + self.collection = self.client["production"][collection] + + async def check_server(self) -> None: + """Try query the DB and exit the program if we fail after 5 times. + + :return: None + """ + for i in range(5): + try: + await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")}) + print("Connection to DB - OK") + break + except: # pylint: disable=bare-except + print(f"WARNING failed to connect to DB - {i} / 4", flush=True) + sleep(1) else: - error = validate_collector_data(data) - if error != "": - return error - data["_id"] = str(self.unique_key()) - ret = self.couchdb.save(data) - - return ret - - def get(self, key: int) -> Dict[str, Any]: - """ - Get a document based on its ID, return an empty dict if not found. - """ - - try: - doc: Dict[str, Any] = self.couchdb.get(key) - except couch.exceptions.NotFound: - doc = {} - - return doc - - # - # def slice(self, key_from=None, key_to=None): - # pass - - def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]: - """ - Execute a Mango query, ideally we should have an index matching - the query otherwise things will be slow. - """ - - data: List[Dict[str, Any]] = [] - selector: Dict[str, Any] = {} - - try: - limit = int(limit) - skip = int(skip) - except ValueError: - limit = 25 - skip = 0 - - if kwargs: - selector = {"limit": limit, "skip": skip, "selector": {}} - - for key in kwargs: - if kwargs[key] and kwargs[key].isnumeric(): - kwargs[key] = int(kwargs[key]) - selector["selector"][key] = {"$eq": kwargs[key]} - - for doc in self.couchdb.find(selector, wrapper=None, limit=5): - data.append(doc) - - return data - - def delete(self, key: int) -> Union[int, None]: - """ - Delete a document based on its ID. - """ - try: - self.couchdb.delete(key) - except couch.exceptions.NotFound: - return None - - return key + print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production") + app_exit(1) diff --git a/src/collector/main.py b/src/collector/main.py index c363885..096b788 100755 --- a/src/collector/main.py +++ b/src/collector/main.py @@ -1,267 +1,175 @@ -from typing import Dict, Union, List, Callable, Awaitable, Any -import json -import os +"""Our main module""" +from typing import Dict, Optional, List, Any +from os import environ +import asyncio import sys -import time +from json.decoder import JSONDecodeError -import uvicorn -from fastapi import Depends, FastAPI, Request, Response -from fastapi.middleware.cors import CORSMiddleware +from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from fastapi_jwt_auth import AuthJWT -from fastapi_jwt_auth.auth_config import AuthConfig -from fastapi_jwt_auth.exceptions import AuthJWTException from pydantic import BaseModel - -from .db import DictDB -from .schema import get_index_keys, validate_collector_data - -app = FastAPI() - -app.add_middleware( - CORSMiddleware, - allow_origins=["http://localhost:8001"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - expose_headers=["X-Total-Count"], +from pymongo.errors import OperationFailure +from bson import ( + ObjectId, + json_util, ) +from dotenv import load_dotenv -# TODO: X-Total-Count - - -@app.middleware("http") -async def mock_x_total_count_header(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response: - - print(type(call_next)) - - response: Response = await call_next(request) - response.headers["X-Total-Count"] = "100" - return response - - -for i in range(10): - try: - db = DictDB() - except Exception as e: - print(f"Database not responding, will try again soon. Attempt {i + 1} of 10.") - else: - break - time.sleep(1) -else: - print("Database did not respond after 10 attempts, quitting.") - sys.exit(-1) - - -def get_pubkey() -> str: - try: - if "JWT_PUBKEY_PATH" in os.environ: - keypath = os.environ["JWT_PUBKEY_PATH"] - else: - keypath = "/opt/certs/public.pem" - - with open(keypath, "r") as fd: - pubkey = fd.read() - except FileNotFoundError: - print(f"Could not find JWT certificate in {keypath}") - sys.exit(-1) - - return pubkey - - -def get_data( - key: Union[int, None] = None, - limit: int = 25, - skip: int = 0, - ip: Union[str, None] = None, - port: Union[int, None] = None, - asn: Union[str, None] = None, - domain: Union[str, None] = None, -) -> List[Dict[str, Any]]: - if key: - return [db.get(key)] +from .db import DBClient +from .schema import valid_schema - selectors: Dict[str, Any] = {} - indexes = get_index_keys() - selectors["domain"] = domain - if ip and "ip" in indexes: - selectors["ip"] = ip - if port and "port" in indexes: - selectors["port"] = port - if asn and "asn" in indexes: - selectors["asn"] = asn +load_dotenv() +# Get credentials +if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ: + print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env") + sys.exit(1) - data: List[Dict[str, Any]] = db.search(**selectors, limit=limit, skip=skip) +# Create DB object +db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"]) - return data - - -class JWTConfig(BaseModel): - authjwt_algorithm: str = "ES256" - authjwt_public_key: str = get_pubkey() +# Check DB +loop = asyncio.get_running_loop() +startup_task = loop.create_task(db.check_server()) +app = FastAPI() -@AuthJWT.load_config # type: ignore -def jwt_config(): - return JWTConfig() +# @app.exception_handler(RuntimeError) +# def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse: +# print(exc, flush=True) +# return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400) +# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400) -@app.exception_handler(AuthJWTException) -def authjwt_exception_handler(request: Request, exc: AuthJWTException) -> JSONResponse: - return JSONResponse(content={"status": "error", "message": exc.message}, status_code=400) +class SearchInput(BaseModel): + """Handle search data for HTTP request""" -@app.exception_handler(RuntimeError) -def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse: - return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400) + search: Optional[Dict[str, Any]] + limit: int = 25 + skip: int = 0 -@app.get("/sc/v0/get") -async def get( - key: Union[int, None] = None, - limit: int = 25, - skip: int = 0, - ip: Union[str, None] = None, - port: Union[int, None] = None, - asn: Union[str, None] = None, - Authorize: AuthJWT = Depends(), -) -> JSONResponse: +@app.post("/sc/v0/search") +async def search(search_data: SearchInput) -> JSONResponse: + """/sc/v0/search, POST method - Authorize.jwt_required() + :param search_data: The search data. + :return: JSONResponse + """ + data: List[Dict[str, Any]] = [] - data = [] - raw_jwt = Authorize.get_raw_jwt() + cursor = db.collection.find(search_data.search) + cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) - if "read" not in raw_jwt: + try: + async for document in cursor: + data.append(document) + except OperationFailure as exc: + print(f"DB failed to process: {exc.details}") return JSONResponse( content={ "status": "error", - "message": "Could not find read claim in JWT token", + "message": "Probably wrong syntax, note the dictionary for find: " + + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", }, status_code=400, ) - else: - domains = raw_jwt["read"] - for domain in domains: - data.extend(get_data(key, limit, skip, ip, port, asn, domain)) + if not data: + return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) - return JSONResponse(content={"status": "success", "docs": data}) + return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)}) -@app.get("/sc/v0/get/{key}") -async def get_key(key: Union[int, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse: +@app.post("/sc/v0") +async def create(request: Request) -> JSONResponse: + """/sc/v0, POST method - Authorize.jwt_required() + :param request: The request where we get the json body. + :return: JSONResponse + """ - raw_jwt = Authorize.get_raw_jwt() + try: + json_data = await request.json() + except JSONDecodeError: + return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400) - if "read" not in raw_jwt: - return JSONResponse( - content={ - "status": "error", - "message": "Could not find read claim in JWT token", - }, - status_code=400, - ) - else: - allowed_domains = raw_jwt["read"] + if not valid_schema(json_data): + return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400) - data_list = get_data(key) + result = await db.collection.insert_one(json_data) + return JSONResponse(content={"status": "success", "key": str(result.inserted_id)}) - # Handle if missing - data = data_list[0] - if data and data["domain"] not in allowed_domains: - return JSONResponse( - content={ - "status": "error", - "message": "User not authorized to view this object", - }, - status_code=400, - ) - - return JSONResponse(content={"status": "success", "docs": data}) +@app.put("/sc/v0") +async def update(request: Request) -> JSONResponse: + """/sc/v0, PUT method - -# WHY IS AUTH OUTCOMMENTED??? -@app.post("/sc/v0/add") -async def add(data: Request, Authorize: AuthJWT = Depends()) -> JSONResponse: - # Authorize.jwt_required() + :param request: The request where we get the json body. + :return: JSONResponse + """ try: - json_data = await data.json() - except json.decoder.JSONDecodeError: - return JSONResponse( - content={ - "status": "error", - "message": "Invalid JSON.", - }, - status_code=400, - ) - - key = db.add(json_data) - - if isinstance(key, str): - return JSONResponse( - content={ - "status": "error", - "message": key, - }, - status_code=400, - ) - - return JSONResponse(content={"status": "success", "docs": key}) - + json_data = await request.json() + except JSONDecodeError: + return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400) + + if "_id" not in json_data: + return JSONResponse(content={"status": "error", "message": "Missing key '_id'"}, status_code=400) + + # Get the key + if isinstance(json_data["_id"], str): + object_id = ObjectId(json_data["_id"]) + elif ( + isinstance(json_data["_id"], dict) and "$oid" in json_data["_id"] and isinstance(json_data["_id"]["$oid"], str) + ): + object_id = ObjectId(json_data["_id"]["$oid"]) + else: + return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400) -@app.delete("/sc/v0/delete/{key}") -async def delete(key: int, Authorize: AuthJWT = Depends()) -> JSONResponse: + # Ensure the updating key exist + document = await db.collection.find_one({"_id": object_id}) + if document is None: + return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) - Authorize.jwt_required() + # Ensure valid schema + del json_data["_id"] + if not valid_schema(json_data): + return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400) - raw_jwt = Authorize.get_raw_jwt() + # Replace the data + json_data["_id"] = object_id + await db.collection.replace_one({"_id": object_id}, json_data) + return JSONResponse(content={"status": "success", "key": str(object_id)}) - if "write" not in raw_jwt: - return JSONResponse( - content={ - "status": "error", - "message": "Could not find write claim in JWT token", - }, - status_code=400, - ) - else: - allowed_domains = raw_jwt["write"] - data_list = get_data(key) +@app.get("/sc/v0/{key}") +async def get(key: str) -> JSONResponse: + """/sc/v0, POST method - # Handle if missing - data = data_list[0] + :param key: The document key in the database. + :return: JSONResponse + """ - if data and data["domain"] not in allowed_domains: - return JSONResponse( - content={ - "status": "error", - "message": "User not authorized to delete this object", - }, - status_code=400, - ) + document = await db.collection.find_one({"_id": ObjectId(key)}) - if db.delete(key) is None: + if document is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) - return JSONResponse(content={"status": "success", "docs": data}) + return JSONResponse(content={"status": "success", "docs": json_util.dumps(document)}) -# def main(standalone: bool = False): -# print(type(app)) -# if not standalone: -# return app +@app.delete("/sc/v0/{key}") +async def delete(key: str) -> JSONResponse: + """/sc/v0, POST method -# uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug") + :param key: The document key in the database. + :return: JSONResponse + """ + result = await db.collection.delete_one({"_id": ObjectId(key)}) + if result.deleted_count == 0: + return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) -# if __name__ == "__main__": -# main(standalone=True) -# else: -# app = main() + return JSONResponse(content={"status": "success", "key": key}) diff --git a/src/collector/schema.py b/src/collector/schema.py index e291f10..221990a 100644 --- a/src/collector/schema.py +++ b/src/collector/schema.py @@ -1,8 +1,5 @@ -from typing import List, Any, Dict -import json -import sys -import traceback - +"""Our schema module""" +from typing import Any, Dict import jsonschema # fmt:off @@ -64,7 +61,8 @@ schema = { ] }, { - "required": [ + "required": + [ "display_name", "investigation_needed", # "reliability", # TODO: reliability is required if investigation_needed = true @@ -93,44 +91,17 @@ schema = { "result", ], } -# fmt:on - - -def get_index_keys() -> List[str]: - keys: List[str] = [] - for key in schema["properties"]: - keys.append(key) - return keys -def as_index_list() -> List[Dict[str, Any]]: - index_list: List[Dict[str, Any]] = [] - for key in schema["properties"]: - name = f"{key}-json-index" - index = { - "index": { - "fields": [ - key, - ] - }, - "name": name, - "type": "json", - } - index_list.append(index) - - return index_list +def valid_schema(json_data: Dict[str, Any]) -> bool: + """Check if json data follows the schema. - -def validate_collector_data(json_blob: Dict[str, Any]) -> str: + :param json_data: Json object + :return: bool + """ try: - jsonschema.validate(json_blob, schema, format_checker=jsonschema.FormatChecker()) - except jsonschema.exceptions.ValidationError as e: - return f"Validation failed with error: {e.message}" - return "" - - -if __name__ == "__main__": - with open(sys.argv[1]) as fd: - json_data = json.loads(fd.read()) - - print(validate_collector_data(json_data)) + jsonschema.validate(json_data, schema, format_checker=jsonschema.FormatChecker()) + except jsonschema.exceptions.ValidationError as exc: + print(f"Validation failed with error: {exc.message}") + return False + return True diff --git a/src/couch/__init__.py b/src/couch/__init__.py deleted file mode 100644 index 64e0252..0000000 --- a/src/couch/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -# -*- coding: utf-8 -*- - -__author__ = "Andrey Antukh" -__license__ = "BSD" -__version__ = "1.14.1" -__maintainer__ = "Rinat Sabitov" -__email__ = "rinat.sabitov@gmail.com" -__status__ = "Development" - - -from .client import Server # noqa: F401 diff --git a/src/couch/client.py b/src/couch/client.py deleted file mode 100644 index 96dc78a..0000000 --- a/src/couch/client.py +++ /dev/null @@ -1,801 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) - -import os -import json -import uuid -import copy -import mimetypes -import warnings - -from .utils import ( - force_bytes, - force_text, - encode_view_options, - extract_credentials, -) -from .feedreader import ( - SimpleFeedReader, - BaseFeedReader, -) - -from .exceptions import ( - Conflict, - NotFound, - FeedReaderExited, - UnexpectedError, -) -from .resource import Resource - - -DEFAULT_BASE_URL = os.environ.get('COUCHDB_URL', 'http://localhost:5984/') - - -def _id_to_path(_id: str) -> str: - if _id[:1] == "_": - return _id.split("/", 1) - return [_id] - - -def _listen_feed(object, node, feed_reader, **kwargs): - if not callable(feed_reader): - raise UnexpectedError("feed_reader must be callable or class") - - if isinstance(feed_reader, BaseFeedReader): - reader = feed_reader(object) - else: - reader = SimpleFeedReader()(object, feed_reader) - - # Possible options: "continuous", "longpoll" - kwargs.setdefault("feed", "continuous") - data = force_bytes(json.dumps(kwargs.pop('data', {}))) - - (resp, result) = object.resource(node).post( - params=kwargs, data=data, stream=True) - try: - for line in resp.iter_lines(): - # ignore heartbeats - if not line: - reader.on_heartbeat() - else: - reader.on_message(json.loads(force_text(line))) - except FeedReaderExited: - reader.on_close() - - -class _StreamResponse(object): - """ - Proxy object for python-requests stream response. - - See more on: - http://docs.python-requests.org/en/latest/user/advanced/#streaming-requests - """ - - def __init__(self, response): - self._response = response - - def iter_content(self, chunk_size=1, decode_unicode=False): - return self._response.iter_content(chunk_size=chunk_size, - decode_unicode=decode_unicode) - - def iter_lines(self, chunk_size=512, decode_unicode=None): - return self._response.iter_lines(chunk_size=chunk_size, - decode_unicode=decode_unicode) - - @property - def raw(self): - return self._response.raw - - @property - def url(self): - return self._response.url - - -class Server(object): - """ - Class that represents a couchdb connection. - - :param verify: setup ssl verification. - :param base_url: a full url to couchdb (can contain auth data). - :param full_commit: If ``False``, couchdb not commits all data on a - request is finished. - :param authmethod: specify a authentication method. By default "basic" - method is used but also exists "session" (that requires - some server configuration changes). - - .. versionchanged: 1.4 - Set basic auth method as default instead of session method. - - .. versionchanged: 1.5 - Add verify parameter for setup ssl verificaton - - """ - - def __init__(self, base_url=DEFAULT_BASE_URL, full_commit=True, - authmethod="basic", verify=False): - - self.base_url, credentials = extract_credentials(base_url) - self.resource = Resource(self.base_url, full_commit, - credentials=credentials, - authmethod=authmethod, - verify=verify) - - def __repr__(self): - return '<CouchDB Server "{}">'.format(self.base_url) - - def __contains__(self, name): - try: - self.resource.head(name) - except NotFound: - return False - else: - return True - - def __iter__(self): - (r, result) = self.resource.get('_all_dbs') - return iter(result) - - def __len__(self): - (r, result) = self.resource.get('_all_dbs') - return len(result) - - def info(self): - """ - Get server info. - - :returns: dict with all data that couchdb returns. - :rtype: dict - """ - (r, result) = self.resource.get() - return result - - def delete(self, name): - """ - Delete some database. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a database does not exists - """ - - self.resource.delete(name) - - def database(self, name): - """ - Get a database instance. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a database does not exists - - :returns: a :py:class:`~pycouchdb.client.Database` instance - """ - (r, result) = self.resource.head(name) - if r.status_code == 404: - raise NotFound("Database '{0}' does not exists".format(name)) - - db = Database(self.resource(name), name) - return db - - # TODO: Config in 2.0 are applicable for nodes only - # TODO: Reimplement when nodes endpoint will be ready - # def config(self): - # pass - - def version(self): - """ - Get the current version of a couchdb server. - """ - (resp, result) = self.resource.get() - return result["version"] - - # TODO: Stats in 2.0 are applicable for nodes only - # TODO: Reimplement when nodes endpoint will be ready - # def stats(self, name=None): - # pass - - def create(self, name): - """ - Create a database. - - :param name: database name - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if a database already exists - :returns: a :py:class:`~pycouchdb.client.Database` instance - """ - (resp, result) = self.resource.put(name) - if resp.status_code in (200, 201): - return self.database(name) - - def replicate(self, source, target, **kwargs): - """ - Replicate the source database to the target one. - - .. versionadded:: 1.3 - - :param source: full URL to the source database - :param target: full URL to the target database - """ - - data = {'source': source, 'target': target} - data.update(kwargs) - - data = force_bytes(json.dumps(data)) - - (resp, result) = self.resource.post('_replicate', data=data) - return result - - def changes_feed(self, feed_reader, **kwargs): - """ - Subscribe to changes feed of the whole CouchDB server. - - Note: this method is blocking. - - - :param feed_reader: callable or :py:class:`~BaseFeedReader` - instance - - .. [Ref] http://docs.couchdb.org/en/1.6.1/api/server/common.html#db-updates - .. versionadded: 1.10 - """ - object = self - _listen_feed(object, "_db_updates", feed_reader, **kwargs) - - -class Database(object): - """ - Class that represents a couchdb database. - """ - - def __init__(self, resource, name): - self.resource = resource - self.name = name - - def __repr__(self): - return '<CouchDB Database "{}">'.format(self.name) - - def __contains__(self, doc_id): - try: - (resp, result) = self.resource.head(_id_to_path(doc_id)) - return resp.status_code < 206 - except NotFound: - return False - - def config(self): - """ - Get database status data such as document count, update sequence etc. - :return: dict - """ - (resp, result) = self.resource.get() - return result - - def __nonzero__(self): - """Is the database available""" - resp, _ = self.resource.head() - return resp.status_code == 200 - - def __len__(self): - return self.config()['doc_count'] - - def delete(self, doc_or_id): - """ - Delete document by id. - - .. versionchanged:: 1.2 - Accept document or id. - - :param doc_or_id: document or id - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document - not exists - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if delete with - wrong revision. - """ - - _id = None - if isinstance(doc_or_id, dict): - if "_id" not in doc_or_id: - raise ValueError("Invalid document, missing _id attr") - _id = doc_or_id['_id'] - else: - _id = doc_or_id - - resource = self.resource(*_id_to_path(_id)) - - (r, result) = resource.head() - (r, result) = resource.delete( - params={"rev": r.headers["etag"].strip('"')}) - - def delete_bulk(self, docs, transaction=True): - """ - Delete a bulk of documents. - - .. versionadded:: 1.2 - - :param docs: list of docs - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if a delete - is not success - :returns: raw results from server - """ - - _docs = copy.copy(docs) - for doc in _docs: - if "_deleted" not in doc: - doc["_deleted"] = True - - data = force_bytes(json.dumps({"docs": _docs})) - params = {"all_or_nothing": "true" if transaction else "false"} - (resp, results) = self.resource.post( - "_bulk_docs", data=data, params=params) - - for result, doc in zip(results, _docs): - if "error" in result: - raise Conflict("one or more docs are not saved") - - return results - - def get(self, doc_id, params=None, **kwargs): - """ - Get a document by id. - - .. versionadded: 1.5 - Now the prefered method to pass params is via **kwargs - instead of params argument. **params** argument is now - deprecated and will be deleted in future versions. - - :param doc_id: document id - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` if a document - not exists - - :returns: document (dict) - """ - - if params: - warnings.warn("params parameter is now deprecated in favor to" - "**kwargs usage.", DeprecationWarning) - - if params is None: - params = {} - - params.update(kwargs) - - (resp, result) = self.resource(*_id_to_path(doc_id)).get(params=params) - return result - - def save(self, doc, batch=False): - """ - Save or update a document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document - :param batch: allow batch=ok inserts (default False) - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` if save with wrong - revision. - :returns: doc - """ - - _doc = copy.copy(doc) - if "_id" not in _doc: - _doc['_id'] = uuid.uuid4().hex - - if batch: - params = {'batch': 'ok'} - else: - params = {} - - data = force_bytes(json.dumps(_doc)) - - print("gg1", flush=True) - print(data, flush=True) - print("vv1", flush=True) - - (resp, result) = self.resource(_doc['_id']).put( - data=data, params=params) - - print("gg3", flush=True) - print(resp.status_code) - print(resp.content) - #print(resp.contents) - - print("gg2", flush=True) - print(data, flush=True) - print(result, flush=True) - print("vv2", flush=True) - - if resp.status_code == 409: - raise Conflict(result['reason']) - - if "rev" in result and result["rev"] is not None: - _doc["_rev"] = result["rev"] - - return _doc - - def save_bulk(self, docs, try_setting_ids=True, transaction=True): - """ - Save a bulk of documents. - - .. versionchanged:: 1.2 - Now returns a new document list instead of modify the original. - - :param docs: list of docs - :param try_setting_ids: if ``True``, we loop through docs and generate/set - an id in each doc if none exists - :param transaction: if ``True``, couchdb do a insert in transaction - model. - :returns: docs - """ - - _docs = copy.deepcopy(docs) - - # Insert _id field if it not exists and try_setting_ids is true - if try_setting_ids: - for doc in _docs: - if "_id" not in doc: - doc["_id"] = uuid.uuid4().hex - - data = orce_bytes(json.dumps({"docs": _docs})) - params = {"all_or_nothing": "true" if transaction else "false"} - - (resp, results) = self.resource.post("_bulk_docs", data=data, - params=params) - - for result, doc in zip(results, _docs): - if "rev" in result: - doc['_rev'] = result['rev'] - - return _docs - - def all(self, wrapper=None, flat=None, as_list=False, **kwargs): - """ - Execute a builtin view for get all documents. - - :param wrapper: wrap result into a specific class. - :param as_list: return a list of results instead of a - default lazy generator. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - Add as_list parameter. - Add flat parameter. - - :returns: generator object - """ - - params = {"include_docs": "true"} - params.update(kwargs) - - data = None - - if "keys" in params: - data = {"keys": params.pop("keys")} - data = force_bytes(json.dumps(data)) - - params = encode_view_options(params) - if data: - (resp, result) = self.resource.post( - "_all_docs", params=params, data=data) - else: - (resp, result) = self.resource.get("_all_docs", params=params) - - if wrapper is None: - def wrapper(doc): return doc - - if flat is not None: - def wrapper(doc): return doc[flat] - - def _iterate(): - for row in result["rows"]: - yield wrapper(row) - - if as_list: - return list(_iterate()) - return _iterate() - - def cleanup(self): - """ - Execute a cleanup operation. - """ - (r, result) = self.resource('_view_cleanup').post() - return result - - def commit(self): - """ - Send commit message to server. - """ - (resp, result) = self.resource.post('_ensure_full_commit') - return result - - def compact(self): - """ - Send compact message to server. Compacting write-heavy databases - should be avoided, otherwise the process may not catch up with - the writes. Read load has no effect. - """ - (r, result) = self.resource("_compact").post() - return result - - def compact_view(self, ddoc): - """ - Execute compact over design view. - - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a view does not exists. - """ - (r, result) = self.resource("_compact", ddoc).post() - return result - - def revisions(self, doc_id, status='available', params=None, **kwargs): - """ - Get all revisions of one document. - - :param doc_id: document id - :param status: filter of revision status, set empty to list all - :raises: :py:exc:`~pycouchdb.exceptions.NotFound` - if a view does not exists. - - :returns: generator object - """ - if params: - warnings.warn("params parameter is now deprecated in favor to" - "**kwargs usage.", DeprecationWarning) - - if params is None: - params = {} - - params.update(kwargs) - - if not params.get('revs_info'): - params['revs_info'] = 'true' - - resource = self.resource(doc_id) - (resp, result) = resource.get(params=params) - if resp.status_code == 404: - raise NotFound("Document id `{0}` not found".format(doc_id)) - - for rev in result['_revs_info']: - if status and rev['status'] == status: - yield self.get(doc_id, rev=rev['rev']) - elif not status: - yield self.get(doc_id, rev=rev['rev']) - - def delete_attachment(self, doc, filename): - """ - Delete attachment by filename from document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document dict - :param filename: name of attachment. - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if save with wrong revision. - :returns: doc - """ - - _doc = copy.deepcopy(doc) - resource = self.resource(_doc['_id']) - - (resp, result) = resource.delete( - filename, params={'rev': _doc['_rev']}) - if resp.status_code == 404: - raise NotFound("filename {0} not found".format(filename)) - - if resp.status_code > 205: - raise Conflict(result['reason']) - - _doc['_rev'] = result['rev'] - try: - del _doc['_attachments'][filename] - - if not _doc['_attachments']: - del _doc['_attachments'] - except KeyError: - pass - - return _doc - - def get_attachment(self, doc, filename, stream=False, **kwargs): - """ - Get attachment by filename from document. - - :param doc: document dict - :param filename: attachment file name. - :param stream: setup streaming output (default: False) - - .. versionchanged: 1.5 - Add stream parameter for obtain very large attachments - without load all file to the memory. - - :returns: binary data or - """ - - params = {"rev": doc["_rev"]} - params.update(kwargs) - - r, result = self.resource(doc['_id']).get(filename, stream=stream, - params=params) - if stream: - return _StreamResponse(r) - - return r.content - - def put_attachment(self, doc, content, filename=None, content_type=None): - """ - Put a attachment to a document. - - .. versionchanged:: 1.2 - Now returns a new document instead of modify the original. - - :param doc: document dict. - :param content: the content to upload, either a file-like object or - bytes - :param filename: the name of the attachment file; if omitted, this - function tries to get the filename from the file-like - object passed as the `content` argument value - :raises: :py:exc:`~pycouchdb.exceptions.Conflict` - if save with wrong revision. - :raises: ValueError - :returns: doc - """ - - if filename is None: - if hasattr(content, 'name'): - filename = os.path.basename(content.name) - else: - raise ValueError('no filename specified for attachment') - - if content_type is None: - content_type = ';'.join( - filter(None, mimetypes.guess_type(filename))) - - headers = {"Content-Type": content_type} - resource = self.resource(doc['_id']) - - (resp, result) = resource.put( - filename, data=content, params={'rev': doc['_rev']}, headers=headers) - - if resp.status_code < 206: - return self.get(doc["_id"]) - - raise Conflict(result['reason']) - - def one(self, name, flat=None, wrapper=None, **kwargs): - """ - Execute a design document view query and returns a first - result. - - :param name: name of the view (eg: docidname/viewname). - :param wrapper: wrap result into a specific class. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - - :returns: object or None - """ - - params = {"limit": 1} - params.update(kwargs) - - path = _path_from_name(name, '_view') - data = None - - if "keys" in params: - data = {"keys": params.pop('keys')} - - if data: - data = force_bytes(json.dumps(data)) - - params = encode_view_options(params) - result = list(self._query(self.resource(*path), wrapper=wrapper, - flat=flat, params=params, data=data)) - - return result[0] if len(result) > 0 else None - - def _query(self, resource, data=None, params=None, headers=None, - flat=None, wrapper=None): - - if data is None: - (resp, result) = resource.get(params=params, headers=headers) - else: - (resp, result) = resource.post( - data=data, params=params, headers=headers) - - if wrapper is None: - def wrapper(row): return row - - if flat is not None: - def wrapper(row): return row[flat] - - for row in result["rows"]: - yield wrapper(row) - - def query(self, name, wrapper=None, flat=None, as_list=False, **kwargs): - """ - Execute a design document view query. - - :param name: name of the view (eg: docidname/viewname). - :param wrapper: wrap result into a specific class. - :param as_list: return a list of results instead of a - default lazy generator. - :param flat: get a specific field from a object instead - of a complete object. - - .. versionadded: 1.4 - Add as_list parameter. - Add flat parameter. - - :returns: generator object - """ - params = copy.copy(kwargs) - path = _path_from_name(name, '_view') - data = None - - if "keys" in params: - data = {"keys": params.pop('keys')} - - if data: - data = force_bytes(json.dumps(data)) - - params = encode_view_options(params) - result = self._query(self.resource(*path), wrapper=wrapper, - flat=flat, params=params, data=data) - - if as_list: - return list(result) - return result - - def changes_feed(self, feed_reader, **kwargs): - """ - Subscribe to changes feed of couchdb database. - - Note: this method is blocking. - - - :param feed_reader: callable or :py:class:`~BaseFeedReader` - instance - - .. versionadded: 1.5 - """ - - object = self - _listen_feed(object, "_changes", feed_reader, **kwargs) - - def changes_list(self, **kwargs): - """ - Obtain a list of changes from couchdb. - - .. versionadded: 1.5 - """ - - (resp, result) = self.resource("_changes").get(params=kwargs) - return result['last_seq'], result['results'] - - def find(self, selector, wrapper=None, **kwargs): - """ - Execute Mango querys using _find. - - :param selector: data to search - :param wrapper: wrap result into a specific class. - - """ - path = '_find' - data = force_bytes(json.dumps(selector)) - - (resp, result) = self.resource.post(path, data=data, params=kwargs) - - if wrapper is None: - def wrapper(doc): return doc - - for doc in result["docs"]: - yield wrapper(doc) - - def index(self, index_doc, **kwargs): - path = '_index' - data = force_bytes(json.dumps(index_doc)) - - (resp, result) = self.resource.post(path, data=data, params=kwargs) - - return result diff --git a/src/couch/exceptions.py b/src/couch/exceptions.py deleted file mode 100644 index d7e037b..0000000 --- a/src/couch/exceptions.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) - - -class Error(Exception): - pass - - -class UnexpectedError(Error): - pass - - -class FeedReaderExited(Error): - pass - - -class ApiError(Error): - pass - - -class GenericError(ApiError): - pass - - -class Conflict(ApiError): - pass - - -class NotFound(ApiError): - pass - - -class BadRequest(ApiError): - pass - - -class AuthenticationFailed(ApiError): - pass diff --git a/src/couch/feedreader.py b/src/couch/feedreader.py deleted file mode 100644 index aac51d3..0000000 --- a/src/couch/feedreader.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) -from __future__ import annotations - -class BaseFeedReader: - """ - Base interface class for changes feed reader. - """ - - def __call__(self, db) -> BaseFeedReader: - self.db = db - return self - - def on_message(self, message: str) -> None: - """ - Callback method that is called when change - message is received from couchdb. - - :param message: change object - :returns: None - """ - - raise NotImplementedError() - - def on_close(self) -> None: - """ - Callback method that is received when connection - is closed with a server. By default, does nothing. - """ - pass - - def on_heartbeat(self) -> None: - """ - Callback method invoked when a hearbeat (empty line) is received - from the _changes stream. Override this to purge the reader's internal - buffers (if any) if it waited too long without receiving anything. - """ - pass - - -class SimpleFeedReader(BaseFeedReader): - """ - Simple feed reader that encapsule any callable in - a valid feed reader interface. - """ - - def __call__(self, db, callback) -> BaseFeedReader: - self.callback = callback - return super(SimpleFeedReader, self).__call__(db) - - def on_message(self, message: str) -> None: - self.callback(message, db=self.db) diff --git a/src/couch/resource.py b/src/couch/resource.py deleted file mode 100644 index f110c8d..0000000 --- a/src/couch/resource.py +++ /dev/null @@ -1,139 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) - -from __future__ import annotations -from __future__ import unicode_literals -from typing import Union, Tuple, Dict, Any - -import json -import requests - -from .utils import ( - urljoin, - as_json, - force_bytes, -) -from .exceptions import ( - GenericError, - NotFound, - BadRequest, - Conflict, - AuthenticationFailed, -) - - -class Resource: - def __init__(self, base_url: str, full_commit: bool = True, session: Union[requests.sessions.Session, None] = None, - credentials: Union[Tuple[str, str], None] = None, authmethod: str = "session", verify: bool = False) -> None: - - self.base_url = base_url -# self.verify = verify - - if not session: - self.session = requests.session() - - self.session.headers.update({"accept": "application/json", - "content-type": "application/json"}) - self._authenticate(credentials, authmethod) - - if not full_commit: - self.session.headers.update({'X-Couch-Full-Commit': 'false'}) - else: - self.session = session - self.session.verify = verify - - def _authenticate(self, credentials: Union[Tuple[str, str], None], method: str) -> None: - if not credentials: - return - - if method == "session": - data = {"name": credentials[0], "password": credentials[1]} - - post_url = urljoin(self.base_url, "_session") - r = self.session.post(post_url, data=force_bytes(json.dumps(data))) - if r and r.status_code != 200: - raise AuthenticationFailed() - - elif method == "basic": - self.session.auth = credentials - - else: - raise RuntimeError("Invalid authentication method") - - def __call__(self, *path: str) -> Resource: - base_url = urljoin(self.base_url, *path) - return self.__class__(base_url, session=self.session) - - def _check_result(self, response, result) -> None: - try: - error = result.get('error', None) - reason = result.get('reason', None) - except AttributeError: - error = None - reason = '' - - # This is here because couchdb can return http 201 - # but containing a list of conflict errors - if error == 'conflict' or error == "file_exists": - raise Conflict(reason or "Conflict") - - if response.status_code > 205: - if response.status_code == 404 or error == 'not_found': - raise NotFound(reason or 'Not found') - elif error == 'bad_request': - raise BadRequest(reason or "Bad request") - raise GenericError(result) - - - def request(self, method, path: Union[str, None], params=None, data=None, - headers=None, stream=False, **kwargs) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - - if headers is None: - headers = {} - - headers.setdefault('Accept', 'application/json') - - if path: - if not isinstance(path, (list, tuple)): - path = [path] - url = urljoin(self.base_url, *path) - else: - url = self.base_url - - response = self.session.request(method, url, stream=stream, - data=data, params=params, - headers=headers, **kwargs) - # Ignore result validation if - # request is with stream mode - - if stream and response.status_code < 400: - result = None - self._check_result(response, result) - else: - result = as_json(response) - - if result is None: - return response, result - - if isinstance(result, list): - for res in result: - self._check_result(response, res) - else: - self._check_result(response, result) - - return response, result - - def get(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - return self.request("GET", path, **kwargs) - - def put(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - return self.request("PUT", path, **kwargs) - - def post(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - return self.request("POST", path, **kwargs) - - def delete(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - return self.request("DELETE", path, **kwargs) - - def head(self, path: Union[str, None] = None, **kwargs: Any) -> Tuple[requests.models.Response, Union[Dict[str, Any], None]]: - return self.request("HEAD", path, **kwargs) diff --git a/src/couch/utils.py b/src/couch/utils.py deleted file mode 100644 index b3e5aa3..0000000 --- a/src/couch/utils.py +++ /dev/null @@ -1,144 +0,0 @@ -# -*- coding: utf-8 -*- -# Based on py-couchdb (https://github.com/histrio/py-couchdb) - -from typing import Tuple, Union, Dict, List, Any -import json -import sys - -import requests -from urllib.parse import unquote as _unquote -from urllib.parse import urlunsplit, urlsplit - -from functools import reduce - -URLSPLITTER = '/' - - -json_encoder = json.JSONEncoder() - - -def extract_credentials(url: str) -> Tuple[str, Union[Tuple[str, str], None]]: - """ - Extract authentication (user name and password) credentials from the - given URL. - - >>> extract_credentials('http://localhost:5984/_config/') - ('http://localhost:5984/_config/', None) - >>> extract_credentials('http://joe:secret@localhost:5984/_config/') - ('http://localhost:5984/_config/', ('joe', 'secret')) - >>> extract_credentials('http://joe%40example.com:secret@' - ... 'localhost:5984/_config/') - ('http://localhost:5984/_config/', ('joe@example.com', 'secret')) - """ - parts = urlsplit(url) - netloc = parts[1] - if '@' in netloc: - creds, netloc = netloc.split('@') - credentials = tuple(_unquote(i) for i in creds.split(':')) - parts_list = list(parts) - parts_list[1] = netloc - return urlunsplit(parts_list), (credentials[0], credentials[1]) - - parts_list = list(parts) - return urlunsplit(parts_list), None - - -def _join(head: str, tail: str) -> str: - parts = [head.rstrip(URLSPLITTER), tail.lstrip(URLSPLITTER)] - return URLSPLITTER.join(parts) - - -def urljoin(base: str, *path: str) -> str: - """ - Assemble a uri based on a base, any number of path segments, and query - string parameters. - - >>> urljoin('http://example.org', '_all_dbs') - 'http://example.org/_all_dbs' - - A trailing slash on the uri base is handled gracefully: - - >>> urljoin('http://example.org/', '_all_dbs') - 'http://example.org/_all_dbs' - - And multiple positional arguments become path parts: - - >>> urljoin('http://example.org/', 'foo', 'bar') - 'http://example.org/foo/bar' - - >>> urljoin('http://example.org/', 'foo/bar') - 'http://example.org/foo/bar' - - >>> urljoin('http://example.org/', 'foo', '/bar/') - 'http://example.org/foo/bar/' - - >>> urljoin('http://example.com', 'org.couchdb.user:username') - 'http://example.com/org.couchdb.user:username' - """ - return reduce(_join, path, base) - -# Probably bugs here -def as_json(response: requests.models.Response) -> Union[Dict[str, Any], None, str]: - if "application/json" in response.headers['content-type']: - response_src = response.content.decode('utf-8') - print(response.content) - if response.content != b'': - ret: Dict[str, Any] = json.loads(response_src) - return ret - else: - print("fff") - print("fff") - print(type(response_src)) - return response_src - return None - -def _path_from_name(name: str, type: str) -> List[str]: - """ - Expand a 'design/foo' style name to its full path as a list of - segments. - - >>> _path_from_name("_design/test", '_view') - ['_design', 'test'] - >>> _path_from_name("design/test", '_view') - ['_design', 'design', '_view', 'test'] - """ - if name.startswith('_'): - return name.split('/') - design, name = name.split('/', 1) - return ['_design', design, type, name] - - -def encode_view_options(options: Dict[str, Any]) -> Dict[str, Any]: - """ - Encode any items in the options dict that are sent as a JSON string to a - view/list function. - - >>> opts = {'key': 'foo', "notkey":"bar"} - >>> res = encode_view_options(opts) - >>> res["key"], res["notkey"] - ('"foo"', 'bar') - - >>> opts = {'startkey': 'foo', "endkey":"bar"} - >>> res = encode_view_options(opts) - >>> res['startkey'], res['endkey'] - ('"foo"', '"bar"') - """ - retval = {} - - for name, value in options.items(): - if name in ('key', 'startkey', 'endkey'): - value = json_encoder.encode(value) - retval[name] = value - return retval - - -def force_bytes(data: Union[str, bytes], encoding: str = "utf-8") -> bytes: - if isinstance(data, str): - data = data.encode(encoding) - return data - - -def force_text(data: Union[str, bytes], encoding: str = "utf-8") -> str: - if isinstance(data, bytes): - data = data.decode(encoding) - return data diff --git a/src/quickstart_test.sh b/src/quickstart_test.sh deleted file mode 100755 index 9254271..0000000 --- a/src/quickstart_test.sh +++ /dev/null @@ -1,67 +0,0 @@ -# Usage: ./quickstart_test.sh [-v] [-c] [-- <args to pytest>] - -export COUCHDB_NAME=unittest -export COUCHDB_HOSTNAME=localhost -export COUCHDB_USER=test -export COUCHDB_PASSWORD=test - -export DOCKER_JWT_PUBKEY_PATH="`pwd`/test/unittest_cert/" -export JWT_PUBKEY_PATH="`pwd`/test/unittest_cert/public.pem" - -virtualenv=no -couchdb=no - -while getopts ":vc" flag -do - case "$flag" in - v) virtualenv=yes;; - c) couchdb=yes;; - esac -done - -if [ -d test/unittest_cert ]; then - rm -r test/unittest_cert -fi - -if [ $virtualenv == "yes" ]; then - shift - if [ -d test/unittest_venv ]; then - rm -r test/unittest_venv - fi - - virtualenv test/unittest_venv - source test/unittest_venv/bin/activate - pip3 install -r ../requirements.txt -fi - -if [ $couchdb == "yes" ]; then - shift - docker run -it -p 6123:5984 --rm -d --name unittest_couchdb -e COUCHDB_USER=$COUCHDB_USER -e COUCHDB_PASSWORD=$COUCHDB_PASSWORD couchdb - - docker inspect unittest_couchdb > /dev/null - - if (( $? != 0 )); then - echo "Failed to start CouchDB container." - exit - fi - - export COUCHDB_PORT=6123 -fi - -mkdir test/unittest_cert - -cat <<EOF > test/unittest_cert/public.pem ------BEGIN PUBLIC KEY----- -MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEGHX8ipqVWtr49TXyX0f/L4GPhEpg -N0Erzy7hHkXVrkgKpnHSRLYWgbW4rscLoJAJeEv7Be5iH0TM8l09w8Q3wQ== ------END PUBLIC KEY----- -EOF - -shift -pytest --capture=tee-sys "$@" - -rm -r test/unittest_cert - -if [ $couchdb == "yes" ]; then - docker kill unittest_couchdb -fi |