misc_ringbuff.c from EmStar at Krugle
Show misc_ringbuff.c syntax highlighted
/*
*
* Copyright (c) 2003 The Regents of the University of California. All
* rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Neither the name of the University nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "misc_ringbuff.h"
/* Used only for variable - length samples */
/* One queue node per stored variable-length sample. */
typedef struct rbe {
/* heap copy of the sample payload; allocated in rb_var_sample_push() and
 * released in var_len_remove_top() / rb_destroy() */
void *data;
/* size of this sample */
int sample_size;
/* sample number taken from rb->sample_clock when the sample was pushed */
uint64_t sample_num;
/* queue linkage for this node (see QUEUE_INST below the rb_ctx struct) */
QUEUE_ELEMENT_DECL(_rbe_el, struct rbe);
} rbe_t;
/* Ring-buffer state shared by the fixed-length and variable-length modes. */
struct rb_ctx {
/* see constructor for def */
/* sample_size: bytes per sample; 0 selects variable-length mode
 * (see rb_variable_len_samples()).
 * num_samples: capacity in samples, copied from opts->num_samples. */
int sample_size;
int num_samples;
/* If client wants to run searches on samples stored,
* specifies comparator function */
rb_compare comparator;
/* sample_clock: number the next pushed sample will receive
 * (rb_get_last_sampleNum() returns sample_clock - 1).
 * first_sample: number of the oldest sample still retained; the buffer
 * is empty when the two are equal (see rb_empty()). */
uint64_t sample_clock;
uint64_t first_sample;
/* ONLY used for variable-length samples */
QUEUE_DECL(_element_list, rbe_t);
/*** ONLY used for fixed-len samples ***/
/* requested size and current size of data buffer */
/* size is set to num_samples * sample_size in rb_new(); current_size is
 * adjusted by the push and age-out paths (in bytes). */
int size;
int current_size;
/* holds fixed-length samples and their sample-numbers */
/* sample_num has num_samples slots, indexed via calc_rb_index();
 * data_buf has `size` bytes, indexed via calc_data_index(). */
uint64_t *sample_num;
char *data_buf;
};
/* Instantiates the queue helpers for the variable-length sample list.
 * NOTE(review): presumably this is what generates the var_len_sample_init/
 * _top/_next/_last/_push/_remove functions used throughout this file --
 * confirm against the QUEUE_INST macro definition. */
QUEUE_INST(var_len_sample, _rbe_el, _element_list, rbe_t, rb_ctx_t);
/* Multiple interfaces/configurations for RB:
* - Non-Isochronous/fixed-length samples
* - Non-ISO / variable - len samples
* - ISO / fixed - length samples
*
* Various config parameters are configurable at initialization:
* - Sample size
* - size of RB
* Parameters configurable during run-time:
* - sample-rate
* - preferred-clock
* Architecture of RB:
* Two vectors - one for data-samples, one for time/hdr info
*
* add:
* module/function to convert between samples and time-stamps
* - take care of back-annotation
* - converting between different clocks
* - timestamping samples
* - storing history of time-stamps for samples, so rb only deals with
* sample numbers.
*/
/* Utility Functions */
/*
 * Default comparator handed to iterate_loop_range() by rb_var_sample_get().
 * Both arguments are read as pointers to int and ordered numerically.
 *
 * Returns a negative value if *num1 < *num2, zero if they are equal and a
 * positive value if *num1 > *num2.  Only the sign is meaningful.
 *
 * The result is built from comparisons rather than a subtraction, because
 * subtracting operands of opposite sign can overflow int, which is
 * undefined behavior and in practice flips the sign of the result.
 */
static int rb_comparator(void *num1, void *num2)
{
    int lhs = *(int *) num1;
    int rhs = *(int *) num2;

    return (lhs > rhs) - (lhs < rhs);
}
/*
 * Map a sample number onto a byte offset inside rb->data_buf.
 *
 * The slot is sample_num modulo the buffer capacity (in samples); the
 * offset is that slot times the per-sample size.  An offset at or beyond
 * rb->size is reset to 0.  Returns -1 when rb is NULL.
 */
static int calc_data_index(rb_ctx_t * rb, uint64_t sample_num)
{
    int offset;

    if (rb == NULL)
        return -1;

    offset = ((sample_num % rb->num_samples) * rb->sample_size);
    elog(LOG_DEBUG(3), "for sample: %lld, index: %d\n", sample_num, offset);

    if (offset < rb->size)
        return offset;

    /* Out of range: fall back to the start of the buffer. */
    offset = 0;
    elog(LOG_DEBUG(2), "index changed to: %d\n", offset);
    return offset;
}
/*
 * Decide what a range-scanning loop should do with one sample.
 *
 * Return value:
 *    0  sample lies before req_start: skip to the next iteration
 *   -1  sample lies after req_stop, or the response already holds
 *       num_samples entries: break out of the loop
 *    1  sample accepted: rsp->hdr has been updated, caller copies the data
 *
 * A num_samples of 0 means "no limit on the number of accepted samples".
 */
