Show reassembler.c syntax highlighted
/* Reassembler: Reassemble files dropped off in ACKED_DIR_PATTERN
Periodically check ACKED_DIR for directories using scandir.
fopen in ACKED_DIR/NAME.tmp
Loop through files ACKED_DIR/NAME using ftw.
write to ACKED_DIR/NAME.tmp using fseek / fwrite
close when finished.
rename file to ASSEMBED_DIR/NAME
*/
#define _GNU_SOURCE
#include <ftw.h>
#include <stdlib.h>
#include <glib.h>
#include <stdint.h>
#include <errno.h>
#include <dirent.h>
#include "libmisc/file.h"
#include "libmisc/misc_buf.h"
#include "emrun/emrun.h"
#include "libmisc/misc_sim.h"
#include "libdev/status_client.h"
#include "devel/block_tree/block.h"
extern int errno;
extern int in_sim;
typedef struct
{
uint8_t sinkstatus;
status_client_context_t * status_context;
char * assembled_dir;
char * acked_dir;
FILE * fp;
} reassembler_state_t;
reassembler_state_t * global_state = NULL;
static FILE *
my_fopen (const char *fname, const char *opts)
{
elog (LOG_INFO, " ");
FILE *fp = fopen (fname, opts);
if (fp == NULL)
{
elog (LOG_ERR, "Error while trying to open %s: %m", fname);
}
return fp;
}
// Reads block into a buf_t
// Does sanity checking of block's length and offset.
// Writes to correct portion of the temporary file: state->fp
// Frees the buf_t.
int
ftw_process_file (const char * file, const struct stat * sb, int flag)
{
elog (LOG_INFO, " ");
// Setup
reassembler_state_t * state = global_state;
buf_t * block_buf = buf_new ();
if (file_to_buf (block_buf, file) < 0)
{
elog (LOG_ERR, "Unable to read file: %s: %m", file);
goto final;
}
// Check
if (block_buf->len < sizeof (block_t))
{
elog (LOG_ERR, "File is smaller than block_t: %s ", file);
goto final;
}
block_t * block = (block_t *) block_buf->buf;
if (block_buf->len != (sizeof (block_t) + block->length))
{
elog (LOG_ERR, "File size does not equal (block_t size + block->length): %s",
file);
goto final;
}
if ( (block->offset + block->length) > block->uid.total_length )
{
elog (LOG_ERR, "Block claims to have an offset and length that goes beyond the block's total_length: file: %s, offset: %d, length: %d, total_length: %d",
file, block->offset, block->length, block->uid.total_length);
goto final;
}
// Work
fseek (state->fp, block->offset, SEEK_SET);
size_t items_written = fwrite (block->data, 1, block->length, state->fp);
if (items_written != block->length)
{
elog (LOG_ERR,
"Error writing data from block to file: %s, offset %d, length %d",
file, block->offset, block->length);
goto final;
}
final:
buf_free (block_buf);
return 0;
}
int
nftw_remove_files (const char * file, const struct stat * sb, int flag,
struct FTW * s)
{
elog (LOG_INFO, "Removing file: %s", file);
if (remove (file) < 0)
elog (LOG_ERR, "Unable to remove file %s", file);
return 0;
}
static void
delete_dir (const char * dir)
{
elog (LOG_INFO, " ");
if (nftw (dir, nftw_remove_files, 4, FTW_DEPTH) < 0)
{
elog (LOG_INFO, "nftw encountered an error on %s: %m", dir);
}
}
// Open tmp file
// For each file in the fragmented dir, write to tmp file.
// Close the tmp file
// Rename tmp file to final file.
// Delete the tmp file.
// Delete the old directory
static void
reassembler_process_dir (reassembler_state_t * state, const char * dirname)
{
elog (LOG_INFO, " ");
buf_t * dirpath = buf_new ();
buf_t * tmpfile = buf_new ();
buf_t * finalfile = buf_new ();
bufprintf (dirpath, "%s/%s", state->acked_dir, dirname);
bufprintf (tmpfile, "%s/TEMPFILE", state->assembled_dir);
bufprintf (finalfile, "%s/%s", state->assembled_dir, dirname);
// make sure the parent directory exists before renaming tmp file
if (mkdir_with_parents (state->assembled_dir) < 0)
{
elog (LOG_ERR, "Unable to create directory: %s", state->assembled_dir);
goto final;
}
state->fp = my_fopen (tmpfile->buf, "w");
if (state->fp == NULL)
{
elog (LOG_ERR, "Error opening file: %s: %m", tmpfile->buf);
goto final;
}
if (ftw (dirpath->buf, ftw_process_file, 4) < 0)
{
elog (LOG_CRIT, "ftw encountered an error on %s: %m",
dirpath->buf);
goto final;
}
fclose (state->fp);
state->fp = NULL;
if (rename (tmpfile->buf, finalfile->buf) < 0)
{
elog (LOG_ERR, "Unable to rename %s --> %s: %m", tmpfile->buf, finalfile->buf);
goto final;
}
final:
if (state->fp)
fclose (state->fp);
state->fp = NULL;
// Clean up tmp file and directory
if (remove (tmpfile->buf) < 0)
{
elog (LOG_ERR, "Unable to remove file: %s", tmpfile->buf);
}
delete_dir (dirpath->buf);
buf_free (dirpath);
buf_free (tmpfile);
buf_free (finalfile);
}
// Filter out everything except directories not equal to "." and ".."
static int
scandir_filter (const struct dirent * ent)
{
elog (LOG_INFO, " ");
if (strcmp (ent->d_name, ".") == 0)
return 0;
if (strcmp (ent->d_name, "..") == 0)
return 0;
else
return 1;
}
// Scan acked_dir for directories.
// For each found directory, attempt to reassemble the associated file
// and place it into assembled_dir.
static void
reassembler_timer_helper (reassembler_state_t * state)
{
elog (LOG_INFO, " ");
struct dirent **namelist = NULL;
int n;
n = scandir(state->acked_dir, &namelist, scandir_filter, NULL);
if (n < 0)
{
elog (LOG_CRIT, "scandir failed while scanning: %s: %m", state->acked_dir);
goto final;
}
while (n--)
{
reassembler_process_dir (state, namelist[n]->d_name);
free (namelist [n]);
}
free (namelist);
final:
return;
}
// If I'm the sink, don't bother.
static int
reassembler_timer_cb (gpointer data, int interval, g_event_t * ev)
{
elog (LOG_INFO, " ");
reassembler_state_t * state = (reassembler_state_t *) data;
if (state->sinkstatus == 0)
goto final;
reassembler_timer_helper (state);
final:
return TIMER_RENEW;
}
static void
usage (char *name)
{
misc_print_usage (name, "TODO", "TODO");
exit (1);
}
static reassembler_state_t
init_state ()
{
elog (LOG_INFO, " ");
buf_t * acked_dir_buf = buf_new ();
bufprintf (acked_dir_buf, ACKED_DIR_PATTERN, my_node_id);
buf_t * assembled_dir_buf = buf_new ();
if (in_sim)
bufprintf (assembled_dir_buf, ASSEMBLED_DIR_PATTERN_SIM, my_node_id);
else
bufprintf (assembled_dir_buf, ASSEMBLED_DIR);
reassembler_state_t state = {
status_context: NULL,
sinkstatus:0,
acked_dir: strdup (acked_dir_buf->buf),
assembled_dir: strdup (assembled_dir_buf->buf)
};
buf_free (assembled_dir_buf);
buf_free (acked_dir_buf);
return state;
}
// Notice when our sink status changes, and updates state.
static int
status_handler (void *new_buf, size_t size, void *data)
{
elog (LOG_INFO,"Received %s, size %d", (char *) new_buf, size);
if (size == 0)
{
elog (LOG_ERR, "Size is 0!");
goto final;
}
if (!new_buf)
{
elog (LOG_ERR, "new_buf is NULL");
goto final;
}
reassembler_state_t *state = (reassembler_state_t *) data;
int new_status = atoi ((char *) new_buf);
if ((new_status == 0) || (new_status == 1))
{
if (new_status != state->sinkstatus)
{
elog (LOG_NOTICE, "Changing status to %d", new_status);
state->sinkstatus = new_status;
}
else
{
elog (LOG_NOTICE, "Status is the same.");
}
}
else
{
elog (LOG_ERR, "Unrecognized status");
goto final;
}
final:
free (new_buf);
return EVENT_RENEW;
}
static void
init_status (reassembler_state_t * state)
{
elog (LOG_INFO, " ");
status_client_opts_t opts = {
devname:sim_path ("/dev/block/sinkstatus"),
handler:status_handler,
private_data:state
};
if (g_status_client_full (&opts, &state->status_context) < 0)
{
elog (LOG_CRIT, "Unable to open status client to %s", opts.devname);
exit (1);
}
}
int
main (int argc, char * argv[])
{
misc_init (&argc, argv, CVSTAG);
{
emrun_opts_t emrun_opts = {
shutdown: block_generic_shutdown,
data: "reassembler"
};
emrun_init (&emrun_opts);
}
if (argc > 1)
{
usage (argv [0]);
}
reassembler_state_t state = init_state();
if (g_timer_add (REASSEMBLER_TIMER, reassembler_timer_cb, &state, NULL, NULL)
< 0)
{
elog (LOG_CRIT, "Unable to create timer event.");
exit (1);
}
init_status (&state);
global_state = &state;
g_main ();
elog (LOG_ALERT, "Event system terminated abnormally.");
return 1;
}
See more files for this project here