Code Search for Developers
 
 
  

oob.c from Gtk-Gnutella at Krugle


Show oob.c syntax highlighted

/*
 * $Id: oob.c 13777 2007-05-29 00:23:11Z cbiere $
 *
 * Copyright (c) 2004, Raphael Manfredi
 *
 *----------------------------------------------------------------------
 * This file is part of gtk-gnutella.
 *
 *  gtk-gnutella is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  gtk-gnutella is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with gtk-gnutella; if not, write to the Free Software
 *  Foundation, Inc.:
 *      59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *----------------------------------------------------------------------
 */

/**
 * @ingroup core
 * @file
 *
 * Out of band query hits.
 *
 * @author Raphael Manfredi
 * @date 2004
 */

#include "common.h"

RCSID("$Id: oob.c 13777 2007-05-29 00:23:11Z cbiere $")

#include "oob.h"
#include "hosts.h"
#include "nodes.h"
#include "share.h"
#include "guid.h"
#include "pmsg.h"
#include "mq.h"
#include "mq_udp.h"
#include "vmsg.h"
#include "qhit.h"
#include "gmsg.h"
#include "gnet_stats.h"

#include "if/gnet_property_priv.h"

#include "lib/atoms.h"
#include "lib/cq.h"
#include "lib/fifo.h"
#include "lib/walloc.h"
#include "lib/override.h"		/* Must be the last header included */

#define OOB_EXPIRE_MS		(2*60*1000)		/**< 2 minutes at most */
#define OOB_TIMEOUT_MS		(45*1000)		/**< 45 secs for them to reply */
#define OOB_DELIVER_BASE_MS	2500			/**< 1 msg queued every 2.5 secs */
#define OOB_DELIVER_RAND_MS	5000			/**< ... + up to 5 random secs */

#define OOB_MAX_QUEUED		50				/**< Max # of messages per host */
#define OOB_MAX_RETRY		3				/**< Retry # if LIME/12v2 dropped */

#define OOB_MAX_QHIT_SIZE	645			/**< Flush hits larger than this */
#define OOB_MAX_DQHIT_SIZE	1075		/**< Flush limit for deflated hits */

typedef enum {
	OOB_RESULTS_MAGIC = 0x7ae5e685
} oob_results_magic_t;

/**
 * A set of hits awaiting delivery.
 */
struct oob_results {
	oob_results_magic_t	magic;
	gint refcount;
	cevent_t *ev_expire;	/**< Global expiration event */
	cevent_t *ev_timeout;	/**< Reply waiting timeout */
	const gchar *muid;		/**< (atom) MUID of the query that generated hits */
	GSList *files;			/**< List of shared_file_t */
	gnet_host_t dest;		/**< The host to which we must deliver */
	gint count;				/**< Amount of hits to deliver */
	gint notify_requeued;	/**< Amount of LIME/12v2 requeued after dropping */
	gboolean secure;		/**< TRUE -> secure OOB, FALSE -> normal OOB */
	gboolean ggep_h;		/**< TRUE -> use GGEP H, FALSE -> plain text */
};

/**
 * Indexes all OOB queries by MUID.
 * This hash table records MUID => "struct oob_results"
 */
static GHashTable *results_by_muid = NULL;

/**
 * Each servent, as identified by its IP:port, is given a FIFO for queuing
 * messages and sending them at a rate of 1 message every OOB_DELIVER_MS, to
 * avoid UDP flooding on the remote side.
 *
 * This hash table records gnet_host_t => "struct gservent"
 */
static GHashTable *servent_by_host = NULL;

/**
 * A servent entry, used as values in the `servent_by_host' table.
 */
struct gservent {
	cevent_t *ev_service; /**< Callout event for servicing FIFO */
	gnet_host_t *host;	  /**< The servent host (also used as key for table) */
	fifo_t *fifo;		  /**< The servent's FIFO, holding pmsg_t items */
	gboolean can_deflate; /**< Whether servent supports UDP compression */
};