static int
iterate_loop_range(int num_samples, uint64_t sample_num, void *sample,
                   rb_rsp_t * rsp, void *req_start, void *req_stop,
                   rb_compare comparator)
{
    /* Still before the requested window. */
    if (comparator(req_start, sample) > 0)
        return 0;

    /* Past the requested window: nothing further can match. */
    if (comparator(sample, req_stop) > 0)
        return -1;

    /* The response already holds as many samples as were asked for. */
    if ((num_samples != 0) && (rsp->hdr.num_samples >= num_samples))
        return -1;

    /* The first accepted sample fixes the response's starting number. */
    if (rsp->hdr.num_samples == 0)
        rsp->hdr.first_sample = sample_num;
    rsp->hdr.num_samples++;

    return 1;
}
/*
 * Slot in rb->sample_num that corresponds to the current sample clock.
 *
 * Fixed-length buffers only: variable-length buffers have no meaningfully
 * calculated rb->num_samples.  Returns -1 when rb is NULL.
 */
static int calc_rb_index(rb_ctx_t * rb)
{
    return rb ? (int) (rb->sample_clock % rb->num_samples) : -1;
}
/*
 * Copy a run of fixed-length samples out of the ring buffer into rsp->Dbuf.
 *
 * rb            ring buffer to read from (dereferenced without a NULL check)
 * first_sample  number of the first sample wanted; raised to
 *               rb->first_sample if it is older than the buffer contents
 * num_samples   how many samples are wanted; 0 means rb->num_samples
 * max_data_len  if > 0, limits the reply to max_data_len / sample_size samples
 * rsp           response; hdr.first_sample, hdr.num_samples and Dbuf are filled
 *
 * Returns the number of samples placed in the response (possibly 0).
 *
 * NOTE(review): the last debug elog passes rsp->Dbuf->buf to
 * misc_tv_to_str() even when no samples were copied -- confirm that
 * Dbuf->buf is valid in that case.
 */
static int
rb_fixed_sample_get(rb_ctx_t * rb, uint64_t first_sample,
int num_samples, int max_data_len, rb_rsp_t * rsp)
{
/* if num_samples is 0, return them all */
if (num_samples == 0)
num_samples = rb->num_samples;
/* configure the first sample for the response struct.
* if the request starts before the start of the buffer,
* bump up to the start of the buffer. */
if (first_sample < rb->first_sample) {
/* number of requested samples that have already been aged out */
uint64_t trim = rb->first_sample - first_sample;
if (trim > num_samples) num_samples = 0;
else num_samples -= trim;
rsp->hdr.first_sample = rb->first_sample;
}
else
rsp->hdr.first_sample = first_sample;
/* if last_sample extends past the end of the buffer, then trim it */
uint64_t last_sample = rsp->hdr.first_sample + (uint64_t)num_samples - 1;
if (last_sample > rb_get_last_sampleNum(rb)) {
uint64_t trim = last_sample - rb_get_last_sampleNum(rb);
if (num_samples > trim)
num_samples -= trim;
else
num_samples = 0;
}
/* If max data_len is specified, then get the smaller number:
* the number of samples limited by size, or the number of samples
* specified in the request hdr */
if (max_data_len > 0)
num_samples = i_min(max_data_len / rb->sample_size, num_samples);
/* configure the sample count in response header */
rsp->hdr.num_samples = num_samples;
elog(LOG_DEBUG(2), "hdr spec first sample : %lld, num sample: %d\n",
rsp->hdr.first_sample, rsp->hdr.num_samples);
if (rsp->hdr.num_samples > 0) {
/* 2-segment direct copy */
/* data_start/data_end are byte offsets into rb->data_buf;
 * data_end_nowrap is where the copy would end if the buffer did not wrap */
int data_start = calc_data_index(rb, rsp->hdr.first_sample);
int data_size = rsp->hdr.num_samples * rb->sample_size;
int data_end_nowrap = data_start + data_size;
int buffer_size = rb->num_samples * rb->sample_size;
int data_end = data_end_nowrap % buffer_size;
/* reserve space */
/* NOTE(review): relies on bufcpy() treating a NULL source as "reserve
 * only" rather than appending data_size bytes -- confirm against bufcpy */
bufcpy(rsp->Dbuf, NULL, data_size);
/* no wrap? */
if (data_end_nowrap <= buffer_size) {
bufcpy(rsp->Dbuf, &rb->data_buf[data_start], data_size);
}
/* wrap case */
else {
/* tail of the buffer first, then the remainder from its start */
bufcpy(rsp->Dbuf, &rb->data_buf[data_start], buffer_size - data_start);
bufcpy(rsp->Dbuf, &rb->data_buf[0], data_end);
}
}
elog(LOG_DEBUG(3), "num_samples: %d first_sample: %lld\n",
rsp->hdr.num_samples, rsp->hdr.first_sample);
elog(LOG_DEBUG(3), "first sample: %s\n",
misc_tv_to_str((struct timeval *) rsp->Dbuf->buf));
return rsp->hdr.num_samples;
}
/*
 * Copy a run of variable-length samples out of the ring buffer.
 *
 * rb            ring buffer to read from
 * first_sample  number of the first sample wanted
 * num_samples   maximum number of samples to return
 * max_data_len  if non-zero, stop before the payload would exceed this
 *               many bytes
 * rsp           response; each accepted sample's payload is appended to
 *               rsp->Dbuf and, if rsp->sample_size is set, its length is
 *               appended there as an int
 *
 * Returns rsp->hdr.num_samples as left by iterate_loop_range().
 *
 * The walk stops as soon as the element queue runs out, so a queue shorter
 * than sample_clock - first_sample cannot cause a NULL dereference.
 *
 * rb_comparator() reads its arguments as int, so the range bounds are
 * narrowed into int locals before being handed over; passing the address
 * of a uint64_t directly would read the wrong half on big-endian machines.
 */
