/* * Copyright (c) 2015-2016, NORDUnet A/S. * See LICENSE for licensing information. */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #include "erlport.h" #include "permdb.h" #include "filebuffer.h" #include "util.h" #include "uthash.h" #include "utarray.h" #include "pstring.h" #define INDEX_COMMIT_TRAILER_SIZE (sizeof(uint64_t) \ + SHA256_DIGEST_SIZE \ + sizeof(index_commit_cookie)) static const int keylen = 32; struct permdb_object { struct nodecache *nodecache; struct nodecache *dirtynodes; buffered_file *datafile; buffered_file *indexfile; char *error; int write_enabled; }; static const node_object nullnode = {{0, 0, 0, 0}}; static const node_object errornode = {{NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE, NODE_ENTRY_ERROR_NODE}}; struct nodecache { node_object value; ps_string *key; UT_hash_handle hh; /* Magic name, used by HASH_SORT. */ }; static node_object get_node_from_cache(permdb_object *state, const ps_string *key) { dprintf(CACHE, (stderr, "getting cache key %*s, ", PS_PRINTF(key))); struct nodecache *node; HASH_FIND(hh, state->nodecache, key->value, key->length, node); if (node == NULL) { dprintf(CACHE, (stderr, "found nothing in cache\n")); return errornode; } dprintf(CACHE, (stderr, "got cache key %*s: ", PS_PRINTF(node->key))); dprinthex(CACHE, &node->value, sizeof(struct node_object)); return node->value; } static node_object get_node_from_dirtynodes(permdb_object *state, const ps_string *key) { dprintf(CACHE, (stderr, "getting key %*s, ", PS_PRINTF(key))); struct nodecache *node; HASH_FIND(hh, state->dirtynodes, key->value, key->length, node); if (node == NULL) { dprintf(CACHE, (stderr, "found nothing\n")); return errornode; } dprintf(CACHE, (stderr, "got key %*s: ", PS_PRINTF(node->key))); dprinthex(CACHE, &node->value, sizeof(struct node_object)); return node->value; } static void put_node_in_cache(permdb_object *state, const ps_string *key, node_object value) { dprintf(CACHE, (stderr, "putting cache key %*s: ", PS_PRINTF(key))); dprinthex(CACHE, &value, sizeof(node_object)); struct nodecache *node; HASH_FIND(hh, state->nodecache, key->value, key->length, node); if (node) { node->value = value; return; } node = malloc(sizeof(struct nodecache)); node->value = value; node->key = ps_strdup(key); HASH_ADD(hh, state->nodecache, key->value[0], node->key->length, node); } static void put_node_in_dirtynodes_keypart(permdb_object *state, const ps_string *key, node_object value) { dprintf(CACHE, (stderr, "putting key %*s: ", PS_PRINTF(key))); dprinthex(CACHE, &value, sizeof(node_object)); struct nodecache *node; HASH_FIND(hh, state->dirtynodes, key->value, key->length, node); if (node) { node->value = value; return; } node = malloc(sizeof(struct nodecache)); node->value = value; node->key = ps_strdup(key); HASH_ADD(hh, state->dirtynodes, key->value[0], node->key->length, node); } void delete_all_nodes_in_cache(permdb_object *state) { struct nodecache *node, *tmp; HASH_ITER(hh, state->nodecache, node, tmp) { HASH_DEL(state->nodecache, node); free(node->key); free(node); } } static void delete_all_dirty_nodes(permdb_object *state) { struct nodecache *node, *tmp; HASH_ITER(hh, state->dirtynodes, node, tmp) { HASH_DEL(state->dirtynodes, node); free(node->key); free(node); } } static const uint64_t index_file_cookie = 0xb7e16b02ba8a6d1b; static const uint64_t index_commit_cookie = 0x2fb1778c74a402e4; static const uint64_t index_node_cookie = 0x2e0f555ad73210d1; static const uint64_t data_file_cookie = 0x99c99d588b1e7983; static const uint64_t data_entry_cookie = 0xe7c1cdc2ba3dc77c; static const uint64_t data_commit_start_cookie = 0x75c2e4b3d5f643a1; static const uint64_t data_commit_end_cookie = 0x2b05eed61b5af550; int committree(permdb_object *state); void indexfile_add_header(buffered_file *file) { bf_add_be64(file, index_file_cookie); bf_flush(file); } void datafile_add_header(buffered_file *file) { dprintf(WRITE, (stderr, "adding header to %s\n", bf_name(file))); bf_add_be64(file, data_file_cookie); bf_add_be32(file, 4096); /* Parameter 'blocksize'. */ bf_add_be32(file, 2); /* Parameter 'q'. */ bf_add_be32(file, 32); /* Parameter 'keylength'. */ bf_flush(file); } void initial_node(permdb_object *state) { put_node_in_dirtynodes_keypart(state, PS_STRING(""), nullnode); } int initial_commit(permdb_object *state) { initial_node(state); return committree(state); } struct commit_info { node_offset start; node_offset length; uint8_t checksum[SHA256_DIGEST_SIZE]; }; int validate_checksum(struct commit_info *commit, buffered_file *file) { dprintf(READ, (stderr, "validate_checksum: read from file: length %llu start %llu\n", (long long unsigned) commit->length, (long long unsigned) commit->start)); char *error = NULL; uint8_t checksum[SHA256_DIGEST_SIZE]; struct sha256_ctx commit_checksum_context; sha256_init(&commit_checksum_context); node_offset offset = commit->start; node_offset bytesleft = commit->length; node_offset chunksize = 1024*1024; while (bytesleft > 0) { node_offset length = MIN(chunksize, bytesleft); unsigned char *checksumdata = bf_read(file, offset, length, &error); if (checksumdata == NULL) { dprintf(READ, (stderr, "validate_checksum: could not read file: %s\n", error)); return -1; } sha256_update(&commit_checksum_context, length, checksumdata); offset += length; bytesleft -= length; free(checksumdata); } sha256_digest(&commit_checksum_context, SHA256_DIGEST_SIZE, checksum); if (memcmp(checksum, commit->checksum, SHA256_DIGEST_SIZE) == 0) { return 0; } return -1; } int verify_index_commit(buffered_file *file, node_offset offset) { dprintf(READ, (stderr, "verifying index file: commit verification\n")); offset -= INDEX_COMMIT_TRAILER_SIZE; unsigned char *data = bf_read(file, offset, INDEX_COMMIT_TRAILER_SIZE, NULL); uint64_t cookie = read_be64(data + sizeof(uint64_t) + SHA256_DIGEST_SIZE); if (cookie != index_commit_cookie) { fprintf(stderr, "verifying index file: incorrect commit cookie\n"); free(data); return -1; } struct commit_info commit; commit.length = read_be64(data); commit.start = offset + sizeof(uint64_t) - commit.length; memcpy(commit.checksum, data + sizeof(uint64_t), SHA256_DIGEST_SIZE); free(data); return validate_checksum(&commit, file); } node_offset search_index_commit(buffered_file *file, node_offset startoffset) { node_offset offset = startoffset; offset -= INDEX_COMMIT_TRAILER_SIZE; while (offset > 0) { unsigned char *data = bf_read(file, offset, INDEX_COMMIT_TRAILER_SIZE, NULL); uint64_t cookie = read_be64(data + sizeof(uint64_t) + SHA256_DIGEST_SIZE); if (cookie == index_commit_cookie) { return offset + INDEX_COMMIT_TRAILER_SIZE; } offset -= 8; } return 0; } int indexfile_verify_file(buffered_file *file) { dprintf(READ, (stderr, "verifying index file\n")); unsigned char *header = bf_read(file, 0, sizeof(index_file_cookie), NULL); if (read_be64(header) != index_file_cookie) { free(header); fprintf(stderr, "verifying index file: incorrect file cookie\n"); return -1; } free(header); node_offset offset = bf_total_length(file); while (offset) { offset = search_index_commit(file, offset); if (offset) { fprintf(stderr, "verifying index file: found potential commit at %lld\n", (unsigned long long) offset); } if (offset && verify_index_commit(file, offset) == 0) { bf_setlastcommit(file, offset); break; } } fprintf(stderr, "verifying index file: found commit at %lld\n", (unsigned long long) bf_lastcommit(file)); if (offset == 0) { fprintf(stderr, "verifying index file: commit verification failed\n"); return -1; } return 0; } struct commit_info * read_data_commit_backward(buffered_file *file, node_offset *offset); static int datafile_verify_file(buffered_file *file) { unsigned char *header = bf_read(file, 0, sizeof(data_file_cookie), NULL); if (header == NULL || data_file_cookie != read_be64(header)) { fprintf(stderr, "not a data file\n"); free(header); return -1; } free(header); node_offset offset = bf_lastcommit(file); dprintf(READ, (stderr, "verifying commit: %llu\n", (long long unsigned) offset)); struct commit_info *data_commit = read_data_commit_backward(file, &offset); if (data_commit == NULL) { dprintf(READ, (stderr, "did not find a commit\n")); } else { dprintf(READ, (stderr, "last commit: start %llu length %llu\n", (long long unsigned) data_commit->start, (long long unsigned) data_commit->length)); } if (data_commit == NULL || validate_checksum(data_commit, file) < 0) { fprintf(stderr, "commit broken: %llu\n", (long long unsigned) offset); free(data_commit); return -1; } free(data_commit); return 0; } static unsigned char * readdatakeyandlen(permdb_object *state, node_offset offset, size_t *datalen); struct commit_info * read_data_commit(buffered_file *file, node_offset *offset) { const size_t length = sizeof(uint64_t) + SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie); unsigned char *data = bf_read(file, *offset, length, NULL); if (data == NULL || (read_be64(data + sizeof(uint64_t) + SHA256_DIGEST_SIZE) != data_commit_end_cookie)) { free(data); return NULL; } *offset += sizeof(uint64_t); struct commit_info *commit = malloc(sizeof(struct commit_info)); dprintf(READ, (stderr, "read commit: %llu\n", (long long unsigned) *offset)); dprinthex(READ, data, sizeof(uint64_t) + SHA256_DIGEST_SIZE); commit->length = read_be64(data); commit->start = *offset - commit->length; memcpy(&commit->checksum, data + sizeof(uint64_t), SHA256_DIGEST_SIZE); *offset += SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie); free(data); return commit; } struct commit_info * read_data_commit_forward(buffered_file *file, node_offset *offset) { uint64_t padding = calc_padding(*offset, 4); *offset += sizeof(data_commit_start_cookie) + padding; return read_data_commit(file, offset); } struct commit_info * read_data_commit_backward(buffered_file *file, node_offset *offset) { *offset -= sizeof(uint64_t) + SHA256_DIGEST_SIZE + sizeof(data_commit_end_cookie); return read_data_commit(file, offset); } int rebuild_index_file(permdb_object *state) { bf_truncate(state->indexfile); indexfile_add_header(state->indexfile); initial_node(state); node_offset offset = sizeof(data_file_cookie) + sizeof(uint32_t) * 3; while (1) { unsigned char *cookie = bf_read(state->datafile, offset, sizeof(data_entry_cookie), NULL); if (cookie == NULL) { break; } if (data_entry_cookie == read_be64(cookie)) { size_t datalen; unsigned char *datakey = readdatakeyandlen(state, offset, &datalen); dprintf(REBUILD, (stderr, "entry %llu: %zu\n", (long long unsigned) offset, datalen)); int result = addvalue(state, datakey, keylen, NULL, 0, offset); free(datakey); if (result < 0) { fprintf(stderr, "error updating index tree for " "entry at %llu\n", (long long unsigned) offset); free(cookie); return -1; } if (result == 0) { fprintf(stderr, "duplicate key at %llu", (long long unsigned) offset); free(cookie); return -1; } offset += sizeof(data_entry_cookie) + keylen + sizeof(uint32_t) + datalen; } else if (data_commit_start_cookie == read_be64(cookie)) { struct commit_info *data_commit = read_data_commit_forward(state->datafile, &offset); dprintf(REBUILD, (stderr, "verifying commit: %llu %p\n", (long long unsigned) offset, data_commit)); if (data_commit == NULL || validate_checksum(data_commit, state->datafile) < 0) { free(data_commit); fprintf(stderr, "commit broken: %llu\n", (unsigned long long) offset); free(cookie); return -1; } free(data_commit); } else { fprintf(stderr, "not a cookie at %llu:\n", (long long unsigned) offset); print_hex(cookie, sizeof(data_entry_cookie)); free(cookie); return -1; } free(cookie); } fprintf(stderr, "index file rebuilt\n"); return committree(state); } unsigned int try_reload_database(permdb_object *state) { if (state->write_enabled) { errx(1, "try_reload_database called on write enabled database"); } uint64_t oldindexsize = bf_total_length(state->indexfile); fprintf(stderr, "reloading database: datafile %llu indexfile %llu\n", (unsigned long long) bf_total_length(state->datafile), (unsigned long long) bf_total_length(state->indexfile)); bf_reload(state->datafile); bf_reload(state->indexfile); fprintf(stderr, "reloaded database: datafile %llu indexfile %llu\n", (unsigned long long) bf_total_length(state->datafile), (unsigned long long) bf_total_length(state->indexfile)); if (bf_total_length(state->indexfile) == oldindexsize) { return 0; } else { #if 0 if (datafile_verify_file(state->datafile) < 0) { warnx("data file verification failed"); } #endif if (indexfile_verify_file(state->indexfile) < 0) { warnx("cannot rebuild in readonly mode"); } delete_all_nodes_in_cache(state); return 1; } } permdb_object * permdb_alloc(const char *dbpath, int lock) { char *idxpath = NULL; if (asprintf(&idxpath, "%s.idx", dbpath) == -1) { return NULL; } permdb_object *state = malloc(sizeof(permdb_object)); state->nodecache = NULL; state->dirtynodes = NULL; state->error = NULL; state->datafile = NULL; state->indexfile = NULL; state->write_enabled = lock; int mode = O_RDONLY; if (state->write_enabled) { mode = O_RDWR|O_CREAT; } state->datafile = bf_open(dbpath, mode, "datafile", state->write_enabled); if (state->datafile == NULL) { permdb_free(state); free(idxpath); return NULL; } state->indexfile = bf_open(idxpath, mode, "indexfile", state->write_enabled); if (state->indexfile == NULL) { permdb_free(state); free(idxpath); return NULL; } free(idxpath); if (bf_total_length(state->datafile) == 0 && bf_total_length(state->indexfile) == 0) { if (!state->write_enabled) { warnx("data and index files are empty: %s", dbpath); permdb_free(state); return NULL; } dprintf(WRITE, (stderr, "writing header\n")); indexfile_add_header(state->indexfile); datafile_add_header(state->datafile); initial_commit(state); } else if (bf_total_length(state->datafile) > 0 && bf_total_length(state->indexfile) == 0) { if (!state->write_enabled) { warnx("cannot rebuild in readonly mode: %s", dbpath); permdb_free(state); return NULL; } if (rebuild_index_file(state) < 0) { warnx("index file rebuilding failed: %s", dbpath); permdb_free(state); return NULL; } } if (datafile_verify_file(state->datafile) < 0) { warnx("data file verification failed: %s", dbpath); if (state->write_enabled) { permdb_free(state); return NULL; } } if (indexfile_verify_file(state->indexfile) < 0) { if (!state->write_enabled) { warnx("cannot rebuild in readonly mode: %s", dbpath); permdb_free(state); return NULL; } warnx("index file verification failed, rebuilding: %s", dbpath); if (rebuild_index_file(state) < 0) { warnx("index file rebuilding failed: %s", dbpath); permdb_free(state); return NULL; } } return state; } void permdb_free(permdb_object *state) { if (state->datafile != NULL) { bf_close(state->datafile); } if (state->indexfile != NULL) { bf_close(state->indexfile); } delete_all_nodes_in_cache(state); delete_all_dirty_nodes(state); free(state); } /* * Returns the two bits of KEY that are used for LEVEL, for a q=2 * tree. */ static unsigned char keybits(const unsigned char *key, unsigned int level) { /* Finding the proper octet in key. */ unsigned char b = key[level/4]; /* "Micro-level", for finding the proper two bits, below. */ unsigned int mlev = level % 4; /* Shift right and mask off the two lowest bits. */ return (b >> (6 - 2*mlev)) & 0x3; } /* * Extracts the parts of KEY used for traversing a tree down to LEVEL * and write an ASCII representation to S. */ static void keypart(const unsigned char *key, unsigned int level, ps_string *s) { unsigned int i; for (i = 0; i < level; i++) { unsigned char b = keybits(key, i); s->value[i] = b + (unsigned char) '0'; } s->length = level; } static void addentry(node_object *node, unsigned int n, node_entry entry) { assert(n < ENTRIESPERNODE); assert(node->data[n] == 0); node->data[n] = entry; } static void replaceentry(node_object *node, unsigned int n, node_entry entry) { assert(n < ENTRIESPERNODE); assert(node->data[n] != 0); node->data[n] = entry; } node_entry get_entry_in_node(node_object node, unsigned char n) { return node.data[n]; } static node_object unpacknode(permdb_object *state, const unsigned char *data, size_t datalen) { if (index_node_cookie != read_be64(data)) { print_hex(data, sizeof(index_node_cookie)); print_hex(&index_node_cookie, sizeof(index_node_cookie)); set_error(&state->error, "incorrect magic (node) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return errornode; } if (datalen != sizeof(node_object) + sizeof(index_node_cookie)) { return errornode; } data += 8; /* Advance past cookie. */ node_object node; for (int i = 0; i < ENTRIESPERNODE; i++) { node.data[i] = read_be64(data); data += 8; } return node; } unsigned char * read_internal_data(permdb_object *state, node_offset offset, size_t length) { buffered_file *file = state->datafile; dprintf(READ, (stderr, "reading data: offset %llu length %llu\n", (unsigned long long) offset, (unsigned long long) length)); return bf_read(file, offset, length, &state->error); } static int iserrornode(node_object node) { return node.data[0] == NODE_ENTRY_ERROR_NODE && node.data[1] == NODE_ENTRY_ERROR_NODE && node.data[2] == NODE_ENTRY_ERROR_NODE && node.data[3] == NODE_ENTRY_ERROR_NODE; } /* * Returns node at OFFSET in the index file. * * If CACHEKEY is not NULL, looks for CACHEKEY in the dirtynodes list * and the cache before reading the index file. * * If CACHEKEY is not NULL, the node is put in the cache under that * key. */ static node_object readnode(permdb_object *state, node_offset offset, const ps_string *cachekey) { if (cachekey) { dprintf(READ, (stderr, "reading node: offset %llu cachekey '%*s'\n", (unsigned long long) offset, PS_PRINTF(cachekey))); node_object dirtynode = get_node_from_dirtynodes(state, cachekey); if (!iserrornode(dirtynode)) { dprintf(READ, (stderr, "reading node: found node in dirty " "nodes\n")); return dirtynode; } if (offset == NODE_ENTRY_DIRTY_NODE) { const char *errfmt = "referring to dirty node at key %*s, but node " "not in dirtynodes\n"; set_error(&state->error, errfmt, PS_PRINTF(cachekey)); dprintf(READ, (stderr, errfmt, PS_PRINTF(cachekey))); return errornode; } node_object cachednode = get_node_from_cache(state, cachekey); if (!iserrornode(cachednode)) { dprintf(READ, (stderr, "reading node: found node in cache\n")); return cachednode; } } else { dprintf(READ, (stderr, "reading node: offset %llu no cachekey\n", (unsigned long long) offset)); } size_t length = sizeof(index_node_cookie) + sizeof(node_object); unsigned char *buffer = bf_read(state->indexfile, offset, length, &state->error); if (buffer == NULL) { return errornode; } node_object result = unpacknode(state, buffer, length); free(buffer); if (cachekey) { put_node_in_cache(state, cachekey, result); } dprintf(READ, (stderr, "reading node: %s\n", iserrornode(result) ? "failure" : "success")); return result; } static int isdata(node_entry entry) { return (entry & NODE_ENTRY_ISDATA) == NODE_ENTRY_ISDATA; } static node_offset entryoffset(node_entry entry) { return entry & NODE_ENTRY_OFFSET_MASK; } UT_icd node_object_icd = {sizeof(node_object), NULL, NULL, NULL }; static node_entry getpath_(permdb_object *state, const unsigned char *key, UT_array *nodes_out) { node_offset rootoffset = bf_lastcommit(state->indexfile) - (sizeof(index_node_cookie) + sizeof(node_object) + INDEX_COMMIT_TRAILER_SIZE); node_object node = readnode(state, rootoffset, PS_STRING("")); if (iserrornode(node)) { fprintf(stderr, "getpath_: cannot find root node at offset %llu " "(lastcommit %llu)\n", (long long unsigned int) rootoffset, (long long unsigned int) bf_lastcommit(state->indexfile)); return NODE_ENTRY_ERROR_NODE; } dprintf(READ, (stderr, "getpath_: got root node\n")); unsigned int level = 0; while (1) { if (nodes_out) { utarray_push_back(nodes_out, &node); } unsigned char kb = keybits(key, level); node_entry entry = get_entry_in_node(node, kb); if (entry == 0 || isdata(entry)) { dprintf(READ, (stderr, "getpath_: return node\n")); return entry; } level++; ps_string kp; keypart(key, level, &kp); node = readnode(state, entryoffset(entry), &kp); if (iserrornode(node)) { dprintf(READ, (stderr, "getpath_: not found\n")); return NODE_ENTRY_ERROR_NODE; } } } /* * Writes the nodes in the path from the root to the node with key KEY * to NODES. * * Returns the entry in the last node, i.e. the node with key KEY, or * NODE_ENTRY_ERROR_NODE if not found. */ static node_entry getpath(permdb_object *state, const unsigned char *key, UT_array *nodes) { return getpath_(state, key, nodes); } /* * Returns the entry in the node with key KEY, or * NODE_ENTRY_ERROR_NODE if not found. */ static node_entry getlastnode(permdb_object *state, const unsigned char *key) { return getpath_(state, key, NULL); } static node_offset writenode(permdb_object *state, node_object node, ps_string *cachekey) { node_offset offset = bf_total_length(state->indexfile); put_node_in_cache(state, cachekey, node); dprintf(WRITE, (stderr, "writing node: offset %llu\n", (unsigned long long) offset)); bf_add_be64(state->indexfile, index_node_cookie); for (int i = 0; i < ENTRIESPERNODE; i++) { bf_add_be64(state->indexfile, node.data[i]); } return offset; } node_offset datasize(permdb_object *state) { return bf_total_length(state->indexfile); } static node_entry buildentry(int isdata, node_offset offset) { if (isdata) { return NODE_ENTRY_ISDATA | offset; } else { return offset; } } static void * memsub(void *src, size_t offset, size_t length) { void *result = malloc(length); memcpy(result, ((char *)src) + offset, length); return result; } static unsigned char * readdatakey(permdb_object *state, node_offset offset) { unsigned char *data = read_internal_data(state, offset, sizeof(data_entry_cookie) + keylen); if (data == NULL) { return NULL; } if (data_entry_cookie != read_be64(data)) { free(data); set_error(&state->error, "incorrect magic (entry) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return NULL; } unsigned char *result = memsub(data, sizeof(data_entry_cookie), keylen); free(data); return result; } static unsigned char * readdatakeyandlen(permdb_object *state, node_offset offset, size_t *datalen) { unsigned char *data = read_internal_data(state, offset, sizeof(data_entry_cookie) + keylen + 4); if (data == NULL) { return NULL; } if (data_entry_cookie != read_be64(data)) { free(data); set_error(&state->error, "incorrect magic (entry) %02x%02x\n", (unsigned char)data[0], (unsigned char)data[1]); return NULL; } unsigned char *result = memsub(data, sizeof(data_entry_cookie), keylen); uint16_t nchunks = read_be16(data+sizeof(data_entry_cookie)+keylen); *datalen = read_be16(data + sizeof(data_entry_cookie) + keylen+sizeof(uint16_t)); if (nchunks != 1) { errx(1, "number of chunks is %d, but only one chunk is supported " "right now", nchunks); } free(data); return result; } static unsigned char * readdata(permdb_object *state, node_offset offset, size_t datalen) { const node_offset internal_offset = offset + sizeof(data_entry_cookie) + keylen + 4; return read_internal_data(state, internal_offset, datalen); } static node_offset writedata(permdb_object *state, const unsigned char *key, const unsigned char *data, size_t datalength) { if (datalength > 65535) { errx(1, "data length is %zu, but only < 64K lengths are " "supported right now", datalength); } node_offset offset = bf_total_length(state->datafile); dprintf(WRITE, (stderr, "writing data: offset %llu\n", (unsigned long long) offset)); bf_add_be64(state->datafile, data_entry_cookie); bf_add(state->datafile, key, keylen); bf_add_be16(state->datafile, 1); bf_add_be16(state->datafile, datalength); bf_add(state->datafile, data, datalength); return offset; } static void put_node_in_dirtynodes(permdb_object *state, const unsigned char *key, unsigned int level, node_object leafnode) { ps_string cachekey; keypart(key, level, &cachekey); put_node_in_dirtynodes_keypart(state, &cachekey, leafnode); } /* * Adds a key-value pair KEY + DATA by * 1) updating the data file buffer with KEY and DATA, * 2) if necessary, adding a new node to the index tree, * 3) marking the parent nodes as dirty and * 4) adding new and changed nodes to the dirtynodes list. * * If DATA is NULL, the data file buffer is not updated (step 1 above) * and DATAOFFSET is used instead. * * If a duplicate key is detected, nothing is written and 0 is * returned regardless of the data. * * Returns 0 if they key already exists, * 1 if the data was written, * -1 on error. */ int addvalue(permdb_object *state, const unsigned char *key, unsigned int keylength, const unsigned char *data, size_t datalength, node_offset dataoffset) { UT_array *nodes; utarray_new(nodes, &node_object_icd); node_entry lastentry = getpath(state, key, nodes); if (!state->write_enabled) { return -1; } if (lastentry == NODE_ENTRY_ERROR_NODE) { utarray_free(nodes); return -1; } unsigned int foundlevel = utarray_len(nodes) - 1; node_object lastnode = *(node_object *) utarray_back(nodes); if (lastentry == 0) { if (data != NULL) { dataoffset = writedata(state, key, data, datalength); } addentry(&lastnode, keybits(key, foundlevel), buildentry(1, dataoffset)); } else { node_offset olddataoffset = entryoffset(lastentry); unsigned char *olddatakey = readdatakey(state, olddataoffset); if (olddatakey == NULL) { utarray_free(nodes); return -1; } if (memcmp(olddatakey, key, keylen) == 0) { utarray_free(nodes); free(olddatakey); return 0; } if (data != NULL) { dataoffset = writedata(state, key, data, datalength); } unsigned int level = foundlevel + 1; while (keybits(key, level) == keybits(olddatakey, level)) { level++; } node_object leafnode = nullnode; addentry(&leafnode, keybits(key, level), buildentry(1, dataoffset)); addentry(&leafnode, keybits(olddatakey, level), buildentry(1, olddataoffset)); free(olddatakey); put_node_in_dirtynodes(state, key, level, leafnode); level--; while (level > foundlevel) { node_object node = nullnode; addentry(&node, keybits(key, level), NODE_ENTRY_DIRTY_NODE); put_node_in_dirtynodes(state, key, level, node); level--; } replaceentry(&lastnode, keybits(key, foundlevel), NODE_ENTRY_DIRTY_NODE); } int level = (int) foundlevel; put_node_in_dirtynodes(state, key, (unsigned int) level, lastnode); level--; while (level >= 0) { node_object node = *(node_object *)utarray_eltptr(nodes, level); replaceentry(&node, keybits(key, (unsigned int) level), NODE_ENTRY_DIRTY_NODE); put_node_in_dirtynodes(state, key, (unsigned int) level, node); level--; } utarray_free(nodes); return 1; } unsigned char * getvalue_try(permdb_object *state, const unsigned char *key, size_t keylength, size_t *datalen) { node_entry entry = getlastnode(state, key); if (entry == 0) { dprintf(READ, (stderr, "getvalue: no node\n")); return NULL; } dprintf(READ, (stderr, "getvalue: got node\n")); node_offset olddataoffset = entryoffset(entry); unsigned char *datakey = readdatakeyandlen(state, olddataoffset, datalen); if (datakey == NULL) { return NULL; } if (memcmp(datakey, key, keylength) != 0) { free(datakey); return NULL; } free(datakey); return readdata(state, olddataoffset, *datalen); } unsigned char * getvalue(permdb_object *state, const unsigned char *key, size_t keylength, size_t *datalen) { unsigned char *result = getvalue_try(state, key, keylength, datalen); if (result == NULL && !state->write_enabled) { int reloaded = try_reload_database(state); if (reloaded) { return getvalue_try(state, key, keylength, datalen); } } return result; } unsigned int keyexists_try(permdb_object *state, const unsigned char *key, size_t keylength) { node_entry entry = getlastnode(state, key); if (entry == 0) { dprintf(READ, (stderr, "keyexists: no node\n")); return 0; } dprintf(READ, (stderr, "keyexists: got node\n")); node_offset olddataoffset = entryoffset(entry); unsigned char *datakey = readdatakey(state, olddataoffset); if (datakey == NULL) { return 0; } if (memcmp(datakey, key, keylength) != 0) { free(datakey); return 0; } free(datakey); return 1; } unsigned int keyexists(permdb_object *state, const unsigned char *key, size_t keylength) { unsigned int result = keyexists_try(state, key, keylength); if (result == 0 && !state->write_enabled) { int reloaded = try_reload_database(state); if (reloaded) { return keyexists_try(state, key, keylength); } } return result; } static int string_length_comparison(struct nodecache *a, struct nodecache *b) { size_t a_len = a->key->length; size_t b_len = b->key->length; if (b_len > a_len) { return 1; } else if (b_len == a_len) { return 0; } else { return -1; } } /* * Commits outstanding data to disk by * 1. sorting the dirtynodes list on decreasing key length * 2. verifying that the last node is the root node (key "") * 3. iterating over all nodes from last node to root, writing each * node to the index file while updating the parent node to reflect * the new child offset * 4. writing commit trailer to the data file * 5. writing commit trailer to the index file * * Returns 0 on success, -1 on failure. */ int committree(permdb_object *state) { if (!state->write_enabled) { return -1; } if (state->dirtynodes == NULL) { return 0; } HASH_SORT(state->dirtynodes, string_length_comparison); struct nodecache *lastobject = ELMT_FROM_HH(state->dirtynodes->hh.tbl, state->dirtynodes->hh.tbl->tail); if (lastobject->key->length != 0) { fprintf(stderr, "sorted list doesn't end with root node\n"); return -1; } dprintf(WRITE, (stderr, "committing %d dirty nodes at offset %llu\n", HASH_COUNT(state->dirtynodes), (unsigned long long) bf_total_length(state->indexfile))); struct nodecache *node, *tmp; HASH_ITER(hh, state->dirtynodes, node, tmp) { assert(get_entry_in_node(node->value, 0)!=NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node->value, 1)!=NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node->value, 2)!=NODE_ENTRY_DIRTY_NODE); assert(get_entry_in_node(node->value, 3)!=NODE_ENTRY_DIRTY_NODE); node_offset offset = writenode(state, node->value, node->key); size_t keylength = node->key->length; if (keylength != 0) { /* Since q is 2, parent is easily found by * stripping last octet from ASCII * representation of key (4 bits). */ ps_string *parent = ps_resize(node->key, node->key->length - 1); unsigned int entrynumber = (unsigned int) (node->key->value[keylength - 1] - '0'); node_object parentnode = get_node_from_dirtynodes(state, parent); replaceentry(&parentnode, entrynumber, offset); put_node_in_dirtynodes_keypart(state, parent, parentnode); free(parent); } } dprintf(WRITE, (stderr, "writing data commit trailer at offset %llu\n", (unsigned long long) bf_total_length(state->datafile))); uint64_t data_commit_padding_size = calc_padding(bf_total_length(state->datafile), 4); uint8_t padding[4] = {0, 0, 0, 0}; unsigned char data_commit_checksum[SHA256_DIGEST_SIZE]; bf_add_be64(state->datafile, data_commit_start_cookie); bf_add(state->datafile, padding, data_commit_padding_size); bf_add_be64(state->datafile, bf_total_length(state->datafile) - bf_lastcommit(state->datafile) + sizeof(uint64_t)); /* Including the length field. */ bf_sha256(state->datafile, data_commit_checksum); bf_add(state->datafile, data_commit_checksum, SHA256_DIGEST_SIZE); bf_add_be64(state->datafile, data_commit_end_cookie); dprintf(WRITE, (stderr, "finished writing data commit trailer at offset %llu\n", (unsigned long long) bf_total_length(state->datafile))); if (bf_flush(state->datafile) == -1) { set_error(&state->error, "data file flushing failed\n"); return -1; } dprintf(WRITE, (stderr, "writing index commit trailer at offset %llu\n", (unsigned long long) bf_total_length(state->indexfile))); uint64_t index_commit_length = bf_total_length(state->indexfile) - bf_lastcommit(state->indexfile) + sizeof(uint64_t); /* Including the length field. */ unsigned char index_commit_checksum[SHA256_DIGEST_SIZE]; bf_add_be64(state->indexfile, index_commit_length); bf_sha256(state->indexfile, index_commit_checksum); bf_add(state->indexfile, index_commit_checksum, SHA256_DIGEST_SIZE); bf_add_be64(state->indexfile, index_commit_cookie); dprintf(WRITE, (stderr, "finished writing index commit trailer at offset %llu\n", (unsigned long long) bf_total_length(state->indexfile))); if (bf_flush(state->indexfile) == -1) { set_error(&state->error, "index file flushing failed\n"); return -1; } delete_all_dirty_nodes(state); return 0; } void portloop(permdb_object *state) { unsigned char buf[65536]; ssize_t len; while ((len = read_command(buf, sizeof(buf)-1, 4)) > 0) { if (buf[0] == 0) { dprintf(PORT, (stderr, "get\n")); if (len != keylen+1) { write_reply(NULL, 0, 4); fprintf(stderr, "invalid getvalue command, " "length was %zd\n", len); continue; } size_t datalen; unsigned char *result = getvalue(state, (buf+1), keylen, &datalen); if (result == NULL) { write_reply(NULL, 0, 4); } else { write_reply(result, datalen, 4); } dprintf(PORT, (stderr, "get reply\n")); free(result); } else if (buf[0] == 1) { dprintf(PORT, (stderr, "add\n")); if (len < (keylen + 1)) { write_reply(NULL, 0, 4); fprintf(stderr, "invalid addvalue command, " "length was %zd\n", len); continue; } size_t datalen = (size_t) (len - keylen - 1); unsigned char *key = buf + 1; unsigned char *data = key + keylen; int result = addvalue(state, key, keylen, data, datalen, 0); if (result < 0) { write_reply(NULL, 0, 4); } else { unsigned char result_byte = (unsigned char) result; write_reply(&result_byte, 1, 4); } dprintf(PORT, (stderr, "add reply\n")); } else if (buf[0] == 2) { dprintf(PORT, (stderr, "commit\n")); if (len != 1) { write_reply(NULL, 0, 4); fprintf(stderr, "invalid commit command, " "length was %zd\n", len); continue; } int result = committree(state); if (result < 0) { write_reply(NULL, 0, 4); continue; } else { unsigned char result_byte = (unsigned char) result; write_reply(&result_byte, 1, 4); } dprintf(PORT, (stderr, "commit reply\n")); } else if (buf[0] == 3) { dprintf(PORT, (stderr, "keyexists\n")); if (len != keylen+1) { write_reply(NULL, 0, 4); continue; } assert(len == keylen+1); unsigned char result_byte = keyexists(state, (buf+1), keylen); write_reply(&result_byte, 1, 4); dprintf(PORT, (stderr, "keyexists reply\n")); } else { dprintf(PORT, (stderr, "unknown command\n")); write_reply(NULL, 0, 4); } } if (len == -1) { fprintf(stderr, "read command failed\n"); } permdb_free(state); } /* Local Variables: */ /* c-file-style: "BSD" */ /* End: */