/*
* Copyright (c) 2016, NORDUnet A/S.
* See LICENSE for licensing information.
*/
#define _GNU_SOURCE
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <err.h>
#include <sys/file.h>
#include <errno.h>
#include <nettle/sha2.h>
#include "permdb.h"
#include "filebuffer.h"
#include "util.h"
struct buffered_file {
int fd;
const char *name;
uint64_t datasize;
uint64_t lastcommit;
uint64_t filesize;
char *writebuffer;
uint64_t writebufferalloc;
struct sha256_ctx commit_checksum_context;
};
void
bf_add_be64(buffered_file *file, uint64_t value) {
uint64_t value_be = HTONLL(value);
bf_add(file, &value_be, sizeof(uint64_t));
}
void
bf_add_be32(buffered_file *file, uint32_t value) {
uint32_t value_be = htonl(value);
bf_add(file, &value_be, sizeof(uint32_t));
}
void
bf_add_be16(buffered_file *file, uint16_t value) {
uint16_t value_be = htons(value);
bf_add(file, &value_be, sizeof(uint16_t));
}
uint64_t
bf_total_length(buffered_file *file)
{
return file->datasize;
}
uint64_t
bf_lastcommit(buffered_file *file)
{
return file->lastcommit;
}
void
bf_setlastcommit(buffered_file *file, uint64_t lastcommit)
{
file->lastcommit = lastcommit;
}
const char *
bf_name(buffered_file *file)
{
return file->name;
}
static uint64_t
bf_unwritten_length(buffered_file *file)
{
return file->datasize - file->filesize;
}
void
bf_add(buffered_file *file, const void *data, uint64_t length)
{
sha256_update(&file->commit_checksum_context, length, data);
dprintf(WRITE, (stderr, "adding data to %s: ", file->name));
dprinthex(WRITE, data, length);
uint64_t needspace = length + bf_unwritten_length(file);
if (needspace > file->writebufferalloc) {
int ret = bf_flush_nosync(file);
if (ret < 0) {
err(1, "bf_flush_nosync failed");
}
needspace = length + bf_unwritten_length(file);
if (needspace > file->writebufferalloc) {
uint64_t newsize = file->writebufferalloc * 2;
if (needspace > newsize) {
newsize = needspace;
}
file->writebuffer = realloc(file->writebuffer, newsize);
memset(file->writebuffer + file->writebufferalloc, 0,
newsize - file->writebufferalloc);
file->writebufferalloc = newsize;
}
}
memcpy(file->writebuffer + bf_unwritten_length(file), data, length);
file->datasize += length;
}
int
bf_flush_nosync(buffered_file *file)
{
ssize_t ret;
uint64_t length = bf_unwritten_length(file);
ret = write(file->fd, file->writebuffer, length);
if (ret != length) {
return -1;
}
file->filesize += (uint64_t) ret;
return 0;
}
int
bf_flush(buffered_file *file)
{
int ret;
ret = bf_flush_nosync(file);
if (ret) {
return ret;
}
ret = fsync(file->fd);
sha256_init(&file->commit_checksum_context);
dprintf(WRITE, (stderr, "clearing data for %s\n", file->name));
file->lastcommit = bf_total_length(file);
return ret;
}
unsigned char *
bf_read(buffered_file *file, uint64_t offset, size_t length, char **error)
{
unsigned char *result = malloc(length);
dprintf(READ, (stderr, "reading data: offset %llu length %llu\n",
(unsigned long long) offset, (unsigned long long) length));
if (offset >= file->filesize) {
uint64_t writebufferoffset = offset - file->filesize;
if (offset + length > file->datasize) {
free(result);
set_error(error,
"pread: not enough data for offset %llu and "
"length %zu\n",
(long long unsigned int) offset, length);
return NULL;
}
memcpy(result, file->writebuffer + writebufferoffset, length);
} else {
if (offset + length > file->filesize) {
free(result);
set_error(error,
"pread: trying to read over file/writebuffer "
"boundary for offset %llu and length %zu\n",
(long long unsigned int) offset, length);
return NULL;
}
size_t bytes_read = 0;
while (length > 0) {
ssize_t ret = pread(file->fd, result + bytes_read, length, (off_t) offset + bytes_read);
dprintf(READ, (stderr, "pread: offset %llu length %llu ret %zu\n",
(unsigned long long) offset + bytes_read, (unsigned long long) length, ret));
if (ret == 0) {
free(result);
set_error(error,
"eof reading %zu bytes at offset %llu\n",
length, (long long unsigned int) offset);
return NULL;
} else if (ret < 0) {
free(result);
set_error(error,
"error %d reading %zu bytes at offset %llu\n",
errno, length, (long long unsigned int) offset);
return NULL;
}
bytes_read += ret;
length -= ret;
}
}
return result;
}
buffered_file *
bf_open(const char *path, int flags, const char *name, int lock)
{
buffered_file *file = malloc(sizeof(buffered_file));
file->fd = open(path, flags, 0666);
if (file->fd == -1) {
warn("open %s", path);
return NULL;
}
if (lock) {
int ret;
ret = flock(file->fd, LOCK_EX|LOCK_NB);
if (ret == -1) {
warn("flock %s", path);
return NULL;
}
}
file->name = name;
off_t datafile_filesize = lseek(file->fd, 0, SEEK_END);
if (datafile_filesize < 0) {
warn("lseek %s", path);
return NULL;
}
file->filesize = (uint64_t) datafile_filesize;
file->datasize = file->filesize;
file->lastcommit = file->datasize;
file->writebufferalloc = 1024*1024;
file->writebuffer = calloc(file->writebufferalloc, 1);
sha256_init(&file->commit_checksum_context);
return file;
}
void
bf_reload(buffered_file *file)
{
off_t datafile_filesize = lseek(file->fd, 0, SEEK_END);
if (datafile_filesize < 0) {
err(1, "lseek %s", file->name);
}
file->filesize = (uint64_t) datafile_filesize;
file->datasize = file->filesize;
file->lastcommit = file->datasize;
}
void
bf_close(buffered_file *file)
{
bf_flush(file);
flock(file->fd, LOCK_UN);
close(file->fd);
free(file->writebuffer);
free(file);
}
void
bf_truncate(buffered_file *file)
{
file->filesize = 0;
file->datasize = 0;
file->lastcommit = 0;
sha256_init(&file->commit_checksum_context);
ftruncate(file->fd, 0);
}
void
bf_sha256(buffered_file *file, unsigned char *checksum)
{
sha256_digest(&file->commit_checksum_context, SHA256_DIGEST_SIZE,
checksum);
}
/* Local Variables: */
/* c-file-style: "BSD" */
/* End: */