static int rb_var_sample_get(rb_ctx_t * rb, uint64_t first_sample,
                             int num_samples, size_t max_data_len, rb_rsp_t * rsp)
{
    rbe_t *elem;
    int i, result, len = 0;
    uint64_t last_sample = first_sample + (uint64_t)num_samples - 1;
    /* int views of the requested range, matching what rb_comparator reads */
    int range_start = (int) first_sample;
    int range_stop = (int) last_sample;

    i = rb->first_sample;
    elem = var_len_sample_top(rb);
    while (elem && (i < rb->sample_clock)) {
        /* Stop before the reply grows past the caller's byte budget. */
        if ((max_data_len != 0)
            && ((len + elem->sample_size) > max_data_len))
            break;

        elog(LOG_DEBUG(2),"first sample: %lld, last_sample: %lld\n",
             (long long) first_sample, (long long) last_sample);
        result = iterate_loop_range(num_samples, i, &i, rsp, &range_start,
                                    &range_stop, rb_comparator);
        elog(LOG_DEBUG(2), "iterate ret: %d\n", result);
        if (result < 0)
            break;

        if (result > 0) {
            len += elem->sample_size;
            /* Copy data and hdr information over */
            bufcpy(rsp->Dbuf, elem->data, elem->sample_size);
            if (rsp->sample_size)
                bufcpy(rsp->sample_size, &elem->sample_size, sizeof(int));
        }

        elem = var_len_sample_next(elem);
        i++;
    }
    return rsp->hdr.num_samples;
}
/*
 * Age out the oldest variable-length sample, if there is one: unlink it
 * from the queue, release its payload and node, shrink current_size by
 * its length and advance first_sample.  Does nothing on an empty queue.
 */
static void var_len_remove_top(rb_ctx_t * rb)
{
    rbe_t *oldest = var_len_sample_top(rb);

    if (oldest == NULL)
        return;

    rb->current_size -= oldest->sample_size;
    var_len_sample_remove(rb, oldest);
    g_free(oldest->data);
    g_free(oldest);

    elog(LOG_DEBUG(2), "Aging out sample num: %lld\n", rb->first_sample);
    rb->first_sample++;
}
/*
 * Append num_samples fixed-length samples to the ring buffer.
 *
 * rb           ring buffer to write to
 * sample       num_samples * rb->sample_size bytes of sample data, or NULL
 *              to store zero bytes in place of data
 * num_samples  number of samples supplied
 *
 * If the data is larger than the whole buffer, only the trailing samples
 * that fit are stored; sample_clock and first_sample are advanced past the
 * ones that are dropped.  Older samples overwritten by the new data are
 * aged out by advancing first_sample.
 *
 * Returns the sample number at which the batch was logically inserted
 * (sample_clock after the insert minus num_samples).
 *
 * NOTE(review): `sample + data_offset` and `tmp_sample + ...` perform
 * arithmetic on void *, which is a compiler extension rather than ISO C.
 * NOTE(review): only one rb->sample_num slot is written per call (the one
 * for the first stored sample) even when several samples are pushed.
 */
