/* mdiff_test.c */
#include "emrun/emrun.h"
#include "libdev/packet_client.h"
#include "libdev/status_client.h"
#include "libmisc/misc.h"
#include "../src/Mdiff.h"
#include "mdiff_test.h"
/* Forward declarations for the file-local handlers and helpers defined below. */

/* Status device handlers; main installs them as `printable` and `binary`. */
static int status_print(status_context_t *info, buf_t *buf);
static int status_binary(status_context_t *info, buf_t *buf);
/* Packet device receive callback; main installs it as `receive`. */
static int md_pd_receive(void *pkt, ssize_t len, pd_client_context_t *pd_client);
/* Status device write handler; main installs it as `write`. */
static int md_test_command(status_context_t * info, char * command, size_t buf_size);
/* Helpers that build a packet and hand it to pd_client_send(). */
static void send_subscription(md_test_state_t * state);
static void md_test_send_data(md_test_state_t * state, int node, char * blob);
/*
 * Program entry point.
 *
 * After misc_init(), three pieces are wired to the shared test state:
 *   1. a status device at sim_path("/dev/mdiff_test/status") using
 *      status_print, status_binary and md_test_command as handlers;
 *   2. a packet device client on MDIFF_APP_PACKET_DEV that delivers
 *      packets to md_pd_receive;
 *   3. emrun, initialised with the state as its data pointer.
 * A status notification is then issued and g_main() is entered.
 *
 * Exits with status 1 if either device cannot be set up. Reaching the code
 * after g_main() is logged as an abnormal termination and returns 1.
 */
int main(int argc, char * argv[])
{
    md_test_state_t state = { .foo = 0 };

    misc_init(&argc, argv, CVSTAG);

    /* Status device: text and binary readers plus the command writer. */
    status_dev_opts_t status_opts = {
        .device = {
            .devname = sim_path("/dev/mdiff_test/status"),
            .device_info = &state
        },
        .printable = status_print,
        .binary = status_binary,
        .write = md_test_command
    };
    if (g_status_dev(&status_opts, &(state.status_ref)) < 0) {
        elog(LOG_CRIT, "unable to create status device: %m");
        exit(1);
    }

    /* Packet device client: incoming packets go to md_pd_receive. */
    pd_client_opts_t pd_opts = {
        .devname = MDIFF_APP_PACKET_DEV,
        .data = &state,
        .receive = md_pd_receive
    };
    if (pd_client_open(&pd_opts, &(state.pd_ref)) < 0) {
        elog(LOG_CRIT, "can't open %s: %m", MDIFF_APP_PACKET_DEV);
        exit(1);
    }

    /* emrun gets the same state pointer. */
    emrun_opts_t emrun_opts = { .data = &state };
    emrun_init(&emrun_opts);

    g_status_dev_notify(state.status_ref);
    g_main();

    /* Only reached if g_main() returns. */
    elog(LOG_ALERT, "Event system terminated abnormally.");
    return 1;
}
/*
 * Printable status handler (installed as `printable` in main).
 * Appends the fixed text "Hi\n" to buf and returns STATUS_MSG_COMPLETE.
 *
 * info  status context; not used here
 * buf   output buffer that receives the text
 */
static int status_print(status_context_t *info, buf_t *buf)
{
    (void)info;    /* no per-device state is needed for the greeting */

    bufprintf(buf, "Hi\n");

    return STATUS_MSG_COMPLETE;
}
/*
 * Binary status handler (installed as `binary` in main).
 * Copies the raw bytes of the md_test_state_t obtained from sd_data(info)
 * into buf and returns STATUS_MSG_COMPLETE.
 *
 * info  status context from which the test state is retrieved
 * buf   output buffer that receives the state bytes
 */
static int status_binary(status_context_t *info, buf_t *buf)
{
    md_test_state_t *snapshot = (md_test_state_t *)sd_data(info);

    /* The whole structure is exported as-is, one sizeof's worth of bytes. */
    bufcpy(buf, snapshot, sizeof(*snapshot));

    return STATUS_MSG_COMPLETE;
}
/*
 * Receive callback for the packet device client (installed as `receive`
 * in main). Treats the packet as an MdMsg_t, logs a line that depends on
 * its type, then frees the packet and returns EVENT_RENEW.
 *
 * pkt        packet buffer; freed here before returning
 * len        packet length; not consulted
 * pd_client  client context; not used
 *
 * NOTE(review): the payload is dereferenced without checking `len`, and the
 * blob text is printed with %s -- presumably the sender guarantees a
 * complete, NUL-terminated message; confirm.
 */