/*
 * High-level description of what's happening here.
 *
 * When we get notified by share.c about a set of hits, we create the
 * struct oob_results, set the global expire to OOB_EXPIRE_MS and
 * send a LIME/12v2 to the querying, arming OOB_TIMEOUT_MS only AFTER
 * we get notified by the MQ that we sent the message.  If message was
 * dropped, requeue.  Do that OOB_MAX_RETRY times at most, then discard
 * the results.
 *
 * On reception of LIME/11v2, prepare all hits, put them in the FIFO
 * for this servent, then free the list.
 * Every OOB_DELIVER_MS, enqueue a hit to the UDP MQ for sending.
 */

static void results_destroy(cqueue_t *cq, gpointer obj);
static void servent_free(struct gservent *s);
static void oob_send_reply_ind(struct oob_results *r);

static gint num_oob_records;	/**< Leak and duplicate free detector */
static gboolean oob_shutdown_running;

static void
oob_results_check(const struct oob_results *r)
{
	g_assert(r);
	g_assert(OOB_RESULTS_MAGIC == r->magic);
	g_assert(r->refcount >= 0);
	g_assert(r->muid);
}

/**
 * Create new "struct oob_results" to handle the initial negotiation of
 * results delivery via the sent LIME/12v2 and the expected LIME/11v2 reply.
 */
static struct oob_results *
results_make(const gchar *muid, GSList *files, gint count, gnet_host_t *to,
	gboolean secure, gboolean ggep_h)
{
	static const struct oob_results zero_results;
	struct oob_results *r;

	g_return_val_if_fail(!g_hash_table_lookup(results_by_muid, muid), NULL);

	r = walloc(sizeof *r);
	*r = zero_results;
	r->magic = OOB_RESULTS_MAGIC;
	r->muid = atom_guid_get(muid);
	r->files = files;
	r->count = count;
	r->dest = *to;			/* Struct copy */
	r->secure = secure;
	r->ggep_h = ggep_h;

	r->ev_expire = cq_insert(callout_queue, OOB_EXPIRE_MS, results_destroy, r);
	r->refcount++;

	gm_hash_table_insert_const(results_by_muid, r->muid, r);

	g_assert(num_oob_records >= 0);
	num_oob_records++;
	if (GNET_PROPERTY(query_debug) > 1)
		g_message("results_make: num_oob_records=%d", num_oob_records);

	return r;
}

/**
 * Dispose of results.
 */
static void
results_free_remove(struct oob_results *r)
{
	GSList *sl;

	oob_results_check(r);
	
	if (r->ev_expire) {
		cq_cancel(callout_queue, &r->ev_expire);
		g_assert(r->refcount > 0);
		r->refcount--;
	}
	if (r->ev_timeout) {
		cq_cancel(callout_queue, &r->ev_timeout);
		g_assert(r->refcount > 0);
		r->refcount--;
	}

	if (0 == r->refcount) {
		/* We must not modify the hash table whilst iterating over it */
		if (!oob_shutdown_running) {
			g_assert(r == g_hash_table_lookup(results_by_muid, r->muid));
			g_hash_table_remove(results_by_muid, r->muid);
		}
		atom_guid_free_null(&r->muid);

		for (sl = r->files; sl; sl = g_slist_next(sl)) {
			shared_file_t *sf = sl->data;
			shared_file_unref(&sf);
		}
		g_slist_free(r->files);
		r->files = NULL;

		g_assert(num_oob_records > 0);
		num_oob_records--;
		if (GNET_PROPERTY(query_debug) > 2)
			g_message("results_free: num_oob_records=%d", num_oob_records);

		r->magic = 0;
		wfree(r, sizeof *r);
	}
}

/**
 * Callout queue callback to free the results.
 */
static void
results_destroy(cqueue_t *unused_cq, gpointer obj)
{
	struct oob_results *r = obj;

	(void) unused_cq;
	oob_results_check(r);

	if (GNET_PROPERTY(query_debug))
		g_message("OOB query %s from %s expired with unclaimed %d hit%s",
			guid_hex_str(r->muid), gnet_host_to_string(&r->dest),
			r->count, r->count == 1 ? "" : "s");

	gnet_stats_count_general(GNR_UNCLAIMED_OOB_HITS, 1);

	r->ev_expire = NULL;		/* The timer which just triggered */
	r->refcount--;

	results_free_remove(r);
}