static
uint64_t rb_fixed_sample_push(rb_ctx_t * rb, void *sample, int num_samples)
{
int data_index, data_len, data_offset,new_num_samples = num_samples;
void* tmp_sample = NULL;
int num_aged_out, excess_data;
data_len = num_samples * rb->sample_size;
/* If size of data inserted > ring-buff size, then just
* insert the last bytes of data that will fit */
if (data_len > rb->size) {
/* Calculate how many samples can be included them */
new_num_samples = rb->size / rb->sample_size;
/* Calculuate how much data this really is */
/* data_offset = bytes at the front of `sample` that are skipped */
data_offset = data_len - new_num_samples * rb->sample_size;
if (sample)
tmp_sample = sample + data_offset;
elog(LOG_WARNING,"Samplelen=%d > rbsize:%d, so aging out: %dB\n",
data_len, rb->size, data_offset);
/* We rely on integer arithmetic to take care of rounding issues
* so that we include whole-numbers of samples */
data_len = new_num_samples * rb->sample_size;
elog(LOG_DEBUG(2),"New data-length: %d\n", data_len);
}
else
tmp_sample = sample;
/* adjust sample-clock to take into account if samples were
* aged out before they were even added - because the data to add
* is greater than the size of the ring-buffer. So its as if
* we inserted all the data starting at the original sample-clock
* but we avoid the step of over-writing data if we know that amt
* of data > size of the ring-buffer */
rb->sample_clock += num_samples - new_num_samples;
rb->first_sample += num_samples - new_num_samples;
elog(LOG_DEBUG(2),"new sample-clk after incl overize smpl: %lld\n",
rb->sample_clock);
/* Age out old data :
* for fixed-len case, data that is overwritten is automatically
* aged out, but have to adjust first_sample */
/* Determine how much data is in excess */
excess_data = (rb->current_size + data_len) - rb->size;
if (excess_data > 0) {
/* Calculate number of samples to age out */
/* round up: a partially overwritten sample is aged out entirely */
num_aged_out = (excess_data / rb->sample_size);
if (excess_data % rb->sample_size > 0)
num_aged_out++;
rb->current_size -= (num_aged_out * rb->sample_size);
elog(LOG_DEBUG(2), "Aging out %d samples starting at: %lld\n",
num_aged_out, rb->first_sample);
/* Adjust first_sample to number of samples aged out */
rb->first_sample += num_aged_out;
}
/* calculate index to insert new data */
data_index = calc_data_index(rb, rb->sample_clock);
/* Save sample-number */
rb->sample_num[calc_rb_index(rb)] = rb->sample_clock;
/* contiguous case: the new data fits before the end of data_buf */
if (data_index + data_len <= rb->size) {
if (tmp_sample)
memcpy(&rb->data_buf[data_index], tmp_sample, data_len);
else
memset(&rb->data_buf[data_index], 0, data_len);
}
/* wrap case: fill to the end of data_buf, then continue from offset 0 */
else {
if (tmp_sample) {
memcpy(&rb->data_buf[data_index], tmp_sample, rb->size - data_index);
memcpy(&rb->data_buf[0],tmp_sample + (rb->size - data_index),
(data_len + data_index) - rb->size);
}
else {
memset(&rb->data_buf[data_index], 0, rb->size - data_index);
memset(&rb->data_buf[0], 0, (data_len + data_index) - rb->size);
}
}
elog(LOG_DEBUG(2), "adding %d samples starting at : %lld\n",
new_num_samples, rb->sample_clock);
elog(LOG_DEBUG(2), "data_index: %d\n", data_index);
rb->current_size += data_len;
/* If new_num_samples != num_samples, then we only want to
* add the incremental diff bet them here. But the return-val
* subtracts num_samples because we want to indicate what the
* first-sample is that the data was technically inserted at */
rb->sample_clock += new_num_samples;
return (rb->sample_clock - num_samples);
}
/*
 * Append one variable-length sample to the ring buffer.
 *
 * rb        ring buffer to write to
 * sample    data_len bytes of payload, or NULL to store data_len zero
 *           bytes (mirrors the NULL handling of the fixed-length push)
 * data_len  payload length in bytes
 *
 * Old samples are aged out first so that at most rb->num_samples remain
 * after the insert.  The aging loop also stops once the queue is empty:
 * var_len_remove_top() makes no progress on an empty queue, so without
 * that check a num_samples of 0 would spin forever.
 *
 * Returns the sample number assigned to the new sample.
 */