static int md_pd_receive(void *pkt, ssize_t len, pd_client_context_t *pd_client)
{
    MdMsg_t *incoming = (MdMsg_t *)pkt;

    elog(LOG_INFO, "md_pd_receive");

    if (incoming->type == MdHandle) {
        /* The payload starts with a one-byte handle. */
        int8_t handle = *((int8_t *)incoming->data);

        elog(LOG_INFO, "Received Handle: %d\n", handle);
    } else if (incoming->type == MdRecvMsg) {
        /* The blob is read from m_pData just past two DiffAttr_t entries. */
        DiffMsg_t *diff = (DiffMsg_t *)incoming->data;
        DiffBlob_t *blob = (DiffBlob_t *)(&(diff->m_pData[2 * sizeof(DiffAttr_t)]));

        elog(LOG_INFO, "Received a msg!!\n");
        elog(LOG_INFO, "msg is %s", (char *)(blob->m_pData));
    } else {
        elog(LOG_CRIT, "unknown msg type: %d", incoming->type);
    }

    free(pkt);
    return EVENT_RENEW;
}
/*
 * Write handler for the status device (installed as `write` in main).
 *
 * The written text is split into key/value pairs by misc_parse_next_kvp()
 * and the pairs are acted on in the order they appear:
 *   subscribe  send a subscription packet right away
 *   node       destination node for later "data" keys (default 1)
 *   times      how many times each "data" value is sent (default 1)
 *   delay      argument passed to usleep() after every send (default 1)
 *   data       send the value `times` times to `node`
 * Because pairs are processed in order, node/times/delay only affect
 * "data" keys that come after them.
 *
 * info      status context; the test state is fetched with sd_data()
 * command   text written to the device
 * buf_size  length of that text
 *
 * Always returns EVENT_RENEW.
 *
 * NOTE(review): usleep() is called from inside this handler, so a large
 * times/delay combination stalls the caller for that long.
 */
static int md_test_command(status_context_t * info, char * command, size_t buf_size)
{
    md_test_state_t * state = (md_test_state_t *)sd_data(info);
    int node = 1;
    int times = 1;
    int delay = 1;
    int i;
    parser_state_t p_state = {
        .input = command,
        .input_len = buf_size
    };

    elog(LOG_INFO," ");

    while (misc_parse_next_kvp(&p_state) >= 0) {
        if (strcmp("subscribe", p_state.key) == 0) {
            send_subscription(state);
        }
        else if (strcmp("node", p_state.key) == 0) {
            node = atoi(p_state.value);
        }
        else if (strcmp("times", p_state.key) == 0) {
            times = atoi(p_state.value);
        }
        else if (strcmp("delay", p_state.key) == 0) {
            delay = atoi(p_state.value);
            /*
             * usleep() takes an unsigned argument, so a negative value
             * would wrap around to an enormous sleep (or fail with EINVAL).
             * Treat it as "no delay" instead.
             */
            if (delay < 0) {
                elog(LOG_CRIT, "negative delay %d ignored; using 0", delay);
                delay = 0;
            }
        }
        else if (strcmp("data", p_state.key) == 0) {
            elog(LOG_CRIT,"Sending to node %d, times %d, delay %d:\"%s\"",node,times,delay,p_state.value);
            /* A times value of zero or less sends nothing. */
            for (i = 0; i < times; ++i) {
                md_test_send_data(state, node, p_state.value);
                usleep(delay);
            }
        }
    }

    return EVENT_RENEW;
}
/*
 * Builds a subscription packet and sends it through the packet device
 * client stored in the test state.
 *
 * The packet is an MdMsg_t of type MdSubscribe followed by one DiffAttr_t
 * with attribute TD_ATTR_USER_BASE_KEY + 1, operator TD_EQ and value
 * my_node_id.
 *
 * state  test state; state->pd_ref is the client used for sending
 *
 * If the packet cannot be allocated the failure is logged and nothing is
 * sent. The packet is freed here after pd_client_send() returns.
 */