/**
 * Callout queue callback to free the results.
 */
static void
results_timeout(cqueue_t *unused_cq, gpointer obj)
{
	struct oob_results *r = obj;

	(void) unused_cq;
	oob_results_check(r);

	if (GNET_PROPERTY(query_debug))
		g_message("OOB query %s, no ACK from %s to claim %d hit%s",
			guid_hex_str(r->muid), gnet_host_to_string(&r->dest),
			r->count, r->count == 1 ? "" : "s");

	gnet_stats_count_general(GNR_UNCLAIMED_OOB_HITS, 1);

	r->ev_timeout = NULL;		/* The timer which just triggered */
	r->refcount--;

	results_free_remove(r);
}

/**
 * Dispose of servent, removing entry from the `servent_by_host' table.
 */
static void
servent_free_remove(struct gservent *s)
{
	g_hash_table_remove(servent_by_host, s->host);
	servent_free(s);
}

/**
 * Computes the amount of milliseconds before the next OOB hit delivery,
 *
 * Per a suggestion of Daniel Stutzbach, we wait BASE + RAND*random secs,
 * where "random" is a real random number between 0 and 1.
 */
static gint
deliver_delay(void)
{
	return OOB_DELIVER_BASE_MS + random_value(OOB_DELIVER_RAND_MS);
}

/**
 * Service servent's FIFO: send next packet, and re-arm servicing callback
 * if there are more data to send.
 */
static void
servent_service(cqueue_t *cq, gpointer obj)
{
	struct gservent *s = obj;
	pmsg_t *mb;
	mqueue_t *q;

	s->ev_service = NULL;		/* The callback that just triggered */

	mb = fifo_remove(s->fifo);
	if (mb == NULL)
		goto remove;

	q = node_udp_get_outq(host_addr_net(gnet_host_get_addr(s->host)));
	if (q == NULL)
		goto udp_disabled;

	if (GNET_PROPERTY(udp_debug) > 19)
		g_message("UDP queuing OOB %s to %s for %s",
			gmsg_infostr_full(pmsg_start(mb)), gnet_host_to_string(s->host),
			guid_hex_str(pmsg_start(mb)));

	/*
	 * Count enqueued deflated payloads, only when server was marked as
	 * supporting compression anyway...
	 */

	if (s->can_deflate) {
		if (gnutella_header_get_ttl(pmsg_start(mb)) & GTA_UDP_DEFLATED)
			gnet_stats_count_general(GNR_UDP_TX_COMPRESSED, 1);
	}

	mq_udp_putq(q, mb, s->host);

	if (0 == fifo_count(s->fifo))
		goto remove;

	s->ev_service = cq_insert(cq, deliver_delay(), servent_service, s);

	return;

udp_disabled:
	pmsg_free(mb);
	/* FALL THROUGH */

remove:
	servent_free_remove(s);
}

/**
 * Create a new servent structure.
 *
 * @param host the servent's IP:port.  Caller may free it upon return.
 */
static struct gservent *
servent_make(gnet_host_t *host, gboolean can_deflate)
{
	struct gservent *s;

	s = walloc(sizeof *s);
	s->host = walloc(sizeof *s->host);
	*s->host = *host;		/* Struct copy */
	s->fifo = fifo_make();
	s->ev_service = NULL;
	s->can_deflate = can_deflate;

	return s;
}

/**
 * Cleanup items from FIFO.
 * -- fifo_free_all() callback.
 */
static void
free_pmsg(gpointer item, gpointer unused_udata)
{
	pmsg_t *mb = item;

	(void) unused_udata;
	pmsg_free(mb);
}

/**
 * Free servent structure.
 */
static void
servent_free(struct gservent *s)
{
	cq_cancel(callout_queue, &s->ev_service);
	wfree(s->host, sizeof *s->host);
	fifo_free_all(s->fifo, free_pmsg, NULL);
	wfree(s, sizeof *s);
}

/**
 * Invoked via qhit_build_results() for each fully built query hit message.
 * Hit is enqueued in the FIFO, for slow delivery.
 */