static
uint64_t rb_var_sample_push(rb_ctx_t * rb, void *sample, size_t data_len)
{
    rbe_t *elem;

    /* Pop off elements corresponding to aged out data. */
    while (var_len_sample_top(rb)
           && (rb_get_last_sampleNum(rb) - rb->first_sample + 1
               >= rb->num_samples)) {
        elog(LOG_DEBUG(1), "Aging out sample: %lld\n",
             (long long) rb->first_sample);
        var_len_remove_top(rb);
    }

    /* place data in buffer */
    elem = g_new0(rbe_t, 1);
    elem->sample_size = data_len;
    elem->sample_num = rb->sample_clock;
    elem->data = g_new0(char, data_len);
    /* g_new0 already zero-filled the payload; only copy real data */
    if (sample)
        memcpy(elem->data, sample, data_len);
    var_len_sample_push(rb, elem);

    elog(LOG_DEBUG(1), "add new sampl: %d\n", (int) rb->sample_clock);
    rb->sample_clock++;
    rb->current_size += data_len;
    return (rb->sample_clock - 1);
}
/*
 * Shared precondition check for the search functions.
 *
 * Searching requires a ring buffer, a comparator and fixed-length samples.
 * When all three hold, returns a freshly allocated, zeroed scratch buffer
 * of rb->sample_size bytes that the caller must release with g_free().
 * Otherwise logs the reason and returns NULL.
 */
void* rb_error_check_for_match(rb_ctx_t* rb)
{
    if (rb == NULL) {
        elog(LOG_ERR,"No rb specified!\n");
        return NULL;
    }

    if (rb->comparator == NULL) {
        elog(LOG_ERR,"No comparator function specified!\n");
        return NULL;
    }

    if (rb_variable_len_samples(rb)) {
        elog(LOG_ERR,"Can't perform search for variable length samples!\n");
        return NULL;
    }

    return g_new0(char, rb->sample_size);
}
/*************************************************************
* Interface
*************************************************************/
/*
 * Best-effort range lookup over a fixed-length ring buffer.
 *
 * This function looks for an approximate match.  For example, if rstop is
 * beyond the last sample it still returns the last sample as a best-effort
 * match, and if rstart/rstop fall between two samples the lower sample is
 * returned.
 *
 * rb      ring buffer to search (must have a comparator and fixed samples)
 * rsp     response; hdr.num_samples is reset, then hdr and Dbuf are filled
 * rstart  start key, compared with rb->comparator
 * rstop   stop key, compared with rb->comparator
 *
 * Returns 0 on success, -1 if the preconditions checked by
 * rb_error_check_for_match() fail.
 *
 * Both scratch buffers (current sample and look-ahead sample) are released
 * before returning.
 */
int
rb_ret_range_best_eff(rb_ctx_t* rb, rb_rsp_t * rsp, void *rstart,
                      void *rstop)
{
    int i, data_size, start_copying = 0;
    int stop_copying = 0;
    void *data2;
    void *data = rb_error_check_for_match(rb);

    if (!data)
        return -1;
    data2 = g_new0(char, rb->sample_size);

    /* Init */
    i = rb->first_sample;
    rsp->hdr.num_samples = 0;

    while (i < rb->sample_clock) {
        int is_last = !(i + 1 < rb->sample_clock);

        data_size = rb_get_sample(rb, i, data, rb->sample_size + 1);

        if (!is_last) {
            /* Look one sample ahead so each key can be bracketed. */
            rb_get_sample(rb, i + 1, data2, rb->sample_size + 1);
            if ((rb->comparator(rstart, data) >= 0)
                && (rb->comparator(rstart, data2) < 0))
                start_copying = 1;
            if ((rb->comparator(rstop, data) >= 0)
                && (rb->comparator(rstop, data2) < 0))
                stop_copying = 1;
        }

        /* This is the last sample. If we are still iterating, then
         * just include this last sample. It means that rstop > the
         * last sample */
        if (is_last || (start_copying == 1)) {
            /* first element */
            if (rsp->hdr.num_samples == 0)
                rsp->hdr.first_sample = i;
            rsp->hdr.num_samples++;
            /* Copy data and hdr information over */
            bufcpy(rsp->Dbuf, data, data_size);
        }

        i++;
        if (stop_copying == 1)
            break;
    }

    g_free(data);
    g_free(data2);
    return 0;
}
/*
 * Find the first stored sample for which rb->comparator(match_value,
 * sample) returns 0.
 *
 * Returns a heap copy of the matching sample, which the caller owns and
 * must release with g_free(), or NULL when nothing matches or the
 * preconditions checked by rb_error_check_for_match() fail.
 */