static void send_subscription(md_test_state_t * state)
{
    size_t length = sizeof(MdMsg_t) + sizeof(DiffAttr_t);
    MdMsg_t * msg;
    DiffAttr_t * attr;

    elog(LOG_INFO," ");

    msg = (MdMsg_t *)malloc(length);
    if (msg == NULL) {
        /* Previously the NULL pointer went straight into memset(). */
        elog(LOG_CRIT, "unable to allocate subscription packet: %m");
        return;
    }
    memset(msg, 0, length);

    /* Header: the payload is exactly one attribute. */
    msg->type = MdSubscribe;
    msg->length = sizeof(DiffAttr_t);

    /* Payload: the single attribute carrying this node's id. */
    attr = (DiffAttr_t *)msg->data;
    attr->m_uiAttr = TD_ATTR_USER_BASE_KEY + 1;
    attr->m_uiOp = TD_EQ;
    attr->m_iValue = my_node_id;

    /* Send through the client held in the state, then release the packet. */
    pd_client_send(state->pd_ref, msg, length);
    free(msg);
}
/*
 * Builds a data packet carrying a text blob and sends it through the
 * packet device client stored in the test state.
 *
 * Layout written into the packet:
 *   MdMsg_t header (type MdSendMsg, length sizeof(DiffMsg_t))
 *   DiffMsg_t whose m_pData holds, in order:
 *     DiffAttr_t  TD_ATTR_CLASS             TD_IS  TD_ATTR_DATA
 *     DiffAttr_t  TD_ATTR_USER_BASE_KEY+1   TD_IS  node
 *     DiffBlob_t  TD_ATTR_USER_BASE_BLOB, followed by the text and its NUL
 *
 * state  test state; state->pd_ref is the client used for sending
 * node   value stored in the second attribute
 * blob   NUL-terminated text to send
 *
 * The packet is sizeof(MdMsg_t) + sizeof(DiffMsg_t) bytes. If the packet
 * cannot be allocated, or the attributes plus blob would extend past that
 * size, the problem is logged and nothing is sent; previously the data was
 * written past the end of the allocation.
 *
 * NOTE(review): this assumes DiffMsg_t reserves the payload space inside
 * itself. If m_pData is a flexible trailing array, the packet size must
 * grow with the payload instead -- confirm against Mdiff.h.
 */
static void md_test_send_data(md_test_state_t * state, int node, char * blob)
{
    size_t blob_len = strlen(blob) + 1;    /* text plus terminating NUL */
    size_t data_len = 2 * sizeof(DiffAttr_t) + sizeof(DiffBlob_t) + blob_len;
    size_t length = sizeof(MdMsg_t) + sizeof(DiffMsg_t);
    size_t data_off;
    MdMsg_t * msg;
    DiffMsg_t * dmsg;
    DiffAttr_t * pAttr;
    DiffBlob_t * pBlob;

    elog(LOG_INFO," ");

    msg = (MdMsg_t *)malloc(length);
    if (msg == NULL) {
        elog(LOG_CRIT, "unable to allocate data packet: %m");
        return;
    }
    memset(msg, 0, length);

    dmsg = (DiffMsg_t *)msg->data;

    /* Refuse to write beyond the allocation. */
    data_off = (size_t)((char *)dmsg->m_pData - (char *)msg);
    if (data_off > length || data_len > length - data_off) {
        elog(LOG_CRIT, "data of %lu bytes does not fit in a %lu byte packet; not sent",
             (unsigned long)data_len, (unsigned long)length);
        free(msg);
        return;
    }

    // Setup MdMsg_t
    msg->type = MdSendMsg;
    msg->length = sizeof(DiffMsg_t);

    // Setup DiffMsg_t
    dmsg->m_uiDataLen = data_len;

    // Setup DiffAttr_t
    pAttr = (DiffAttr_t *)dmsg->m_pData;
    pAttr[0].m_uiAttr = TD_ATTR_CLASS;
    pAttr[0].m_uiOp = TD_IS;
    pAttr[0].m_iValue = TD_ATTR_DATA;
    pAttr[1].m_uiAttr = TD_ATTR_USER_BASE_KEY + 1;
    pAttr[1].m_uiOp = TD_IS;
    pAttr[1].m_iValue = node;

    // Setup blob, placed directly after the two attributes
    pBlob = (DiffBlob_t *)(&pAttr[2]);
    pBlob->m_uiAttr = TD_ATTR_USER_BASE_BLOB;
    pBlob->m_uiLen = blob_len;
    memcpy(pBlob->m_pData, blob, blob_len);

    pd_client_send(state->pd_ref, msg, length);
    free(msg);
}
/* End of mdiff_test.c */