static void
oob_record_hit(gpointer data, size_t len, gpointer udata)
{
	struct gservent *s = udata;

	g_assert(len <= INT_MAX);
	fifo_put(s->fifo, s->can_deflate ?
		gmsg_to_deflated_pmsg(data, len) :
		gmsg_to_pmsg(data, len));
}

/**
 * The remote host acknowledges that we have some hits for it and wishes
 * to get the specified amount.
 *
 * @param n			where we got the message from
 * @param muid		the query identifier
 * @param wanted	the amount of results they want delivered
 * @param token		the token for secure OOB
 */
void
oob_deliver_hits(struct gnutella_node *n, const gchar *muid, guint8 wanted,
	const struct array *token)
{
	struct oob_results *r;
	struct gservent *s;
	gint deliver_count;
	gboolean servent_created = FALSE;

	g_assert(NODE_IS_UDP(n));
	g_assert(token);

	r = g_hash_table_lookup(results_by_muid, muid);

	if (r == NULL) {
		gnet_stats_count_general(GNR_SPURIOUS_OOB_HIT_CLAIM, 1);
		if (GNET_PROPERTY(query_debug))
			g_warning("OOB got spurious LIME/11 from %s for %s, "
				"asking for %d hit%s",
				node_addr(n), guid_hex_str(muid),
				wanted, wanted == 1 ? "" : "s");
		return;
	}
	oob_results_check(r);

	/*
	 * Here's what could happen with proxied OOB queries:
	 *
	 *                 query               query
	 *      Queryier  ------> Proxying UP -------> Server
	 *               <--TCP--             <--UDP--
	 *               GTKG/12v2            LIME/12v2
	 *
	 *                        LIME/11v2
	 *      Queryier ------------UDP------------> Server
	 *               <-----------UDP-------------
	 *                        query hits
	 *
	 * The above forwarding by the Proxying UP can only be done when
	 * the server has mentionned that it could receive unsolicited UDP
	 * in its LIME/12v2 message.
	 *
	 * This means that we MUST not reply to the IP:port held in the
	 * GUID of the message, but really to the origin of the LIME/11v2
	 * message.
	 *
	 *		--RAM, 2004-09-10
	 */

	if (!host_addr_equal(n->addr, gnet_host_get_addr(&r->dest))) {
		/**
		 * The sender's IP address can of course change any time as
		 * dynamic IP addresses are very common. The sender might also
		 * have multiple network interfaces.
		 */
		
		g_warning("OOB query %s might have been proxied: it had IP %s, "
			"but the LIME/11v2 ACK comes from %s", guid_hex_str(muid),
			gnet_host_to_string(&r->dest), node_addr(n));

		/*
		 * We'll send the hits to the host from where the ACK comes.
		 */

		gnet_host_set(&r->dest, n->addr, n->port);
	}

	/*
	 * Fetch the proper servent, create one if none exists yet.
	 *
	 * N.B: We assume that for a given host address, UDP deflation support
	 * will never change: if the host has marked support for deflation in
	 * the claim message once, we assume that it will always support it.
	 * Likewise, if it did not request it the first time, no matter what we
	 * get next, we will never deflate hits for this OOB delivery.
	 *		--RAM, 2006-08-13
	 */

	s = g_hash_table_lookup(servent_by_host, &r->dest);
	if (s == NULL) {
		gboolean can_deflate = NODE_CAN_INFLATE(n);	/* Can we deflate? */
		s = servent_make(&r->dest, can_deflate);
		g_hash_table_insert(servent_by_host, s->host, s);
		servent_created = TRUE;
	}

	g_assert(servent_created || s->ev_service != NULL);

	/*
	 * Build the query hits, enqueuing them to the servent's FIFO.
	 */

	deliver_count = (wanted == 255) ? r->count : MIN(wanted, r->count);

	if (GNET_PROPERTY(query_debug) || GNET_PROPERTY(udp_debug))
		g_message("OOB query %s: host %s wants %d hit%s, delivering %d",
			guid_hex_str(r->muid), node_addr(n), wanted, wanted == 1 ? "" : "s",
			deliver_count);

	if (deliver_count)
		qhit_build_results(
			r->files, deliver_count,
			s->can_deflate ? OOB_MAX_DQHIT_SIZE : OOB_MAX_QHIT_SIZE,
			oob_record_hit, s, r->muid, r->ggep_h, token);

	if (wanted < r->count)
		gnet_stats_count_general(GNR_PARTIALLY_CLAIMED_OOB_HITS, 1);

	/*
	 * We're now done with the "oob_results" structure, since all the
	 * to-be-delivered hits have been queued as Gnutella messages in
	 * the servent's FIFO.
	 */

	results_free_remove(r);

	/*
	 * If we just created a new servent entry, service it to send a
	 * first query hit.  Otherwise, we already have a callback installed
	 * for servicing it at regular interval.
	 */

	if (servent_created)
		servent_service(callout_queue, s);
}