void* rb_return_matching_sample(rb_ctx_t* rb, void* match_value)
{
    int sample_idx;
    void *candidate = rb_error_check_for_match(rb);

    if (candidate == NULL)
        return candidate;

    sample_idx = rb->first_sample;
    while (sample_idx < rb->sample_clock) {
        rb_get_sample(rb, sample_idx, candidate, rb->sample_size + 1);
        /* Found a match */
        if (rb->comparator(match_value, candidate) == 0)
            return candidate;
        sample_idx++;
    }

    g_free(candidate);
    return NULL;
}
/*
 * Collect every stored sample whose key lies in [rstart, rstop].
 *
 * rb      ring buffer to search (must have a comparator and fixed samples)
 * rsp     response; hdr.num_samples is reset, then hdr and Dbuf are filled
 * rstart  start key, compared with rb->comparator
 * rstop   stop key, compared with rb->comparator
 *
 * Returns 0 on success, -1 if the preconditions checked by
 * rb_error_check_for_match() fail.  If a sample beyond rstop is reached
 * before anything matched, rsp->hdr.num_samples is set to -1.
 *
 * The scan stops at the first sample beyond rstop.  Continuing would
 * append out-of-range samples to rsp->Dbuf without counting them in
 * rsp->hdr.num_samples.
 */
int
rb_return_range(rb_ctx_t * rb, rb_rsp_t * rsp, void *rstart, void *rstop)
{
    int i, result, data_size;
    void *data = rb_error_check_for_match(rb);

    if (!data)
        return -1;

    /* Init */
    i = rb->first_sample;
    rsp->hdr.num_samples = 0;

    while (i < rb->sample_clock) {
        data_size = rb_get_sample(rb, i, data, rb->sample_size + 1);
        /* Iterate through loop to find samples that match request
         * range */
        result =
            iterate_loop_range(0, i, data, rsp, rstart, rstop, rb->comparator);

        /* This request is complete */
        if (result < 0) {
            if (rsp->hdr.num_samples == 0)
                rsp->hdr.num_samples = -1;
            break;
        }

        /* Copy data and hdr information over */
        if (result > 0)
            bufcpy(rsp->Dbuf, data, data_size);

        i++;
    }

    g_free(data);
    return 0;
}
/*
 * Push sample data onto the ring buffer.
 *
 * rb           ring buffer to write to
 * sample       sample data
 * num_samples  number of fixed-length samples in `sample`; must not exceed
 *              1 for a variable-length buffer
 * data_len     payload length in bytes; used (and required to be non-zero)
 *              only for variable-length buffers
 * sample_clk   if non-NULL, receives the sample number the data was
 *              inserted at
 *
 * Returns 0 on success, -EINVAL if rb is NULL or data_len is 0 for a
 * variable-length buffer, and -1 if more than one variable-length sample
 * is pushed at once.
 */
int rb_sample_push(rb_ctx_t * rb, void *sample, int num_samples,
                   size_t data_len, uint64_t* sample_clk)
{
    uint64_t retval;

    if (!rb) {
        elog(LOG_ERR, "RB does not exist!\n");
        return -EINVAL;
    }

    /* We don't know how long each sample is, so can't even push
     * one sample onto rb */
    if ((num_samples > 1) && (rb->sample_size == 0)) {
        elog(LOG_ERR,"ERROR Can't push > 1 sample for var-len samples\n");
        return -1;
    }

    if (rb_variable_len_samples(rb)) {
        /* data_len is unsigned, so zero is the only invalid length */
        if (data_len == 0) {
            /* cast so the argument matches the %d conversion */
            elog(LOG_ERR, "ERROR, len of sample: %d must be > 0\n",
                 (int) data_len);
            return -EINVAL;
        }
        retval = rb_var_sample_push(rb, sample, data_len);
    }
    else
        retval = rb_fixed_sample_push(rb,sample,num_samples);

    if (sample_clk)
        *sample_clk = retval;
    return 0;
}
/*
 * Fetch up to num_samples samples starting at first_sample into rsp.
 *
 * Returns -1 if rsp is NULL, if the whole request is older than the buffer
 * contents, or if num_samples <= 0.  Returns 0 if the buffer is empty or
 * the request starts after the newest sample.  Otherwise returns the
 * result of the fixed- or variable-length getter.
 */
int rb_get_data(rb_ctx_t * rb, uint64_t first_sample, int num_samples,
                size_t max_data_len, rb_rsp_t * rsp)
{
    if (rsp == NULL) {
        elog(LOG_ERR, "ERROR No response struct allocated!\n");
        return -1;
    }

    if (rb_empty(rb)) {
        elog(LOG_DEBUG(1), "RB is empty, cannot satisfy request!\n");
        return 0;
    }

    /* Request is too old if the last sample requested has a sample-number
     * less than the first sample-number currently in the rb */
    if (first_sample + num_samples - 1 < rb_get_first_sampleNum(rb)) {
        elog(LOG_DEBUG(2), "request too old\n");
        return -1;
    }

    /* Request is in the future */
    if (rb_get_last_sampleNum(rb) < first_sample) {
        elog(LOG_DEBUG(1), "Request for sample: %lld is in the future",
             first_sample);
        elog(LOG_DEBUG(1), "Last sample:%lld\n", rb_get_last_sampleNum(rb));
        return 0;
    }

    if (num_samples <= 0) {
        elog(LOG_DEBUG(1), "Number of samples <= 0, request is done\n");
        return -1;
    }

    /* Dispatch on the buffer's storage mode. */
    if (!rb_variable_len_samples(rb))
        return (rb_fixed_sample_get(rb, first_sample, num_samples,
                                    max_data_len, rsp));

    return (rb_var_sample_get(rb, first_sample, num_samples,
                              max_data_len, rsp));
}
/*
 * Reset a response structure.
 *
 * With dealloc set, the sample_size and Dbuf buffers are freed and the
 * whole structure is zeroed.  Otherwise only the header is zeroed and the
 * buffers are cleared but kept for reuse.
 */
