Show fragmenter.c syntax highlighted
// Fragmenter: files dropped off in one directory are fragmented and
// copied into another directory.
char fragmenter_c_cvsid[] =
"$Id: fragmenter.c,v 1.6 2005/06/21 09:47:24 adparker Exp $";
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
#include <glib.h>
#include <ftw.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "libmisc/elog.h"
#include "emrun/emrun.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
extern int errno;
extern int in_sim;
// Later, these are to be read from a configuration file.
buf_t * OUTPUT_DIR = NULL;
typedef struct
{
uint8_t sinkstatus;
status_client_context_t * status_context;
} fragmenter_state_t;
typedef struct
{
char str_buf [MAX_FILE_STRLEN];
} str;
// Sorry. This is the only way I can have a function called by FTW to
// return data.
GArray * fname_array = NULL;
static FILE *
my_fopen (const char *fname, const char *opts)
{
elog (LOG_NOTICE, " ");
FILE *fp = fopen (fname, opts);
if (fp == NULL)
{
elog (LOG_ERR, "Error while trying to open %s: %m", fname);
}
return fp;
}
// Writes a fragment under dir, based on block_number.
static void
write_frag (block_t block, int block_number, int length, const char * dir,
const buf_t * data)
{
elog (LOG_NOTICE, " ");
buf_t * file_base = buf_new ();
buf_t * file_name_buf = buf_new ();
const char * file_name = NULL;
FILE * fout = NULL;
// Setup block info
block.offset = block_number * FRAG_SIZE;
block.length = length;
// Setup output file name is: OUTDIR / block.name / (offset)_(length)_(total)
bufprintf (file_base, "%d_%d_%d", block.offset, block.length,
block.uid.total_length);
bufprintf (file_name_buf, "%s/%s/%s", OUTPUT_DIR->buf, block.uid.name,
file_base->buf);
file_name = file_name_buf->buf;
fout = my_fopen (file_name, "w");
if (fout == NULL)
{
elog (LOG_ERR, "Error opening file for write: %s: %m", file_name);
goto done;
}
// Work
size_t items_written = fwrite (&block, sizeof (block_t), 1, fout);
if (items_written != 1)
{
elog (LOG_ERR, "Error writing block to head of fragment: %s: %m",
file_name);
goto done;
}
items_written = fwrite (&data->buf[block.offset], 1, block.length, fout);
if (items_written != block.length)
{
elog (LOG_ERR, "Error writing data to fragment: %s: %m",
file_name);
goto done;
}
done:
if (fclose (fout) != 0)
{
elog (LOG_ERR, "Error closing FILE handle for %s: %m", file_name);
}
buf_free (file_name_buf);
buf_free (file_base);
return;
}
// create block header
// fill out block header with appropriate data
// open file
// for every FRAG_SIZE amount of data from file
// CALL write_frag (FILE * dest, FILE * src, block, offset, length);
static void
frag_file_to_dir (const char * file, const char * dir)
{
elog (LOG_NOTICE, " ");
// Setup
block_t block;
struct stat mystat;
buf_t * input_buf = buf_new ();
gchar * basename = NULL;
if (stat (file, &mystat) < 0)
{
elog (LOG_ERR, "Unable to stat file: %s: %m", file);
goto done;
}
basename = g_path_get_basename (file);
int basename_length = strlen ((char*)basename) + 1;
if (basename_length > BLOCK_UID_NAME_LEN)
{
elog (LOG_ERR, "Basename of %s has length: %d, which is longer than BLOCK_UID_NAME_LEN: %d.", basename, basename_length, BLOCK_UID_NAME_LEN);
goto done;
}
memcpy (block.uid.name, basename, basename_length);
block.uid.node_id = my_node_id;
block.uid.total_length = mystat.st_size;
int filetobuf_res = file_to_buf (input_buf, file);
if (filetobuf_res < 0)
{
elog (LOG_ERR, "Unable to read file: %s: %m", file);
goto done;
}
if (block.uid.total_length != input_buf->len)
{
elog (LOG_ERR,
"Weird. The input_buf size != size reported by stat for file %s",
file);
goto done;
}
div_t div_result = div (block.uid.total_length, FRAG_SIZE);
int total_blocks = div_result.quot;
// Real work
int block_number;
for (block_number = 0; block_number < total_blocks; ++block_number)
{
write_frag (block, block_number, FRAG_SIZE, dir, input_buf);
}
if (div_result.rem > 0)
{
write_frag (block, block_number, div_result.rem, dir, input_buf);
}
done:
buf_free (input_buf);
g_free (basename);
return;
}
// if the directory LOCAL_OUTPUT_DIR/file_name doesn't exist:
// create it (and parents)
static void
frag_file (const char * file)
{
elog (LOG_NOTICE, " ");
// Setup
gchar * file_base = g_path_get_basename ((const gchar*)file);
buf_t * new_directory_name_buf = buf_new ();
bufprintf (new_directory_name_buf, "%s/%s", OUTPUT_DIR->buf, file_base);
const char * new_directory_name = new_directory_name_buf->buf;
DIR * new_directory = opendir (new_directory_name);
// Work
if ((new_directory == NULL) && (ENOENT == errno))
{
// Directory doesn't exist, so create and populate
if (mkdir_with_parents (new_directory_name) != 0)
{
elog (LOG_ERR, "Unable to create directory with parents: %s: %m",
new_directory_name);
goto done;
}
frag_file_to_dir (file, new_directory_name);
}
// Done
done:
if (new_directory && (closedir (new_directory) != 0))
{
elog (LOG_ERR, "Error closing directory handle for %s: %m",
new_directory_name);
}
buf_free (new_directory_name_buf);
g_free (file_base);
return;
}
// Modifies fname_array
static int
collect_fnames (const char * file, const struct stat * sb, int flag)
{
elog (LOG_NOTICE, " ");
if ((FTW_F != flag) && (FTW_SL != flag))
{
return 0;
}
if (strlen (file) >= MAX_FILE_STRLEN)
{
elog (LOG_CRIT, "Encountered a file whose string length is longer "
"than expected:%s \n Skipping it. I'm lame", file);
return 0;
}
str foo;
strncpy (foo.str_buf, file, MAX_FILE_STRLEN);
g_array_append_val (fname_array, foo);
return 0;
}
// For each file in INCOMING_DIR (or INCOMING_DIR_PATTERN_SIM)
// CALL collect_fnames
// Append the file name (can be files or symlinks) to an array.
// Sort the array of file names.
// CALL frag_file (array[0])
static void
fragmenter_timer_helper (const char * incoming_dir)
{
elog (LOG_NOTICE, " ");
fname_array = g_array_new (0, 1, sizeof (str));
// FTW FTW FTW FTW FTW FTW FTW FTW FTW FTW FTW FTW
if (ftw (incoming_dir, collect_fnames, 4) < 0)
{
elog (LOG_CRIT, "ftw encountered an error on %s: %m", incoming_dir);
}
else if (fname_array->len > 0)
{
frag_file(g_array_index (fname_array, str, 0).str_buf );
}
g_array_free (fname_array, 1);
}
static int
fragmenter_timer_cb (gpointer data, int interval, g_event_t * ev)
{
elog (LOG_NOTICE, " ");
fragmenter_state_t * state = (fragmenter_state_t *) data;
if (state->sinkstatus == 1)
goto final;
buf_t * buf = buf_new ();
if (in_sim)
{
bufprintf (buf, INCOMING_DIR_PATTERN_SIM, my_node_id);
}
else
{
bufprintf (buf, INCOMING_DIR);
}
fragmenter_timer_helper (buf->buf);
buf_free (buf);
final:
return TIMER_RENEW;
}
static void
usage (char *name)
{
misc_print_usage (name, "TODO", "TODO");
exit (1);
}
static fragmenter_state_t
init_state ()
{
fragmenter_state_t state = {
status_context: NULL,
sinkstatus:0
};
return state;
}
// Notice when our sink status changes, and updates state.
static int
status_handler (void *new_buf, size_t size, void *data)
{
elog (LOG_NOTICE,"Received %s, size %d", (char *) new_buf, size);
if (size == 0)
{
elog (LOG_ERR, "Size is 0!");
goto final;
}
if (!new_buf)
{
elog (LOG_ERR, "new_buf is NULL");
goto final;
}
fragmenter_state_t *state = (fragmenter_state_t *) data;
int new_status = atoi ((char *) new_buf);
if ((new_status == 0) || (new_status == 1))
{
if (new_status != state->sinkstatus)
{
elog (LOG_NOTICE, "Changing status to %d", new_status);
state->sinkstatus = new_status;
}
else
{
elog (LOG_NOTICE, "Status is the same.");
}
}
else
{
elog (LOG_ERR, "Unrecognized status");
goto final;
}
final:
if (new_buf)
free (new_buf);
return EVENT_RENEW;
}
// Status client of /dev/block/sinkstatus, to notify me when sink
// status is changed.
static void
init_status (fragmenter_state_t * state)
{
status_client_opts_t opts = {
devname:sim_path ("/dev/block/sinkstatus"),
handler:status_handler,
private_data:state
};
if (g_status_client_full (&opts, &state->status_context) < 0)
{
elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
exit (1);
}
}
int
main (int argc, char * argv[])
{
misc_init (&argc, argv, CVSTAG);
{
emrun_opts_t emrun_opts = {
shutdown: block_generic_shutdown,
data: "fragmenter"
};
emrun_init (&emrun_opts);
}
if (argc > 1)
{
usage (argv [0]);
}
fragmenter_state_t state = init_state();
if (g_timer_add (FRAGMENTER_TIMER, fragmenter_timer_cb, &state, NULL, NULL)
< 0)
{
elog (LOG_CRIT, "Unable to create timer event.");
exit (1);
}
OUTPUT_DIR = buf_new();
bufprintf (OUTPUT_DIR, BLOCK_DIR_PATTERN, my_node_id);
init_status (&state);
g_main ();
elog (LOG_ALERT, "Event system terminated abnormally.");
buf_free (OUTPUT_DIR);
return 1;
}
See more files for this project here