/**
 * Callback invoked when the LIME/12v2 message we queued is freed.
 */
static void
oob_pmsg_free(pmsg_t *mb, gpointer arg)
{
	struct oob_results *r = arg;

	g_assert(pmsg_is_extended(mb));
	oob_results_check(r);
	r->refcount--;

	/*
	 * If we sent the message, great!  Arm a timer to ensure we get a
	 * reply within the next OOB_TIMEOUT_MS.
	 */

	if (pmsg_was_sent(mb)) {

		/* There may have been up to OOB_MAX_RETRY in the queue, ``r''
		 * is shared between all of them. So r->ev_timeout may have been
		 * set already.
		 */
		if (r->ev_timeout) {
			results_free_remove(r);
		} else {

			if (GNET_PROPERTY(query_debug) || GNET_PROPERTY(udp_debug))
				g_message("OOB query %s, notified %s about %d hit%s",
					guid_hex_str(r->muid), gnet_host_to_string(&r->dest),
					r->count, r->count == 1 ? "" : "s");

			/*
			 * If we don't get any ACK back, we'll discard the results.
			 */

			r->ev_timeout = cq_insert(callout_queue, OOB_TIMEOUT_MS,
					results_timeout, r);
			r->refcount++;
		}
	} else {
		/*
		 * If we were not able to send the message,
		 */

		if (GNET_PROPERTY(query_debug))
			g_message("OOB query %s, previous LIME12/v2 #%d was dropped",
					guid_hex_str(r->muid), r->notify_requeued);

		if (++r->notify_requeued < OOB_MAX_RETRY)
			oob_send_reply_ind(r);
		else
			results_free_remove(r);
	}
}

/**
 * Send them a LIME/12v2, monitoring progress in queue via a callback.
 */
static void
oob_send_reply_ind(struct oob_results *r)
{
	mqueue_t *q;
	pmsg_t *mb;
	pmsg_t *emb;

	oob_results_check(r);

	q = node_udp_get_outq(host_addr_net(gnet_host_get_addr(&r->dest)));
	if (q == NULL)
		return;

	mb = vmsg_build_oob_reply_ind(r->muid, MIN(r->count, 255), r->secure);
	emb = pmsg_clone_extend(mb, oob_pmsg_free, r);
	r->refcount++;
	pmsg_free(mb);

	if (GNET_PROPERTY(query_debug) || GNET_PROPERTY(udp_debug))
		g_message("OOB query %s, notifying %s about %d hit%s, try #%d",
			guid_hex_str(r->muid), gnet_host_to_string(&r->dest),
			r->count, r->count == 1 ? "" : "s", r->notify_requeued);

	mq_udp_putq(q, emb, &r->dest);
}

/**
 * Notification that we got matches for a query from some node that needs
 * to be replied to using out-of-band delivery.
 *
 * @param n				the node from which we got the query
 * @param files			the list of shared_file_t entries that make up results
 * @param count			the amount of results
 * @param secure		whether secure OOB was requested
 * @param ggep_h		whether GGEP H is understood
 */
void
oob_got_results(struct gnutella_node *n, GSList *files,
	gint count, gboolean secure, gboolean ggep_h)
{
	struct oob_results *r;
	gnet_host_t to;
	host_addr_t addr;
	guint16 port;

	g_assert(count > 0);
	g_assert(files != NULL);

	guid_oob_get_addr_port(gnutella_header_get_muid(&n->header), &addr, &port);
	gnet_host_set(&to, addr, port);
	r = results_make(gnutella_header_get_muid(&n->header), files, count, &to,
			secure, ggep_h);
	if (r) {
		oob_send_reply_ind(r);
	}
}