void rb_response_clear(rb_rsp_t *rsp, int dealloc)
{
    if (dealloc) {
        if (rsp->sample_size)
            buf_free(rsp->sample_size);
        if (rsp->Dbuf)
            buf_free(rsp->Dbuf);
        memset(rsp, 0, sizeof(*rsp));
        return;
    }

    memset(&(rsp->hdr), 0, sizeof(rsp->hdr));
    if (rsp->sample_size)
        buf_clear(rsp->sample_size, 0);
    if (rsp->Dbuf)
        buf_clear(rsp->Dbuf, 0);
}
/**** Constructor/Destructor ****/
/*
 * Create a ring buffer.
 *
 * opts    configuration: sample_size (0 selects variable-length samples),
 *         num_samples (capacity in samples) and an optional comparator
 * rb_ref  receives the new buffer; if *rb_ref is already non-NULL the old
 *         buffer is destroyed first
 *
 * Returns the initial sample clock (0) on success, -1 on failure.
 *
 * Both arguments are required: opts is dereferenced unconditionally, and
 * without rb_ref there is no way to hand the buffer back, so it would
 * simply leak.
 */
int rb_new(rb_opts_t * opts, rb_ctx_t ** rb_ref)
{
    rb_ctx_t *rb;

    if (!opts || !rb_ref) {
        elog(LOG_ERR, "ERROR rb_new needs both options and an rb reference\n");
        return -1;
    }

    rb = g_new0(rb_ctx_t, 1);
    if (rb == NULL) {
        elog(LOG_ERR, "ERROR unable to allocate mem for ringBuff\n");
        return -1;
    }

    /* if sample_size = 0, assume variable length */
    rb->sample_size = opts->sample_size;
    rb->comparator = opts->comparator;
    rb->sample_clock = 0;
    rb->num_samples = opts->num_samples;

    /* Fixed-sample size case */
    if (rb->sample_size > 0) {
        rb->size = rb->num_samples * rb->sample_size;
        rb->data_buf = g_new0(char, rb->size);
        if (rb->data_buf == NULL) {
            elog(LOG_ERR, "ERROR cant to allocate mem for ringBuff data\n");
            goto error;
        }
        rb->sample_num = g_new0(uint64_t, rb->num_samples);
        if (rb->sample_num == NULL) {
            elog(LOG_ERR, "ERROR unable to allocate mem for ringBuff hdr\n");
            goto error;
        }
    }
    /* Variable length samples */
    else {
        elog(LOG_DEBUG(1), "Sample-Size=0;Assume variable len samples\n");
        /* init the Q which holds the samples and hdr information */
        var_len_sample_init(rb);
    }

    if (*rb_ref) {
        elog(LOG_ERR, "Destroying rb-reference before re-using\n");
        rb_destroy(*rb_ref);
    }
    *rb_ref = rb;

    return (rb->sample_clock);

error:
    /* Unwind whatever was allocated before the failure. */
    if (rb->data_buf)
        g_free(rb->data_buf);
    if (rb->sample_num)
        g_free(rb->sample_num);
    g_free(rb);
    return -1;
}
/*
 * Release a ring buffer and everything it owns: every queued
 * variable-length sample, the fixed-length data and sample-number arrays,
 * and the context itself.  A NULL rb is ignored.
 */
void rb_destroy(rb_ctx_t * rb)
{
    rbe_t *cur, *following;

    if (rb == NULL)
        return;

    elog(LOG_DEBUG(1),"rb_destroy called\n");

    /* Fetch the successor before the current node is unlinked and freed. */
    for (cur = var_len_sample_top(rb); cur != NULL; cur = following) {
        following = var_len_sample_next(cur);
        var_len_sample_remove(rb, cur);
        g_free(cur->data);
        g_free(cur);
    }

    if (rb->data_buf)
        g_free(rb->data_buf);
    if (rb->sample_num)
        g_free(rb->sample_num);
    g_free(rb);
}
/*
 * Number of the most recently pushed sample (sample_clock - 1), or 0 when
 * rb is NULL.
 */
uint64_t rb_get_last_sampleNum(rb_ctx_t * rb)
{
    return rb ? (rb->sample_clock - 1) : 0;
}
/*
 * Number of the oldest sample still held in the buffer, or 1 when rb is
 * NULL.
 */
uint64_t rb_get_first_sampleNum(rb_ctx_t * rb)
{
    return rb ? rb->first_sample : 1;
}
/*
 * Copy one sample out of the ring buffer.
 *
 * rb          ring buffer to read from
 * sample_num  number of the sample wanted
 * data        destination buffer
 * len         capacity of `data` in bytes; a longer sample is truncated
 *             to len bytes with a warning
 *
 * Returns the number of bytes copied, or (size_t) -1 if rb is NULL or
 * empty, data is NULL, len <= 0, or (variable-length buffers) no queued
 * sample carries sample_num.
 *
 * For variable-length buffers the queue is searched for the element whose
 * recorded sample number equals sample_num, so that the sample returned is
 * the one asked for rather than always the newest one.
 */
size_t rb_get_sample(rb_ctx_t * rb, uint64_t sample_num, void *data,
                     int len)
{
    void* data_copied;
    int len_copied;

    if (!rb || rb_empty(rb) || !data || (len <= 0))
        return (size_t) -1;

    if (rb_variable_len_samples(rb)) {
        rbe_t *elem = var_len_sample_top(rb);

        /* Walk from the oldest element towards the newest. */
        while (elem && (elem->sample_num != sample_num))
            elem = var_len_sample_next(elem);
        if (!elem)
            return (size_t) -1;

        data_copied = elem->data;
        len_copied = elem->sample_size;
    }
    else {
        data_copied = &rb->data_buf[calc_data_index(rb, sample_num)];
        len_copied = rb->sample_size;
    }

    if (len < len_copied) {
        elog(LOG_WARNING,"Sample-size:%d is greater than requested length: %d!"
             "Only copying over %d Bytes\n", len_copied, len, len);
        len_copied = len;
    }

    memcpy(data, data_copied, len_copied);
    return (len_copied);
}
/*
 * Copy the oldest retained sample into data (at most len bytes).
 * Returns whatever rb_get_sample() returns for that sample number.
 */
size_t rb_get_first_sample(rb_ctx_t * rb, void *data, int len)
{
    uint64_t oldest = rb_get_first_sampleNum(rb);

    return rb_get_sample(rb, oldest, data, len);
}
/*
 * Copy the most recently pushed sample into data (at most len bytes).
 * Returns whatever rb_get_sample() returns for that sample number.
 */
size_t rb_get_last_sample(rb_ctx_t * rb, void *data, int len)
{
    uint64_t newest = rb_get_last_sampleNum(rb);

    return rb_get_sample(rb, newest, data, len);
}
/*
 * Returns 1 if the buffer stores variable-length samples (sample_size is
 * 0), 0 if it stores fixed-length samples, and -1 if rb is NULL.
 */
int rb_variable_len_samples(rb_ctx_t * rb)
{
    if (rb == NULL)
        return -1;

    return rb->sample_size ? 0 : 1;
}
/*
 * Returns 1 if no samples are stored (the sample clock equals the first
 * retained sample number), 0 otherwise, and -1 if rb is NULL.
 */
int rb_empty(rb_ctx_t * rb)
{
    return rb ? (rb->sample_clock == rb->first_sample) : -1;
}
/*
 * Returns 1 if the number of retained samples has reached rb->num_samples,
 * 0 otherwise, and -1 if rb is NULL.
 */
int rb_full(rb_ctx_t * rb)
{
    uint64_t stored;

    if (rb == NULL)
        return -1;

    stored = rb->sample_clock - rb->first_sample;
    return (stored >= rb->num_samples);
}
/*
 * Set the sample number that the first pushed sample will receive.
 *
 * Only allowed while the sample clock is still 0.  A non-zero clock is
 * taken to mean samples have already been pushed, in which case the
 * numbering can no longer be changed.
 *
 * Returns 0 on success, -1 if rb is NULL (matching the other accessors in
 * this file) or the clock has already advanced.
 */
int rb_set_start_sample(rb_ctx_t *rb, uint64_t start_sample)
{
    if (!rb)
        return -1;

    /* Assume if it is != 0, then samples have already been pushed
     * onto the ring-buffer. In which case, we can't change the
     * sample-number of the first-sample */
    if (rb->sample_clock != 0)
        return -1;

    rb->first_sample = start_sample;
    rb->sample_clock = start_sample;
    return 0;
}
See more files for this project here