/**
 * Initialize out-of-band query hit delivery.
 */
void
oob_init(void)
{
	results_by_muid = g_hash_table_new(guid_hash, guid_eq);
	servent_by_host = g_hash_table_new(host_hash, host_eq);
}

/**
 * Cleanup oob_results -- hash table iterator callback
 */
static void
free_oob_kv(gpointer key, gpointer value, gpointer unused_udata)
{
	struct oob_results *r = value;

	(void) unused_udata;
	oob_results_check(r);
	g_assert(key == r->muid);		/* Key is same as results's MUID */

	r->refcount = 0; /* Enforce release */
	if (r->ev_timeout) {
		r->refcount++;
	}
	if (r->ev_expire) {
		r->refcount++;
	}
	results_free_remove(r);
}

/**
 * Cleanup servent -- hash table iterator callback
 */
static void
free_servent_kv(gpointer key, gpointer value, gpointer unused_udata)
{
	gnet_host_t *host = key;
	struct gservent *s = value;

	(void) unused_udata;
	g_assert(host == s->host);		/* Key is same as servent's host */

	servent_free(s);
}

/**
 * Cleanup at shutdown time.
 */
void
oob_shutdown(void)
{
	oob_shutdown_running = TRUE;

	g_hash_table_foreach(results_by_muid, free_oob_kv, NULL);
	g_hash_table_destroy(results_by_muid);
	results_by_muid = NULL;

	g_hash_table_foreach(servent_by_host, free_servent_kv, NULL);
	g_hash_table_destroy(servent_by_host);
	servent_by_host = NULL;

	g_assert(num_oob_records >= 0);
	if (num_oob_records > 0)
		g_warning("%d OOB reply records possibly leaked", num_oob_records);
}

/**
 * Final cleanup.
 */
void
oob_close(void)
{
}

/* vi: set ts=4 sw=4 cindent: */




See more files for this project here

Gtk-Gnutella

A GTK+ Gnutella client for Unix, efficient, reliable and fast, written in C. It has been optimized for speed and scalability, with low-memory consumption. It is meant to be left running 24x7, using little CPU and only the configured bandwidth.

Project homepage: http://sourceforge.net/projects/gtk-gnutella
Programming language(s): C
License: other

  Jmakefile
  Makefile.SH
  alive.c
  alive.h
  ban.c
  ban.h
  bh_download.c
  bh_download.h
  bh_upload.c
  bh_upload.h
  bitzi.c
  bitzi.h
  bogons.c
  bogons.h
  bsched.c
  bsched.h
  clock.c
  clock.h
  dh.c
  dh.h
  dime.c
  dime.h
  dmesh.c
  dmesh.h
  downloads.c
  downloads.h
  dq.c
  dq.h
  extensions.c
  extensions.h
  features.c
  features.h
  file_object.c
  file_object.h
  fileinfo.c
  fileinfo.h
  geo_ip.c
  geo_ip.h
  ggep.c
  ggep.h
  ggep_type.c
  ggep_type.h
  gmsg.c
  gmsg.h
  gnet_stats.c
  gnet_stats.h
  gnutella.h
  guid.c
  guid.h
  hcache.c
  hcache.h
  hostiles.c
  hostiles.h
  hosts.c
  hosts.h
  hsep.c
  hsep.h
  http.c
  http.h
  huge.c
  huge.h
  ignore.c
  ignore.h
  inet.c
  inet.h
  ioheader.c
  ioheader.h
  local_shell.c
  local_shell.h
  matching.c
  matching.h
  mime_types.h
  move.c
  move.h
  mq.c
  mq.h
  mq_tcp.c
  mq_tcp.h
  mq_udp.c
  mq_udp.h
  namesize.c
  namesize.h
  nodes.c
  nodes.h
  ntp.c
  ntp.h
  oob.c
  oob.h
  oob_proxy.c
  oob_proxy.h
  parq.c
  parq.h
  pcache.c
  pcache.h
  pmsg.c
  pmsg.h
  pproxy.c
  pproxy.h
  qhit.c
  qhit.h
  qrp.c
  qrp.h