Logo Search packages:      
Sourcecode: ganglia version File versions

gmond.c

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <ganglia.h> /* for the libgmond messaging */
#include <gm_metric.h>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <syslog.h>
#include <math.h>

#include <apr.h>
#include <apr_strings.h>
#include <apr_hash.h>
#include <apr_time.h>
#include <apr_pools.h>
#include <apr_poll.h>
#include <apr_network_io.h>
#include <apr_signal.h>       
#include <apr_thread_proc.h>  /* for apr_proc_detach(). no threads used. */
#include <apr_tables.h>
#include <apr_dso.h>
#include <apr_version.h>

#include "cmdline.h"   /* generated by cmdline.sh which runs gengetopt */
#include "become_a_nobody.h"
#include "apr_net.h"   /* our private network functions based on apr */
#include "dtd.h"       /* the DTD definition for our XML */
#include "g25_config.h" /* for converting old file formats to new */
#include "daemon_init.h"
#include "gm_scoreboard.h"
#include "ganglia_priv.h"

/* Specifies a single value metric callback */
#define CB_NOINDEX -1

/* When this gmond was started */
apr_time_t started;
/* My name */
char myname[APRMAXHOSTLEN+1];
/* The commandline options */
struct gengetopt_args_info args_info;
/* The configuration file (loaded by process_configuration_file()) */
cfg_t *config_file;
/* The debug level (in debug_msg.c).  Taken from the command line when
 * given, otherwise from globals{debug_level}; see
 * process_configuration_file(). */
static int debug_level;
/* The global context: parent pool for the per-channel and per-host pools */
apr_pool_t *global_context = NULL;
/* Deaf mode boolean (globals{deaf}) */
int deaf;
/* Mute mode boolean (globals{mute}) */
int mute;
/* Cluster tag boolean */
int cluster_tag = 0;
/* This host's location */
char *host_location = NULL;
/* Boolean. Will this host receive gexec requests? (globals{gexec}) */
int gexec_on = 0;
/* Maximum UDP message length.
 * This is tweakable by globals{max_udp_msg_len=...} */
int max_udp_message_len = 1472;
/* The default configuration for gmond.  Found in conf.c. */
extern char *default_gmond_configuration;
/* The number of seconds to hold "dead" hosts in the hosts hash
 * (globals{host_dmax}) */
int host_dmax = 0;
/* The amount of time between cleanups (globals{cleanup_threshold}) */
int cleanup_threshold = 300;
/* Time interval before sending another metadata packet
 * (globals{send_metadata_interval}) */
int send_metadata_interval = 0;
/* The directory where DSO modules are located (globals{module_dir}) */
char *module_dir = NULL;

/* The array for outgoing UDP message channels */
Ganglia_udp_send_channels udp_send_channels = NULL;

/* TODO: The array for outgoing TCP message channels (later) */
apr_array_header_t *tcp_send_array = NULL;

/* The verdict an access control entry (or an ACL default) can give. */
typedef enum Ganglia_action_types {
  GANGLIA_ACCESS_DENY  = 0,
  GANGLIA_ACCESS_ALLOW = 1
} Ganglia_action_types;

/* A single access control entry: a subnet plus the action to take for
 * addresses that fall inside that subnet. */
typedef struct Ganglia_access {
  apr_ipsubnet_t *ipsub;          /* the subnet this entry matches */
  Ganglia_action_types action;    /* allow or deny on a match */
} Ganglia_access;

/* An access control list.  Entries are tested in array order and the
 * first matching entry decides; see Ganglia_acl_action(). */
typedef struct Ganglia_acl {
  apr_array_header_t *access_array;      /* array of Ganglia_access pointers */
  Ganglia_action_types default_action;   /* used when no entry matches */
} Ganglia_acl;

/* The kinds of listening channel kept in the listen pollset */
typedef enum Ganglia_channel_types {
  TCP_ACCEPT_CHANNEL,
  UDP_RECV_CHANNEL
} Ganglia_channel_types;

/* Per-socket data attached to each pollset entry (as client_data) */
typedef struct Ganglia_channel {
  Ganglia_channel_types type;   /* tcp_accept or udp_recv */
  Ganglia_acl *acl;             /* NULL when the channel has no ACL */
  int timeout;                  /* socket timeout configured for the channel */
} Ganglia_channel;

/* This pollset holds the tcp_accept and udp_recv channels.
 * It is created and filled by setup_listen_channels_pollset(). */
apr_pollset_t *listen_channels = NULL;

/* The hash to hold the hosts (key = host IP string, or the spoofed IP
 * for spoofed metrics; value = Ganglia_host *).  See Ganglia_host_get(). */
apr_hash_t *hosts = NULL;

/* The "hosts" hash contains values of type "Ganglia_host" */
typedef struct Ganglia_host {
  char *hostname;               /* Name of the host */
  char *ip;                     /* The IP of this host (also the hash key) */
  char *location;               /* The location of this host; NULL until a
                                 * "location" message arrives */
  unsigned int gmond_started;   /* Timestamp of when the remote gmond started
                                 * (taken from its heartbeat message) */
  apr_pool_t *pool;             /* The pool used to allocate memory for this host */
  apr_hash_t *metrics;          /* Full metric data (metadata) from the host */
  apr_hash_t *gmetrics;         /* The last data update (values) from the host */
  apr_time_t first_heard_from;  /* First heard from */
  apr_time_t last_heard_from;   /* Last heard from */
} Ganglia_host;

/* This is the structure of the data saved in each host->metrics and
 * host->gmetrics hash */
typedef struct Ganglia_metadata {
  apr_pool_t *pool;             /* The pool used for allocating memory */
  char *name;                   /* The name of the metric (the hash key) */
  /* The ganglia message: f_message is filled by Ganglia_metadata_save(),
   * v_message by Ganglia_value_save() */
  union {
      Ganglia_metadata_msg f_message;
      Ganglia_value_msg v_message;
  } message_u;
  apr_time_t last_heard_from;   /* Last heard from */
} Ganglia_metadata;

/* The hash to hold the metrics available on this platform
 * (key = metric name, value = Ganglia_metric_callback *) */
apr_hash_t *metric_callbacks = NULL;

/* The "metric_callbacks" hash contains values of type
 * "Ganglia_metric_callback".  This is where libmetrics meets gmond. */
typedef struct Ganglia_metric_callback {
   /* metric name */
   char *name;
   /* the value threshold */
   float value_threshold;
   /* Alternate metric name or short description */
   char *title;
   /* the information about this metric */
   Ganglia_25metric *info;
   /* callback function (deprecated) */
   metric_func_void cb;
   /* multi-metric callback function */
   metric_func cbindexed;
   /* the current value */
   g_val_t now;
   /* the last value */
   g_val_t last;
   /* the message to send */
   Ganglia_value_msg msg;
   /* dynamic module info struct */
   mmodule *modp;
   /* index identifying which metric is wanted */
   int multi_metric_index;
   /* when the metadata was last sent; reset to 0 by
    * Ganglia_metadata_request() when a peer asks for it */
   apr_time_t metadata_last_sent;
} Ganglia_metric_callback;

/* This is the structure of a collection group */
typedef struct Ganglia_collection_group {
  /* When to collect next */
  apr_time_t next_collect;
  /* When to send next (tmax) */
  apr_time_t next_send;
  /* NOTE(review): the three settings below presumably mirror the
   * collection_group configuration options of the same names - confirm */
  int once;
  int collect_every;
  int time_threshold;
  /* The metrics that belong to this group */
  apr_array_header_t *metric_array;
} Ganglia_collection_group;

/* This is the array of collection groups that we are processing... */
apr_array_header_t *collection_groups = NULL;

/* NOTE(review): presumably the head of the loaded metric module list -
 * confirm against the module loading code */
mmodule *metric_modules = NULL;
/* Set to 1 by daemonize_if_necessary() to enable errmsg logging to syslog */
extern int daemon_proc;       /* defined in error.c */

/* this is just a temporary function
 *
 * Loads the gmond configuration into the global config_file and caches
 * the settings of its "globals" section in the file-level variables. */
void
process_configuration_file(void)
{
  cfg_t *globals;

  /* config_file is a global for now */
  config_file = (cfg_t*)Ganglia_gmond_config_create( args_info.conf_arg, !args_info.conf_given );
  globals     = cfg_getsec( config_file, "globals");

  /* Maximum UDP message size */
  max_udp_message_len    = cfg_getint( globals, "max_udp_msg_len");
  /* Requested gexec status */
  gexec_on               = cfg_getbool(globals, "gexec");
  /* Host dmax */
  host_dmax              = cfg_getint( globals, "host_dmax");
  /* Cleanup threshold */
  cleanup_threshold      = cfg_getint( globals, "cleanup_threshold");
  /* Interval between metadata packets */
  send_metadata_interval = cfg_getint( globals, "send_metadata_interval");
  /* DSO module directory */
  module_dir             = cfg_getstr( globals, "module_dir");

  /* A debug level given on the command line trumps the configuration file */
  debug_level = args_info.debug_given
                  ? args_info.debug_arg
                  : cfg_getint ( globals, "debug_level");
  set_debug_msg_level(debug_level);
}

static void
daemonize_if_necessary( char *argv[] )
{
  int should_daemonize;
  cfg_t *tmp;
  tmp = cfg_getsec( config_file, "globals");
  should_daemonize = cfg_getbool( tmp, "daemonize");

  /* Daemonize if needed */
  if(!args_info.foreground_flag && should_daemonize && !debug_level)
    {
      char *cwd;

      apr_filepath_get(&cwd, 0, global_context);
      apr_proc_detach(1);
      apr_filepath_set(cwd, global_context);

      /* enable errmsg logging to syslog */
      daemon_proc = 1;  
      openlog (argv[0], LOG_PID, LOG_DAEMON);
    }
}

/* Drop privileges to the configured user when globals{setuid} is on.
 * On Cygwin builds only a notice is printed. */
static void
setuid_if_necessary( void )
{
  cfg_t *globals  = cfg_getsec( config_file, "globals");
  int want_setuid = cfg_getbool( globals, "setuid" );

  if(!want_setuid)
    return;

#ifdef CYGWIN
  fprintf(stderr,"Windows does not support setuid.\n");
#else
  become_a_nobody( cfg_getstr(globals, "user" ) );
#endif
}

/* Read the deaf and mute settings from the "globals" section into the
 * file-level flags.  A gmond that neither listens nor sends has nothing
 * to do, so that combination terminates the process. */
static void
process_deaf_mute_mode( void )
{
  cfg_t *globals = cfg_getsec( config_file, "globals");

  deaf = cfg_getbool( globals, "deaf");
  mute = cfg_getbool( globals, "mute");

  if(!deaf || !mute)
    return;

  err_msg("Configured to run both deaf and mute. Nothing to do. Exiting.\n");
  exit(1);
}

/* Build the access control list described by the "acl" section of a
 * channel configuration.
 *
 * channel: the udp_recv_channel / tcp_accept_channel config section
 * pool:    pool that owns the ACL and all of its entries
 *
 * Returns NULL when either argument is NULL, the channel has no "acl"
 * section, or the section has no "access" entries (callers treat a NULL
 * ACL as "no access control").  Any malformed ACL is fatal: an error is
 * logged and the process exits.
 */
static Ganglia_acl *
Ganglia_acl_create ( cfg_t *channel, apr_pool_t *pool )
{
  apr_status_t status;
  Ganglia_acl *acl = NULL;
  cfg_t *acl_config;
  char *default_action;
  int num_access = 0;
  int i;

  if(!channel || !pool)
    {
      return acl;
    }


  acl_config = cfg_getsec(channel, "acl");
  if(!acl_config)
    {
      return acl;
    }

  /* Find out the number of access entries */
  num_access = cfg_size( acl_config, "access" );
  if(!num_access)
    {
      return acl;
    }

  /* Create a new ACL from the pool */
  acl = apr_pcalloc( pool, sizeof(Ganglia_acl));
  if(!acl)
    {
      err_msg("Unable to allocate memory for ACL. Exiting.\n");
      exit(1);
    }

  /* A missing default is as invalid as an unknown one; never hand a
   * NULL string to the comparison or to the %s below */
  default_action = cfg_getstr( acl_config, "default");
  if(default_action && !apr_strnatcasecmp( default_action, "deny"))
    {
      acl->default_action = GANGLIA_ACCESS_DENY;
    }
  else if(default_action && !apr_strnatcasecmp( default_action, "allow"))
    {
      acl->default_action = GANGLIA_ACCESS_ALLOW;
    }
  else
    {
      err_msg("Invalid default ACL '%s'. Exiting.\n",
              default_action? default_action: "NULL");
      exit(1);
    }

  /* Create an array to hold each of the access instructions.  The
   * elements are pointers to Ganglia_access entries. */
  acl->access_array  = apr_array_make( pool, num_access, sizeof(Ganglia_access *));
  if(!acl->access_array)
    {
      err_msg("Unable to malloc access array. Exiting.\n");
      exit(1);
    }
  for(i=0; i< num_access; i++)
    {
      cfg_t *access_config   = cfg_getnsec( acl_config, "access", i);
      Ganglia_access *access = apr_pcalloc( pool, sizeof(Ganglia_access));
      char *ip, *mask, *action;

      if(!access_config || !access)
        {
          /* This shouldn't happen unless maybe acl is empty and
           * the safest thing to do is exit */
          err_msg("Unable to process ACLs. Exiting.\n");
          exit(1);
        }

      ip     = cfg_getstr( access_config, "ip");
      mask   = cfg_getstr( access_config, "mask");
      action = cfg_getstr( access_config, "action");
      /* ip and action are dereferenced below, so either one missing is
       * fatal.  A missing mask is passed through to apr_ipsubnet_create()
       * as before, which decides whether the entry is acceptable. */
      if(!ip || !action)
        {
          err_msg("An access record requires an ip, mask and action. Exiting.\n");
          exit(1);
        }

      /* Process the action first */
      if(!apr_strnatcasecmp( action, "deny" ))
        {
          access->action = GANGLIA_ACCESS_DENY;
        }
      else if(!apr_strnatcasecmp( action, "allow"))
        {
          access->action = GANGLIA_ACCESS_ALLOW;
        }
      else
        {
          err_msg("ACL access entry has action '%s'. Must be deny|allow. Exiting.\n", action);
          exit(1);
        }  

      /* Create the subnet */
      access->ipsub = NULL;
      status = apr_ipsubnet_create( &(access->ipsub), ip, mask, pool);
      if(status != APR_SUCCESS)
        {
          err_msg("ACL access entry has invalid ip('%s')/mask('%s'). Exiting.\n",
                  ip, mask? mask: "NULL");
          exit(1);
        }

      /* Save this access entry to the acl */
      *(Ganglia_access **)apr_array_push( acl->access_array ) = access;
    }
  return acl;
}


/* Decide what to do with a peer address.
 *
 * acl:  the channel's ACL, or NULL when the channel has none
 * addr: the address to test
 *
 * Returns the action of the first entry whose subnet contains addr, the
 * ACL's default action when no entry matches, and GANGLIA_ACCESS_ALLOW
 * when there is no ACL at all.
 */
static int
Ganglia_acl_action( Ganglia_acl *acl, apr_sockaddr_t *addr )
{
  Ganglia_access **entries;
  int idx;

  /* Without an ACL there is no access control */
  if(acl == NULL)
    return GANGLIA_ACCESS_ALLOW;

  entries = (Ganglia_access **)(acl->access_array->elts);
  for(idx = 0; idx < acl->access_array->nelts; idx++)
    {
      /* The first subnet that contains addr decides */
      if(apr_ipsubnet_test( entries[idx]->ipsub, addr ))
        return entries[idx]->action;
    }

  /* Nothing in the access list matched */
  return acl->default_action;
}

/* Translate a configured address family string into an APR family.
 *
 * A string containing '4' selects APR_INET; otherwise a string
 * containing '6' selects APR_INET6 when this build supports IPv6.
 * Anything else, or IPv6 on a build without it, logs an error and
 * terminates the process.
 */
static int32_t
get_sock_family( char *family )
{
  if( strchr( family, '4' ) != NULL )
    return APR_INET;

  if( strchr( family, '6' ) == NULL )
    {
      err_msg("Unknown family '%s'. Try inet4|inet6. Exiting.\n", family);
      exit(1);
    }

#if APR_INET6
  return APR_INET6;
#else
  err_msg("IPv6 is not supported on this host. Exiting.\n");
  exit(1);
  /* shouldn't get here */
  return APR_UNSPEC;
#endif
}

static void
setup_listen_channels_pollset( void )
{
  apr_status_t status;
  int i;
  int num_udp_recv_channels   = cfg_size( config_file, "udp_recv_channel");
  int num_tcp_accept_channels = cfg_size( config_file, "tcp_accept_channel");
  int total_listen_channels   = num_udp_recv_channels + num_tcp_accept_channels;
  Ganglia_channel *channel;

  /* Create my incoming pollset */
  apr_pollset_create(&listen_channels, total_listen_channels, global_context, 0);

  /* Process all the udp_recv_channels */
  for(i = 0; i< num_udp_recv_channels; i++)
    {
      cfg_t *udp_recv_channel;
      char *mcast_join, *mcast_if, *bindaddr, *family;
      int port;
      apr_socket_t *socket = NULL;
      apr_pollfd_t socket_pollfd;
      apr_pool_t *pool = NULL;
      int32_t sock_family = APR_INET;

      udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
      mcast_join     = cfg_getstr( udp_recv_channel, "mcast_join" );
      mcast_if       = cfg_getstr( udp_recv_channel, "mcast_if" );
      port           = cfg_getint( udp_recv_channel, "port");
      bindaddr       = cfg_getstr( udp_recv_channel, "bind");
      family         = cfg_getstr( udp_recv_channel, "family");

      debug_msg("udp_recv_channel mcast_join=%s mcast_if=%s port=%d bind=%s",
                mcast_join? mcast_join:"NULL", 
                mcast_if? mcast_if:"NULL", port,
                bindaddr? bindaddr: "NULL");

      /* Create a sub-pool for this channel */
      apr_pool_create(&pool, global_context);

      sock_family = get_sock_family(family);

      if( mcast_join )
        {
          /* Listen on the specified multicast channel */
          socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );
          if(!socket)
            {
              err_msg("Error creating multicast server mcast_join=%s port=%d mcast_if=%s family='%s'. Exiting.\n",
                  mcast_join? mcast_join: "NULL", port, mcast_if? mcast_if:"NULL",family);
              exit(1);
            }
        }
      else
        {
          /* Create a UDP server */
          socket = create_udp_server( pool, sock_family, port, bindaddr );
          if(!socket)
            {
              err_msg("Error creating UDP server on port %d bind=%s. Exiting.\n",
                  port, bindaddr? bindaddr: "unspecified");
              exit(1);
            }
        }

      /* Build the socket poll file descriptor structure */
      socket_pollfd.desc_type   = APR_POLL_SOCKET;
      socket_pollfd.reqevents   = APR_POLLIN;
      socket_pollfd.desc.s      = socket;

      channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
      if(!channel)
        {
          err_msg("Unable to malloc memory for channel.  Exiting. \n");
          exit(1);
        }

      /* Mark this channel as a udp_recv_channel */
      channel->type = UDP_RECV_CHANNEL;

      /* Make sure this socket never blocks */
      channel->timeout = 0;
      apr_socket_timeout_set( socket, channel->timeout);

      /* Save the ACL information */
      channel->acl = Ganglia_acl_create ( udp_recv_channel, pool );

      /* Save the pointer to this socket specific data */
      socket_pollfd.client_data = channel;

      /* Add the socket to the pollset */
      status = apr_pollset_add(listen_channels, &socket_pollfd);
      if(status != APR_SUCCESS)
        {
          err_msg("Failed to add socket to pollset. Exiting.\n");
          exit(1);
        }
    }

  /* Process all the tcp_accept_channels */ 
  for(i=0; i< num_tcp_accept_channels; i++)
    {
      cfg_t *tcp_accept_channel = cfg_getnsec( config_file, "tcp_accept_channel", i);
      char *bindaddr, *interface, *family;
      int port, timeout;
      apr_socket_t *socket = NULL;
      apr_pollfd_t socket_pollfd;
      apr_pool_t *pool = NULL;
      int32_t sock_family;

      port           = cfg_getint( tcp_accept_channel, "port");
      bindaddr       = cfg_getstr( tcp_accept_channel, "bind");
      interface      = cfg_getstr( tcp_accept_channel, "interface"); 
      timeout        = cfg_getint( tcp_accept_channel, "timeout");
      family         = cfg_getstr( tcp_accept_channel, "family");

      debug_msg("tcp_accept_channel bind=%s port=%d",
                bindaddr? bindaddr: "NULL", port);

      /* Create a subpool context */
      apr_pool_create(&pool, global_context);

      sock_family = get_sock_family(family);

      /* Create the socket for the channel, blocking w/timeout */
      socket = create_tcp_server(pool, sock_family, port, bindaddr, 
                                 interface, 1);
      if(!socket)
        {
          err_msg("Unable to create tcp_accept_channel. Exiting.\n");
          exit(1);
        }

      /* Build the socket poll file descriptor structure */
      socket_pollfd.desc_type   = APR_POLL_SOCKET;
      socket_pollfd.reqevents   = APR_POLLIN;
      socket_pollfd.desc.s      = socket;

      channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
      if(!channel)
        {
          err_msg("Unable to malloc data for channel. Exiting.\n");
          exit(1);
        }
      
      channel->type = TCP_ACCEPT_CHANNEL;

      /* Save the timeout for this socket */
      channel->timeout = timeout;

      /* Save the ACL information */
      channel->acl = Ganglia_acl_create( tcp_accept_channel, pool ); 

      /* Save the pointer to this channel data */
      socket_pollfd.client_data = channel;

      /* Add the socket to the pollset */
      status = apr_pollset_add(listen_channels, &socket_pollfd);
      if(status != APR_SUCCESS)
         {
            err_msg("Failed to add socket to pollset. Exiting.\n");
            exit(1);
         }
    }
}

/* Look up, or create on first contact, the Ganglia_host record for the
 * sender of a message and refresh its last_heard_from timestamp.
 *
 * remIP:     IP string of the peer the packet came from
 * sa:        socket address of that peer (used for the reverse lookup)
 * metric_id: id of the metric carried by the message, may be NULL.  When
 *            its spoof flag is set, metric_id->host must have the form
 *            "ip:hostname" and those values replace the peer's own.
 *
 * Returns the host record, or NULL on missing arguments, a malformed
 * spoof string, or an allocation failure.
 */
static Ganglia_host *
Ganglia_host_get( char *remIP, apr_sockaddr_t *sa, Ganglia_metric_id *metric_id)
{
  apr_status_t status;
  Ganglia_host *hostdata = NULL;
  apr_pool_t *pool = NULL;
  char *hostname = NULL;
  char *remoteip = remIP;
  char *buff = NULL;
 
  if(!remoteip || !sa)
    {
      return NULL;
    }

  /* split out the spoofed host name and ip address so that it can
   * be used to get the spoofed host. */
  if(metric_id && metric_id->spoof)
    {
      char *spoofName;
      char *spoofIP;
      size_t spoof_info_len;

      /* The string comes off the network.  An empty (or missing) one
       * must be rejected here: the separator search below starts at
       * the second character and would otherwise read past the buffer. */
      if(!metric_id->host || !metric_id->host[0])
        {
          err_msg("Incorrect format for spoof argument. exiting.\n");
          return NULL;
        }

      spoof_info_len = strlen(metric_id->host);
      buff = malloc(spoof_info_len+1);
      if(!buff)
        {
          err_msg("Unable to allocate memory for the spoof argument.\n");
          return NULL;
        }
      strcpy(buff,metric_id->host);
      spoofIP = buff;

      /* The ip part must be non-empty, hence the search from buff+1 */
      spoofName = strchr(buff+1,':');
      if(!spoofName || !spoofName[1])
        {
          err_msg("Incorrect format for spoof argument. exiting.\n");
          goto done;
        }
      *spoofName = 0;
      spoofName++;
      debug_msg(" spoofName: %s    spoofIP: %s \n",spoofName,spoofIP);

      hostname = spoofName;
      remoteip = spoofIP;
    }

  hostdata =  (Ganglia_host *)apr_hash_get( hosts, remoteip, APR_HASH_KEY_STRING );
  if(hostdata)
    {
      /* We already have this host in our "hosts" hash update timestamp */
      hostdata->last_heard_from = apr_time_now();
      goto done;
    }

  /* Lookup the hostname or use the proxy information if available */
  if( !hostname )
    {
      /* We'll use the resolver to find the hostname */
      status = apr_getnameinfo(&hostname, sa, 0);
      if(status != APR_SUCCESS)
        {
          /* If hostname lookup fails.. set it to the ip */
          hostname = remoteip;
        }
    }

  /* This is the first time we've heard from this host.. create a new pool */
  status = apr_pool_create( &pool, global_context );
  if(status != APR_SUCCESS)
    {
      goto done;
    }

  /* Allocate the host record from the new pool */
  hostdata = apr_pcalloc( pool, sizeof( Ganglia_host ));
  if(!hostdata)
    {
      goto fail;
    }

  /* Save the pool address for later.. freeing this pool frees everything
   * for this particular host */
  hostdata->pool = pool;

  /* Save the hostname */
  hostdata->hostname = apr_pstrdup( pool, hostname );

  /* Dup the remoteip (buff, which it may point into, is freed below) */
  hostdata->ip =  apr_pstrdup( pool, remoteip);

  /* We don't know the location yet */
  hostdata->location = NULL;

  /* Set the timestamps */
  hostdata->first_heard_from = hostdata->last_heard_from = apr_time_now();

  /* Create a hash for the metric data */
  hostdata->metrics = apr_hash_make( pool );
  if(!hostdata->metrics)
    {
      goto fail;
    }

  /* Create a hash for the gmetric data */
  hostdata->gmetrics = apr_hash_make( pool );
  if(!hostdata->gmetrics)
    {
      goto fail;
    }

  /* Save this host data to the "hosts" hash */
  apr_hash_set( hosts, hostdata->ip, APR_HASH_KEY_STRING, hostdata); 
  goto done;

fail:
  /* The half-built record lives in the pool, so it goes away with it */
  apr_pool_destroy(pool);
  hostdata = NULL;

done:
  free(buff);
  return hostdata;
}

/* Update the per-host fields that are carried by special value messages.
 *
 *  - "location":  a string message; replaces host->location
 *  - "heartbeat": (not spoofed) records when the remote gmond started
 *  - spoofed messages are only logged
 *
 * host: the host record to update
 * vmsg: the decoded value message
 *
 * Nothing happens when either argument, or the metric name, is NULL.
 */
static void
Ganglia_update_vidals( Ganglia_host *host, Ganglia_value_msg *vmsg)
{
    char *name;
    int is_string;

    if(!host || !vmsg)
      {
        return;
      }

    name = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
    if(!name)
      {
        return;
      }

    /* The value lives in a union; the string member may only be read
     * when the sender really sent a string, otherwise an arbitrary
     * number off the network would be used as a pointer. */
    is_string = (vmsg->id == gmetric_string &&
                 vmsg->Ganglia_value_msg_u.gstr.str != NULL);

    if(!strcasecmp("location", name))
      {
        /* We have to manage this memory here because.. returning NULL
         * will not cause Ganglia_message_save to be run.  Maybe this
         * could be done better later i.e should these metrics be
         * in the host->metrics list instead of the host structure? */
        if(is_string)
          {
            char *location = strdup(vmsg->Ganglia_value_msg_u.gstr.str);
            if(location)
              {
                /* Swap in the new location only once the copy exists */
                free(host->location);
                host->location = location;
                debug_msg("Got a location message %s\n", host->location);
              }
          }
        /* Processing is finished */
        return;
      }
    if(!vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof && 
       !strcasecmp("heartbeat", name))
      {
        /* nothing more needs to be done. we handled the timestamps above. */
        host->gmond_started = vmsg->Ganglia_value_msg_u.gu_int.ui;
        debug_msg("Got a heartbeat message %u\n", host->gmond_started);
        /* Processing is finished */
        return;
      }
    if(vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof)
      {
        /* nothing more needs to be done. we handled the timestamps above. */
        debug_msg("Got a spoof message %s %s\n", name,
                  is_string? vmsg->Ganglia_value_msg_u.gstr.str: "(non-string value)");
        /* Processing is finished */
        return;
      }
}

/* When a value arrives for a metric that this host has no metadata
 * entry for, send a metadata request for that metric over the UDP send
 * channels.  Nothing is sent when the metadata is already known or the
 * request cannot be encoded. */
static void
Ganglia_metadata_check(Ganglia_host *host, Ganglia_value_msg *vmsg )
{
    char *wanted = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
    char encoded[GANGLIA_MAX_MESSAGE_LEN];
    Ganglia_metadata_msg request;
    XDR stream;
    int encoded_len;

    /* Metadata already on file: nothing to ask for */
    if(apr_hash_get(host->metrics, wanted, APR_HASH_KEY_STRING))
        return;

    request.id = gmetadata_request;
    request.Ganglia_metadata_msg_u.grequest.metric_id.host = host->hostname;
    request.Ganglia_metadata_msg_u.grequest.metric_id.name = wanted;
    request.Ganglia_metadata_msg_u.grequest.metric_id.spoof = FALSE;

    debug_msg("sending metadata request flag for metric: %s host: %s", wanted, host->hostname);
    ganglia_scoreboard_inc(PKTS_SENT_REQUEST);
    ganglia_scoreboard_inc(PKTS_SENT_ALL);

    /* Encode the request and, if that worked, send it along */
    xdrmem_create(&stream, encoded, GANGLIA_MAX_MESSAGE_LEN, XDR_ENCODE);
    if(xdr_Ganglia_metadata_msg(&stream, &request))
      {
        encoded_len = xdr_getpos(&stream);
        Ganglia_udp_send_message( udp_send_channels, encoded, encoded_len);
      }
}

/* Compiled out (#if 0).  Would release a metric record by destroying its
 * private pool; a NULL metric is ignored. */
#if 0
static void
Ganglia_metadata_free( Ganglia_metadata *metric )
{
  if(!metric)
    return;
  apr_pool_destroy( metric->pool );
}
#endif

/* Store (or refresh) the full metadata of a metric in host->metrics.
 *
 * host:    the host the metadata message came from
 * message: the decoded metadata message; every string in it is copied,
 *          so the caller may release the message afterwards
 *
 * The record is keyed by metric name.  A known metric has its private
 * pool cleared and its data replaced; a new metric gets a record and a
 * sub-pool of the host's pool.  Nothing happens when either argument is
 * NULL or an allocation fails.
 */
static void
Ganglia_metadata_save( Ganglia_host *host, Ganglia_metadata_msg *message )
{
    Ganglia_metadata *metric;
    Ganglia_metadata_msg *fmessage;
    u_int i, mlen;

    /* Validate the arguments before the lookup below dereferences them */
    if(!host || !message)
        return;

    /* Search for the Ganglia_metadata in the Ganglia_host */
    metric = (Ganglia_metadata *)apr_hash_get(host->metrics, 
                                         message->Ganglia_metadata_msg_u.gfull.metric_id.name,
                                         APR_HASH_KEY_STRING);
    if(metric)
      {
        /* Known metric: drop the old copies, the record itself stays */
        apr_pool_clear(metric->pool);
      }
    else
      {
        apr_status_t status;

        /* This is a new metric sent from this host... allocate space for this data */
        
        /* Allocate a new metric from this context */
        metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
        if(!metric)
            return;

        /* Create the context for this metric */
        status = apr_pool_create(&(metric->pool), host->pool);
        if(status != APR_SUCCESS)
            return;

        /* NOTE: In order for gmetric messages to be properly saved to the hash table
        * based on the name of the gmetric sent...we need to strdup() the name
        * since the xdr_free below will blast the value later (along with the other
        * allocated structure elements).  This is only performed once at gmetric creation */
        metric->name = apr_pstrdup(host->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
        debug_msg("***Allocating metadata packet for host--%s-- and metric --%s-- ****\n", host->hostname, metric->name);
      }

    fmessage = &(metric->message_u.f_message);
    mlen = message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;

    /* Deep-copy the message into the metric's own pool */
    fmessage->id = message->id;
    fmessage->Ganglia_metadata_msg_u.gfull.metric_id.host = 
        apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.host);
    fmessage->Ganglia_metadata_msg_u.gfull.metric_id.name = 
        apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
    fmessage->Ganglia_metadata_msg_u.gfull.metric_id.spoof = 
        message->Ganglia_metadata_msg_u.gfull.metric_id.spoof;
    fmessage->Ganglia_metadata_msg_u.gfull.metric.type = 
        apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.type);
    fmessage->Ganglia_metadata_msg_u.gfull.metric.name = 
        apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.name);
    fmessage->Ganglia_metadata_msg_u.gfull.metric.units = 
        apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.units);
    fmessage->Ganglia_metadata_msg_u.gfull.metric.slope = 
        message->Ganglia_metadata_msg_u.gfull.metric.slope;
    fmessage->Ganglia_metadata_msg_u.gfull.metric.tmax = 
        message->Ganglia_metadata_msg_u.gfull.metric.tmax;
    fmessage->Ganglia_metadata_msg_u.gfull.metric.dmax = 
        message->Ganglia_metadata_msg_u.gfull.metric.dmax;
    fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len = mlen;
    
    /* The extra name/data pairs attached to the metric */
    fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val = 
        apr_pcalloc(metric->pool, sizeof(Ganglia_extra_data)*mlen);
    for (i = 0; i < mlen; i++) 
      {
        fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name = 
            apr_pstrdup(metric->pool, 
                        message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name);
        fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data = 
            apr_pstrdup(metric->pool, 
                        message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data);
      }

    metric->last_heard_from = apr_time_now();

    /* Save the full metric */
    apr_hash_set(host->metrics, metric->name, APR_HASH_KEY_STRING, metric);
    debug_msg("saving metadata for metric: %s host: %s", metric->name, host->hostname);
}

/* Handle a peer's request for the metadata of one of our metrics.
 *
 * host:    the host the request came from (used for logging)
 * message: the decoded metadata request
 *
 * When the requested metric is one of our own metric callbacks, its
 * metadata_last_sent timestamp is reset to 0.  Nothing happens when
 * either argument is NULL.
 */
static void
Ganglia_metadata_request( Ganglia_host *host, Ganglia_metadata_msg *message )
{
  char *name;
  Ganglia_metric_callback *metric_cb;

  /* Validate the arguments before anything dereferences them */
  if(!host || !message)
    return;

  name = message->Ganglia_metadata_msg_u.grequest.metric_id.name;
  metric_cb = (Ganglia_metric_callback *)
                apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );

  if (metric_cb) 
      metric_cb->metadata_last_sent = 0;
  debug_msg("setting metadata request flag for metric: %s host: %s", name, host->hostname);
}

/* Store (or refresh) the latest value of a metric in host->gmetrics.
 *
 * host:    the host the value message came from
 * message: the decoded value message; its strings are copied, so the
 *          caller may release the message afterwards
 *
 * The record is keyed by metric name.  A known metric has its private
 * pool cleared and its data replaced; a new metric gets a record and a
 * sub-pool of the host's pool.  Nothing happens when either argument is
 * NULL or an allocation fails.
 */
static void
Ganglia_value_save( Ganglia_host *host, Ganglia_value_msg *message )
{
  Ganglia_metadata *metric;
  Ganglia_value_msg *vmessage;

  /* Validate the arguments before the lookup below dereferences them */
  if(!host || !message)
    return;

  /* Search for the Ganglia_metric in the Ganglia_host */
  metric = (Ganglia_metadata *)apr_hash_get( host->gmetrics,
                                        message->Ganglia_value_msg_u.gstr.metric_id.name,
                                        APR_HASH_KEY_STRING);
  if(metric)
    {
      /* Known metric: drop the old copies, the record itself stays */
      apr_pool_clear(metric->pool);
    }
  else
    {
      apr_status_t status;

      /* This is a new metric sent from this host... allocate space for this data */

      /* Allocate a new metric from this context */
      metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
      if(!metric)
        {
          /* no memory */
          return;
        }

      /* Create the context for this metric */
      status = apr_pool_create(&(metric->pool), host->pool);
      if(status != APR_SUCCESS)
          return;

      /* NOTE: In order for gmetric messages to be properly saved to the hash table
       * based on the name of the gmetric sent...we need to strdup() the name
       * since the xdr_free below will blast the value later (along with the other
       * allocated structure elements).  This is only performed once at gmetric creation */
      metric->name = apr_pstrdup(host->pool, message->Ganglia_value_msg_u.gstr.metric_id.name );
      debug_msg("***Allocating value packet for host--%s-- and metric --%s-- ****\n", host->hostname, metric->name);
    }

  vmessage = &(metric->message_u.v_message);

  /* The id, metric id and format are copied for every message type */
  vmessage->id = message->id;
  vmessage->Ganglia_value_msg_u.gstr.metric_id.host = 
      apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.host);
  vmessage->Ganglia_value_msg_u.gstr.metric_id.name = 
      apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.name);
  vmessage->Ganglia_value_msg_u.gstr.metric_id.spoof = 
      message->Ganglia_value_msg_u.gstr.metric_id.spoof;
  vmessage->Ganglia_value_msg_u.gstr.fmt = 
      apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.fmt);

  /* The value itself depends on the message type */
  switch(message->id)
    {
    case gmetric_string:
      vmessage->Ganglia_value_msg_u.gstr.str = 
          apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.str);
      break;
    case gmetric_ushort:
      vmessage->Ganglia_value_msg_u.gu_short.us = 
          message->Ganglia_value_msg_u.gu_short.us;
      break;
    case gmetric_short:
      vmessage->Ganglia_value_msg_u.gs_short.ss = 
          message->Ganglia_value_msg_u.gs_short.ss;
      break;
    case gmetric_uint:
      vmessage->Ganglia_value_msg_u.gu_int.ui = 
          message->Ganglia_value_msg_u.gu_int.ui;
      break;
    case gmetric_int:
      vmessage->Ganglia_value_msg_u.gs_int.si = 
          message->Ganglia_value_msg_u.gs_int.si;
      break;
    case gmetric_float:
      vmessage->Ganglia_value_msg_u.gf.f = 
          message->Ganglia_value_msg_u.gf.f;
      break;
    case gmetric_double:
      vmessage->Ganglia_value_msg_u.gd.d = 
          message->Ganglia_value_msg_u.gd.d;
      break;
    default:
      /* Other message types carry no value that is stored here */
      break;
    }

  metric->last_heard_from = apr_time_now();

  /* Save the last update metric */
  apr_hash_set(host->gmetrics, metric->name, APR_HASH_KEY_STRING, metric);
}


/* Receive one datagram from a UDP receive channel, check the sender against
 * the channel ACL, decode the XDR payload and dispatch it by message id.
 *
 * desc - poll descriptor: desc->desc.s is the socket to read from and
 *        desc->client_data is the Ganglia_channel it belongs to.
 * now  - not referenced by this function.
 *
 * Every exit path destroys the temporary pool created for the sockaddr.
 */
static void
process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
{
  apr_status_t status;
  apr_socket_t *socket;
  apr_sockaddr_t *remotesa = NULL;
  char  remoteip[256];
  char buf[max_udp_message_len];
  apr_size_t len = max_udp_message_len;
  Ganglia_channel *channel;
  XDR x;
  Ganglia_metadata_msg fmsg;
  Ganglia_value_msg vmsg;
  Ganglia_host *hostdata = NULL;
  apr_pool_t *p = NULL;
  Ganglia_msg_formats id;
  bool_t ret;

  socket         = desc->desc.s;
  /* We could also use the apr_socket_data_get/set() functions
   * to have per socket user data .. see APR docs */
  channel       = desc->client_data;

  /* We need to create a copy of the local sockaddr so that the
     recvfrom call has a place holder to put the remote information.
     Getting the remote sockaddr might not work since a SOCK_DGRAM
     type socket is connectionless. */
  apr_pool_create(&p, global_context);
  status = apr_socket_addr_get(&remotesa, APR_LOCAL, socket);
  if(status != APR_SUCCESS || !remotesa)
    {
      /* Without a local address there is nothing to clone below */
      apr_pool_destroy(p);
      return;
    }
  status = apr_sockaddr_info_get(&remotesa, NULL, remotesa->family, remotesa->port, 0, p);
  if(status != APR_SUCCESS || !remotesa)
    {
      apr_pool_destroy(p);
      return;
    }

  /* Grab the data; on return len holds the number of bytes received */
  status = apr_socket_recvfrom(remotesa, socket, 0, buf, &len);
  if(status != APR_SUCCESS)
    {
      apr_pool_destroy(p);
      return;
    }

  /* This function is in ./lib/apr_net.c and not APR. The
   * APR counterpart is apr_sockaddr_ip_get() but we don't
   * want to malloc memory every time we call this */
  apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);

  /* Check the ACL */
  if(Ganglia_acl_action( channel->acl, remotesa) != GANGLIA_ACCESS_ALLOW)
    {
      apr_pool_destroy(p);
      return;
    }

  ganglia_scoreboard_inc(PKTS_RECVD_ALL);

  /* Create the XDR receive stream over the bytes actually received, so a
   * short or truncated datagram cannot be decoded from stale buffer bytes */
  xdrmem_create(&x, buf, (unsigned int)len, XDR_DECODE);

  /* Flush the data... */
  memset( &fmsg, 0, sizeof(Ganglia_metadata_msg));
  memset( &vmsg, 0, sizeof(Ganglia_value_msg));

  /* Figure out what kind of message we got */
  if(!xdr_Ganglia_msg_formats(&x, &id))
    {
      /* The id could not be decoded; do not switch on an unset value */
      ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
      apr_pool_destroy(p);
      return;
    }
  /* Rewind so the full message (including its id) is decoded below */
  xdr_setpos (&x, 0);

  /* Read the gangliaMessage from the stream */
  /* Save the message from this particular host */
  switch (id)
    {
    case gmetadata_request:
      ganglia_scoreboard_inc(PKTS_RECVD_REQUEST);
      ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
      if (ret)
          hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.grequest.metric_id));
      if(!ret || !hostdata)
        {
          ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
          /* Processing of this message is finished ... */
          xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
          break;
        }
      debug_msg("Processing a metric metadata request message from %s", hostdata->hostname);
      Ganglia_metadata_request(hostdata, &fmsg);
      xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
      break;
    case gmetadata_full:
      ganglia_scoreboard_inc(PKTS_RECVD_METADATA);
      ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
      if (ret)
          hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.gfull.metric_id));
      if(!ret || !hostdata)
        {
          ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
          /* Processing of this message is finished ... */
          xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
          break;
        }
      debug_msg("Processing a metric metadata message from %s", hostdata->hostname);
      Ganglia_metadata_save( hostdata, &fmsg );
      xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
      break;
    case gmetric_ushort:
    case gmetric_short:
    case gmetric_int:
    case gmetric_uint:
    case gmetric_string:
    case gmetric_float:
    case gmetric_double:
      ganglia_scoreboard_inc(PKTS_RECVD_VALUE);
      ret = xdr_Ganglia_value_msg(&x, &vmsg);
      if (ret)
          hostdata = Ganglia_host_get(remoteip, remotesa, &(vmsg.Ganglia_value_msg_u.gstr.metric_id));
      if(!ret || !hostdata)
        {
          ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
          /* Processing of this message is finished ... */
          xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
          break;
        }
      debug_msg("Processing a metric value message from %s", hostdata->hostname);
      Ganglia_value_save(hostdata, &vmsg);
      Ganglia_update_vidals(hostdata, &vmsg);
      Ganglia_metadata_check(hostdata, &vmsg);
      xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
      break;
    default:
      ganglia_scoreboard_inc(PKTS_RECVD_IGNORED);
      break;
  }

  apr_pool_destroy(p);

  return;
}

/* Send the opening part of the XML report to a client: the DTD, the
 * <GANGLIA_XML> tag and, when any cluster attribute is configured, the
 * <CLUSTER> tag.
 *
 * The "cluster" configuration section is read only on the first call; the
 * attribute strings are cached in function-local statics and the global
 * cluster_tag flag is set if at least one of them is present.
 *
 * Returns the status of the first failing send, the status of the <CLUSTER>
 * send when one is made, and APR_SUCCESS otherwise.
 */
static apr_status_t
print_xml_header( apr_socket_t *client )
{
  static int clusterinit = 0;
  static char *name = NULL;
  static char *owner = NULL;
  static char *latlong = NULL;
  static char *url = NULL;
  char gangliaxml[128];
  char clusterxml[1024];
  apr_status_t status;
  apr_size_t len = strlen(DTD);
  apr_time_t now = apr_time_now();

  /* The DTD goes out first */
  status = apr_socket_send( client, DTD, &len );
  if(status != APR_SUCCESS)
    {
      return status;
    }

  /* Then the document root tag */
  len = apr_snprintf( gangliaxml, sizeof(gangliaxml),
                      "<GANGLIA_XML VERSION=\"%s\" SOURCE=\"gmond\">\n",
                      VERSION);
  status = apr_socket_send( client, gangliaxml, &len);
  if(status != APR_SUCCESS)
    {
      return status;
    }

  /* One-time lookup of the cluster attributes */
  if(!clusterinit)
    {
      cfg_t *cluster = cfg_getsec(config_file, "cluster");

      clusterinit = 1;
      if(cluster)
        {
          name    = cfg_getstr( cluster, "name" );
          owner   = cfg_getstr( cluster, "owner" );
          latlong = cfg_getstr( cluster, "latlong" );
          url     = cfg_getstr( cluster, "url" );
          if(name || owner || latlong || url)
            {
              cluster_tag =1;
            }
        }
    }

  /* No cluster information: the header is complete */
  if(!cluster_tag)
    {
      return APR_SUCCESS;
    }

  len = apr_snprintf( clusterxml, sizeof(clusterxml),
        "<CLUSTER NAME=\"%s\" LOCALTIME=\"%d\" OWNER=\"%s\" LATLONG=\"%s\" URL=\"%s\">\n",
                  name?name:"unspecified",
                  (int)(now / APR_USEC_PER_SEC),
                  owner?owner:"unspecified",
                  latlong?latlong:"unspecified",
                  url?url:"unspecified");

  return apr_socket_send( client, clusterxml, &len);
}

/* Send the closing tags of the XML report: </CLUSTER> (only when the
 * cluster_tag flag is set) followed by </GANGLIA_XML>.
 *
 * Returns the status of the </CLUSTER> send if it fails, otherwise the
 * status of the </GANGLIA_XML> send.
 */
static apr_status_t
print_xml_footer( apr_socket_t *client )
{
  static const char cluster_close[] = "</CLUSTER>\n";
  static const char xml_close[] = "</GANGLIA_XML>\n";
  apr_size_t len;

  if(cluster_tag)
    {
      apr_status_t status;

      /* sizeof counts the terminating NUL, which is not sent */
      len = sizeof(cluster_close) - 1;
      status = apr_socket_send(client, cluster_close, &len);
      if(status != APR_SUCCESS)
        return status;
    }

  len = sizeof(xml_close) - 1;
  return apr_socket_send( client, xml_close, &len);
}

/* Send the opening <HOST ...> tag for one host.
 *
 * REPORTED is the host's last_heard_from time in seconds, TN is the number
 * of seconds elapsed since then, TMAX is fixed at 20 and DMAX comes from the
 * global host_dmax. A host without a location is reported as "unspecified".
 *
 * Returns the status of the send.
 */
static apr_status_t
print_host_start( apr_socket_t *client, Ganglia_host *hostinfo)
{
  char hostxml[1024]; /* for <HOST></HOST> */
  apr_size_t len;
  apr_time_t now = apr_time_now();
  int tn = (now - hostinfo->last_heard_from) / APR_USEC_PER_SEC;
  int reported = (int)(hostinfo->last_heard_from / APR_USEC_PER_SEC);

  len = apr_snprintf(hostxml, sizeof(hostxml),
           "<HOST NAME=\"%s\" IP=\"%s\" REPORTED=\"%d\" TN=\"%d\" TMAX=\"%d\" DMAX=\"%d\" LOCATION=\"%s\" GMOND_STARTED=\"%d\">\n",
                     hostinfo->hostname,
                     hostinfo->ip,
                     reported,
                     tn,
                     20, /* tmax for now is always 20 */
                     host_dmax,
                     hostinfo->location ? hostinfo->location : "unspecified",
                     hostinfo->gmond_started);

  return apr_socket_send(client, hostxml, &len);
}

/* Map a Ganglia value type to the type name used in the XML output.
 * Returns a string literal; "undef" for a value not listed below. */
static char *
host_metric_type( Ganglia_value_types type)
{
  char *label = "undef";

  switch(type)
    {
    case GANGLIA_VALUE_UNKNOWN:
      label = "unknown";
      break;
    case GANGLIA_VALUE_STRING:
      label = "string";
      break;
    case GANGLIA_VALUE_UNSIGNED_SHORT:
      label = "uint16";
      break;
    case GANGLIA_VALUE_SHORT:
      label = "int16";
      break;
    case GANGLIA_VALUE_UNSIGNED_INT:
      label = "uint32";
      break;
    case GANGLIA_VALUE_INT:
      label = "int32";
      break;
    case GANGLIA_VALUE_FLOAT:
      label = "float";
      break;
    case GANGLIA_VALUE_DOUBLE:
      label = "double";
      break;
    }

  return label;
}

/* Render a metric value as text, using the metric's own format string.
 *
 * NOT THREAD SAFE: numeric values are formatted into a static buffer that is
 * overwritten by the next call. String metrics return the message's own
 * string pointer. Returns "unknown" when either argument is NULL or the
 * metric type is unknown or unrecognized.
 */
static char *
host_metric_value( Ganglia_25metric *metric, Ganglia_value_msg *message )
{
  static char value[1024];

  if(metric == NULL || message == NULL)
    return "unknown";

  switch(metric->type)
    {
    case GANGLIA_VALUE_STRING:
      /* Strings are returned as-is, without going through the buffer */
      return message->Ganglia_value_msg_u.gstr.str;

    case GANGLIA_VALUE_UNSIGNED_SHORT:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gu_short.us);
      break;

    case GANGLIA_VALUE_SHORT:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gs_short.ss);
      break;

    case GANGLIA_VALUE_UNSIGNED_INT:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gu_int.ui);
      break;

    case GANGLIA_VALUE_INT:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gs_int.si);
      break;

    case GANGLIA_VALUE_FLOAT:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gf.f);
      break;

    case GANGLIA_VALUE_DOUBLE:
      apr_snprintf(value, sizeof(value), metric->fmt,
                   message->Ganglia_value_msg_u.gd.d);
      break;

    case GANGLIA_VALUE_UNKNOWN:
    default:
      return "unknown";
    }

  /* Every numeric case above has filled the shared buffer */
  return value;
}

/* Render the value carried by a value message as text, using the format
 * string stored in the message itself.
 *
 * Not thread safe: numeric values are formatted into a static buffer that is
 * overwritten by the next call. String messages return the message's own
 * string pointer. Returns "unknown" for a NULL message or an id that is not
 * a metric value.
 *
 * NOTE(review): the format string comes from the message rather than from a
 * literal; confirm upstream that it is validated before it reaches here.
 */
static char *
gmetric_value_to_str(Ganglia_value_msg *message)
{
  static char value[1024];

  if(message == NULL)
    return "unknown";

  switch(message->id)
    {
    case gmetric_string:
      /* Strings are returned as-is, without going through the buffer */
      return message->Ganglia_value_msg_u.gstr.str;

    case gmetric_ushort:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gu_short.fmt,
                   message->Ganglia_value_msg_u.gu_short.us);
      break;

    case gmetric_short:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gs_short.fmt,
                   message->Ganglia_value_msg_u.gs_short.ss);
      break;

    case gmetric_uint:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gu_int.fmt,
                   message->Ganglia_value_msg_u.gu_int.ui);
      break;

    case gmetric_int:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gs_int.fmt,
                   message->Ganglia_value_msg_u.gs_int.si);
      break;

    case gmetric_float:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gf.fmt,
                   message->Ganglia_value_msg_u.gf.f);
      break;

    case gmetric_double:
      apr_snprintf(value, sizeof(value),
                   message->Ganglia_value_msg_u.gd.fmt,
                   message->Ganglia_value_msg_u.gd.d);
      break;

    default:
      return "unknown";
    }

  /* Every numeric case above has filled the shared buffer */
  return value;
}

/* Send one <METRIC> element, including its <EXTRA_DATA> children, to a client.
 *
 * client - socket to write to.
 * data   - metadata entry for the metric (name, type, units, tmax, slope and
 *          the extra name/value elements).
 * val    - entry holding the most recent value message for the metric.
 * now    - current time, used to compute the TN attribute.
 *
 * Nothing is sent, and APR_SUCCESS is returned, when either entry is NULL or
 * the metric is named "heartbeat" or "location" (compared case-insensitively).
 *
 * Returns the status of the first send that fails, so the caller stops
 * writing to a broken or timed-out connection right away; APR_SUCCESS when
 * the whole element was sent.
 */
static apr_status_t
print_host_metric( apr_socket_t *client, Ganglia_metadata *data, Ganglia_metadata *val, apr_time_t now )
{
  char metricxml[1024];
  apr_size_t len;
  apr_status_t ret;
  int extra_len;

  if (!data || !val)
      return APR_SUCCESS;
  if (!strcasecmp(data->name, "heartbeat") || !strcasecmp(data->name, "location"))
      return APR_SUCCESS;

  len = apr_snprintf(metricxml, sizeof(metricxml),
          "<METRIC NAME=\"%s\" VAL=\"%s\" TYPE=\"%s\" UNITS=\"%s\" TN=\"%d\" TMAX=\"%d\" DMAX=\"0\" SLOPE=\"%s\">\n",
              data->name,
              gmetric_value_to_str(&(val->message_u.v_message)),
              data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.type,
              data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.units,
              (int)((now - val->last_heard_from) / APR_USEC_PER_SEC),
              data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.tmax,
              slope_to_cstr(data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.slope));

  ret = apr_socket_send(client, metricxml, &len);
  if (ret != APR_SUCCESS)
      return ret;

  len = apr_snprintf(metricxml, sizeof(metricxml), "<EXTRA_DATA>\n");
  ret = apr_socket_send(client, metricxml, &len);
  if (ret != APR_SUCCESS)
      return ret;

  /* The extra elements are emitted from the last array entry to the first */
  extra_len = data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;
  for (; extra_len > 0; extra_len--)
    {
      len = apr_snprintf(metricxml, sizeof(metricxml), "<EXTRA_ELEMENT NAME=\"%s\" VAL=\"%s\"/>\n",
             data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].name,
             data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].data);
      ret = apr_socket_send(client, metricxml, &len);
      if (ret != APR_SUCCESS)
          return ret;
    }

  len = apr_snprintf(metricxml, sizeof(metricxml), "</EXTRA_DATA>\n");
  ret = apr_socket_send(client, metricxml, &len);
  if (ret != APR_SUCCESS)
      return ret;

  /* Send the closing tag */
  len = apr_snprintf(metricxml, sizeof(metricxml), "</METRIC>\n");

  return apr_socket_send(client, metricxml, &len);
}

/* Send the closing </HOST> tag and return the status of the send. */
static apr_status_t
print_host_end( apr_socket_t *client)
{
  static const char host_close[] = "</HOST>\n";
  /* sizeof counts the terminating NUL, which is not sent */
  apr_size_t len = sizeof(host_close) - 1;

  return apr_socket_send(client, host_close, &len);
}

/* Accept one connection on a TCP accept channel and, if the peer passes the
 * channel ACL, write the full XML report to it: header, one <HOST> element
 * per entry in the hosts hash with its metrics, and the footer.
 *
 * desc - poll descriptor: desc->desc.s is the listening socket and
 *        desc->client_data is the Ganglia_channel it belongs to.
 * now  - current time, passed on to print_host_metric().
 *
 * The report is abandoned at the first failed send. All memory for the
 * connection comes from a pool that is destroyed before returning.
 */
static void
process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
{
  apr_status_t status;
  apr_hash_index_t *hi, *metric_hi;
  void *val;
  apr_socket_t *client = NULL, *server;
  apr_sockaddr_t *remotesa = NULL;
  char  remoteip[256];
  apr_pool_t *client_context = NULL;
  Ganglia_channel *channel;

  server         = desc->desc.s;
  /* We could also use the apr_socket_data_get/set() functions
   * to have per socket user data .. see APR docs */
  channel        = desc->client_data;

  /* Create a context for the client connection */
  apr_pool_create(&client_context, global_context);

  /* Accept the connection */
  status = apr_socket_accept(&client, server, client_context);
  if(status != APR_SUCCESS)
    {
      /* No connection was established, so there is no client socket to
       * shut down or close; only the pool needs to be released. */
      apr_pool_destroy(client_context);
      return;
    }

  /* Set the timeout for writing to the client */
  apr_socket_timeout_set( client, channel->timeout);

  apr_socket_addr_get(&remotesa, APR_REMOTE, client);
  /* This function is in ./lib/apr_net.c and not APR. The
   * APR counterpart is apr_sockaddr_ip_get() but we don't
   * want to malloc memory every time we call this */
  apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);

  /* Check the ACL */
  if(Ganglia_acl_action( channel->acl, remotesa ) != GANGLIA_ACCESS_ALLOW)
    goto close_accept_socket;

  /* Print the DTD, GANGLIA_XML and CLUSTER tags */
  status = print_xml_header(client);
  if(status != APR_SUCCESS)
    goto close_accept_socket;

  /* Walk the host hash */
  for(hi = apr_hash_first(client_context, hosts);
      hi;
      hi = apr_hash_next(hi))
    {
      apr_hash_this(hi, NULL, NULL, &val);
      status = print_host_start(client, (Ganglia_host *)val);
      if(status != APR_SUCCESS)
        {
          goto close_accept_socket;
        }

      /* Send the metric info for this particular host */
      for(metric_hi = apr_hash_first(client_context, ((Ganglia_host *)val)->metrics);
          metric_hi; metric_hi = apr_hash_next(metric_hi))
        {
          void *metric, *mval;
          apr_hash_this(metric_hi, NULL, NULL, &metric);

          /* Look up the latest value stored under the same metric name */
          mval = apr_hash_get(((Ganglia_host *)val)->gmetrics, ((Ganglia_metadata*)metric)->name, APR_HASH_KEY_STRING);

          /* Print each of the metrics for a host ... */
          if(print_host_metric(client, metric, mval, now) != APR_SUCCESS)
            {
              goto close_accept_socket;
            }
        }

      /* Close the host tag */
      status = print_host_end(client);
      if(status != APR_SUCCESS)
        {
          goto close_accept_socket;
        }
    }

  /* Close the CLUSTER and GANGLIA_XML tags */
  print_xml_footer(client);

  /* Close down the accepted socket */
close_accept_socket:
  apr_socket_shutdown(client, APR_SHUTDOWN_READ);
  apr_socket_close(client);
  apr_pool_destroy(client_context);
}


/* Poll the listen_channels pollset for up to `timeout` and hand every ready
 * descriptor to the handler for its channel type. Descriptors whose channel
 * type is neither UDP receive nor TCP accept are skipped. Returns without
 * doing anything if the poll itself does not succeed.
 *
 * now - passed through to the channel handlers.
 */
static void
poll_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
  const apr_pollfd_t *descs = NULL;
  apr_int32_t num = 0;
  apr_int32_t i;

  /* Poll for incoming data */
  if(apr_pollset_poll(listen_channels, timeout, &num, &descs) != APR_SUCCESS)
    {
      return;
    }

  for(i = 0; i < num; i++)
    {
      const apr_pollfd_t *ready = &descs[i];
      Ganglia_channel *channel = ready->client_data;

      if(channel->type == UDP_RECV_CHANNEL)
        {
          process_udp_recv_channel(ready, now);
        }
      else if(channel->type == TCP_ACCEPT_CHANNEL)
        {
          process_tcp_accept_channel(ready, now);
        }
    }
}

/* Placeholder for the TCP counterpart of the UDP send path.
 * Neither argument is used; the function always returns 0. */
static int
tcp_send_message( char *buf, int len )
{
  (void)buf;
  (void)len;

  return 0;
}

/* Send a buffer over the UDP send channels and through the TCP send hook.
 * Returns the sum of the two return values. */
static int
send_message( char *buf, int len )
{
  int udp_result = Ganglia_udp_send_message(udp_send_channels, buf, len );
  int tcp_result = tcp_send_message( buf, len );

  return udp_result + tcp_result;
}

/* Create a metric callback record and register it in the metric_callbacks
 * hash under `name`.
 *
 * name  - metric name; copied into the global pool and used as the hash key.
 * cb    - function that gathers the metric.
 * index - CB_NOINDEX for a single-value callback, otherwise the index handed
 *         to a multi-metric callback so it knows which metric to gather.
 * modp  - module that provides the metric.
 *
 * Returns the new record, or NULL if either allocation fails.
 */
static Ganglia_metric_callback *
Ganglia_metric_cb_define(char *name, metric_func cb, int index, mmodule *modp)
{
  Ganglia_metric_callback *metric;

  metric = apr_pcalloc( global_context, sizeof(Ganglia_metric_callback));
  if(metric == NULL)
    {
      return NULL;
    }

  metric->name = apr_pstrdup( global_context, name );
  if(metric->name == NULL)
    {
      return NULL;
    }

  metric->modp = modp;
  metric->multi_metric_index = index;

  /* Only one of the two function pointers is set, chosen by the index:
     an indexed callback serves modules or handlers that can gather more
     than one metric through the same function. */
  if (index != CB_NOINDEX)
    {
      metric->cbindexed = cb;
    }
  else
    {
      metric->cb = (metric_func_void)cb;
    }

  apr_hash_set( metric_callbacks, metric->name, APR_HASH_KEY_STRING, metric);
  return metric;
}

/* Metric callback: writes "ON" into the string value when the global
 * gexec_on flag is set and "OFF" otherwise. */
g_val_t
gexec_func ( void )
{
   g_val_t val;
   const char *state = gexec_on ? "ON" : "OFF";

   snprintf(val.str, 32, "%s", state);
   return val;
}

g_val_t
heartbeat_func( void )
{
   g_val_t val;
   val.uint32 = started / APR_USEC_PER_SEC;
   return val;
}

g_val_t
location_func(void)
{
   g_val_t val;
   if(!host_location)
     {
       cfg_t *host = cfg_getsec(config_file, "host");
       host_location = cfg_getstr( host, "location");
     }
   strncpy(val.str, host_location, 32);
   return val;
}

static apr_status_t modular_metric_cleanup(void *param)
{
    mmodule *modp = (mmodule*)param;
    if (modp->cleanup) {
        modp->cleanup();
    }
    return APR_SUCCESS;
}

static void
load_metric_modules( void )
{
    cfg_t *tmp;
    int j;

    tmp = cfg_getsec( config_file, "modules");
    for (j = 0; j < cfg_size(tmp, "module"); j++) 
      {
        apr_dso_handle_t *modHandle = NULL;
        apr_dso_handle_sym_t modSym;
        mmodule *modp;
        char *modPath=NULL, *modName=NULL, *modparams=NULL, *modLanguage=NULL;
        apr_array_header_t *modParams_list = NULL;
        int k;
        apr_status_t merge_ret;

        cfg_t *module = cfg_getnsec(tmp, "module", j);

        /* Check the module language to make sure that
           the module is loaded correctly or should be
           delegated to an alternate module interface
        */
        modLanguage = cfg_getstr(module, "language");
        if (modLanguage && strcasecmp(modLanguage, "C/C++")) 
            continue;

        modPath = cfg_getstr(module, "path");
        if(modPath && *modPath != '/' && *modPath != '.')
          {
            if (module_dir)
                merge_ret = apr_filepath_merge(&modPath, module_dir,
                                modPath,
                                APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
                                global_context);
            else
                merge_ret = apr_filepath_merge(&modPath, GANGLIA_MODULE_DIR,
                                modPath,
                                APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
                                global_context);

            if (merge_ret != APR_SUCCESS) 
                modPath = cfg_getstr(module, "path");
          }
        modName = cfg_getstr(module, "name");
        modparams = cfg_getstr(module, "params");
        modParams_list = apr_array_make(global_context, 2, sizeof(mmparam));

        for (k = 0; k < cfg_size(module, "param"); k++) 
          {
            cfg_t *param;
            mmparam *node = apr_array_push(modParams_list);

            param = cfg_getnsec(module, "param", k);
            node->name = apr_pstrdup(global_context, param->title);
            node->value = apr_pstrdup(global_context, cfg_getstr(param, "value"));
          }

        /*
         * Load the file into the gmond address space
         */
        if (apr_dso_load(&modHandle, modPath, global_context) != APR_SUCCESS) 
          {
            char my_error[256];

            err_msg("Cannot load %s metric module: %s", modPath,
                     apr_dso_error(modHandle, my_error, sizeof(my_error)));
            if (!modPath) 
                err_msg("No load path specified for module: %s or incorrect module language designation [%s].\n", 
                        modName, modLanguage);
            continue;
          }
        debug_msg("loaded module: %s", modName);

        /*
         * Retrieve the pointer to the module structure through the module name.
         */
        if (apr_dso_sym(&modSym, modHandle, modName) != APR_SUCCESS) 
          {
            char my_error[256];

            err_msg("Cannot locate internal module structure '%s' in file %s: %s\nPossibly an incorrect module language designation [%s].\n", 
                     modName, modPath, apr_dso_error(modHandle, my_error, sizeof(my_error)), modLanguage);
            continue;
          }

        modp = (mmodule*) modSym;
        modp->dynamic_load_handle = (apr_dso_handle_t *)modHandle;
        modp->module_name = apr_pstrdup (global_context, modName);
        modp->module_params = apr_pstrdup (global_context, modparams);
        modp->module_params_list = modParams_list;
        modp->config_file = config_file;

        /*
         * Make sure the found module structure is really a module structure
         *
         */
        if (modp->magic != MMODULE_MAGIC_COOKIE) {
            err_msg("Internal module structure '%s' in file %s is not compatible -"
                     "perhaps this is not a metric module.\n", 
                     modName, modPath);
            continue;
        }

        /* Validate that the module was built against a compatible module interface API. */
        if (modp->version != MMODULE_MAGIC_NUMBER_MAJOR) {
            err_msg("Module \"%s\" is not compatible with this "
                    "version of Gmond (found %d, need %d).",
                    modName, modp->version, MMODULE_MAGIC_NUMBER_MAJOR);
            continue;
        }

        if (metric_modules != NULL) {
            modp->next = metric_modules;
        }
        metric_modules = modp;
      }
    return;
}

/* Create the metric_callbacks hash and fill it from the loaded metric
 * modules.
 *
 * Each module on the metric_modules list is initialized through its init
 * hook (when it has one). A module whose init reports failure is logged and
 * contributes no metrics. Otherwise its cleanup handler is registered on the
 * global pool and one callback is defined per entry of its metrics_info
 * table, which is terminated by an entry with a NULL name.
 */
static void
setup_metric_callbacks( void )
{
  mmodule *modp;

  /* Create the metric_callbacks hash */
  metric_callbacks = apr_hash_make( global_context );

  for (modp = metric_modules; modp != NULL; modp = modp->next)
    {
      const Ganglia_25metric *metric_info;
      int i;

      if (modp->init && modp->init(global_context))
        {
          err_msg("Module %s failed to initialize.\n", modp->module_name);
          continue;
        }

      apr_pool_cleanup_register(global_context, modp,
                                modular_metric_cleanup,
                                apr_pool_cleanup_null);

      /* The table position doubles as the index passed to the handler */
      metric_info = modp->metrics_info;
      for (i = 0; metric_info[i].name != NULL; i++)
        {
          Ganglia_metric_cb_define(metric_info[i].name, modp->handler, i, modp);
        }
    }
}

/* Build the collection_groups array from the "collection_group" sections of
 * the configuration file.
 *
 * For every group this reads collect_once, collect_every and time_threshold,
 * then walks the group's "metric" entries. Each metric is looked up in the
 * metric_callbacks hash, its value message is prepared (id, format string,
 * host and metric name), and the callback is pushed onto the group's
 * metric_array. Metrics of a collect-once group are collected immediately.
 *
 * The process exits with status 1 when a configured metric has no registered
 * callback or when its callback has no owning module.
 *
 * Returns the sum, over all metrics pushed onto a group, of
 * msg_size / time_threshold.
 */
double
setup_collection_groups( void )
{
  int i, num_collection_groups = cfg_size( config_file, "collection_group" );
  double bytes_per_sec = 0;
  
  /* Create the collection group array */
  collection_groups = apr_array_make( global_context, num_collection_groups,
                                      sizeof(Ganglia_collection_group *));

  for(i = 0; i < num_collection_groups; i++)
    {
      int j, num_metrics;
      cfg_t *group_conf;
      Ganglia_collection_group *group = apr_pcalloc( global_context, 
                                                     sizeof(Ganglia_collection_group));
      if(!group)
        {
          err_msg("Unable to malloc memory for collection group. Exiting.\n");
          exit(1);
        }

      /* Copy the group's settings out of the configuration */
      group_conf  = cfg_getnsec( config_file, "collection_group", i);
      group->once = cfg_getbool( group_conf, "collect_once");
      group->collect_every = cfg_getint( group_conf, "collect_every");
      group->time_threshold = cfg_getint( group_conf, "time_threshold");

      if(group->once)
        {
          /* TODO: this isn't pretty but simplifies the code( next collect in a year)
             since we will collect the value in this function */
          group->next_collect = apr_time_now() + (31536000 * APR_USEC_PER_SEC);
        }
      else
        {
          group->next_collect = 0;
        }

      group->next_send    = 0;

      num_metrics = cfg_size( group_conf, "metric" );
      group->metric_array = apr_array_make(global_context, num_metrics,
                                           sizeof(Ganglia_metric_callback *)); 
      for(j=0; j< num_metrics; j++)
        {
          cfg_t *metric         = cfg_getnsec( group_conf, "metric", j );
          char *name            = cfg_getstr  ( metric, "name");
          char *title           = cfg_getstr  ( metric, "title");
          float value_threshold = cfg_getfloat( metric, "value_threshold");

          /* The callback must already be registered under this exact name */
          Ganglia_metric_callback *metric_cb =  (Ganglia_metric_callback *)
                        apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );
          Ganglia_25metric *metric_info = NULL;

          if(!metric_cb)
            {
              err_msg("Unable to collect metric '%s' on this platform. Exiting.\n", name);
              exit(1);
            }

          if (metric_cb->modp) 
            {
              const Ganglia_25metric *mi = metric_cb->modp->metrics_info;
              int k;

              /*XXX Store the metric info in a hash_table so that this 
                lookup can be done faster. */
              metric_info = apr_pcalloc( global_context, sizeof(Ganglia_25metric));

              /* Copy the module's description of this metric; the name is
                 matched case-insensitively. If no entry matches, metric_info
                 keeps the contents apr_pcalloc() gave it. */
              for (k = 0; mi[k].name != NULL; k++) 
                {
                  if (strcasecmp(name,  mi[k].name) == 0) 
                    {
                      memcpy (metric_info, &(mi[k]), sizeof(Ganglia_25metric));
                      break;
                    }
                }

              metric_info->key = modular_metric;
            }
          else 
            {
              err_msg("Unable to send metric '%s' (not in gm_protocol.x). Exiting.\n", name);
              exit(1);
            }

          /* NOTE(review): metric_info is always non-NULL here, because the
             only branch that leaves it NULL exits above; the else branch
             below therefore looks unreachable - confirm. */
          if(metric_info)
            {
              /* Build the message: pick the message id that matches the value
                 type and copy the format string into the matching member */
              switch(metric_info->type)
                {
                case GANGLIA_VALUE_UNKNOWN:
                  /* The 2.5.x protocol doesn't allow for unknown values. :(  Do nothing. */
                  /* (this metric is not added to the group) */
                  continue;
                case GANGLIA_VALUE_STRING:
                  metric_info->key = gmetric_string; 
                  metric_cb->msg.Ganglia_value_msg_u.gstr.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_UNSIGNED_SHORT:
                  metric_info->key = gmetric_ushort; 
                  metric_cb->msg.Ganglia_value_msg_u.gu_short.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_SHORT:
                  metric_info->key = gmetric_short; 
                  metric_cb->msg.Ganglia_value_msg_u.gs_short.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_UNSIGNED_INT:
                  metric_info->key = gmetric_uint; 
                  metric_cb->msg.Ganglia_value_msg_u.gu_int.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_INT:
                  metric_info->key = gmetric_int; 
                  metric_cb->msg.Ganglia_value_msg_u.gs_int.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_FLOAT:
                  metric_info->key = gmetric_float; 
                  metric_cb->msg.Ganglia_value_msg_u.gf.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                case GANGLIA_VALUE_DOUBLE:
                  metric_info->key = gmetric_double; 
                  metric_cb->msg.Ganglia_value_msg_u.gd.fmt = apr_pstrdup(global_context, metric_info->fmt); 
                  break;
                default:
                  /* Unrecognized type: sent as an unsigned int; no format
                     string is stored in this case */
                  metric_info->key = gmetric_uint; 
                }
    
              /* This sets the key for this particular metric.
               * The value is set by the callback function later */
              metric_cb->msg.id = metric_info->key;
    
              /* The metric id is written through the gstr member whatever the
                 value type is */
              metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrdup(global_context, myname);
              metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.name = apr_pstrdup(global_context, metric_info->name);
    
              /* Save the location of information about this particular metric */
              metric_cb->info   = metric_info;
    
              /* Set the value threshold for this particular metric */
              metric_cb->value_threshold = value_threshold;
    
              /* Fill in the title or short descriptive name of the metric if
               * one had been given in the configuration file. Otherwise just
               * copy the metric name as the title. */
              if (title)
                {
                  metric_cb->title = apr_pstrdup(global_context, title);
                }
              else  
                {
                  metric_cb->title = apr_pstrdup(global_context, metric_info->name);
                }
    
              /* If this metric will only be collected once, run it now at setup... */
              if(group->once)
                {
                  if (metric_cb->multi_metric_index == CB_NOINDEX) 
                      metric_cb->now = metric_cb->cb();
                  else
                      metric_cb->now = metric_cb->cbindexed(metric_cb->multi_metric_index);
                }
              else
                {
                  /* ... otherwise set it to zero */
                  memset( &(metric_cb->now), 0, sizeof(g_val_t));
                }
              memset( &(metric_cb->last), 0, sizeof(g_val_t));
    
              /* Calculate the bandwidth this metric will use */
              bytes_per_sec += ( (double)metric_info->msg_size / (double)group->time_threshold );
    
              /* Push this metric onto the metric_array for this group */
              *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = metric_cb;
            }
          else
            {
              err_msg("Unable to find the metric information for '%s'. Possible that the module has not been loaded.\n", name);
            }
        }

      /* Save the collection group in the collection group array */
      *(Ganglia_collection_group **)apr_array_push(collection_groups) = group;
    }

  return bytes_per_sec;
}

/* Collect a fresh sample for every metric in a collection group.
 *
 * For each callback in group->metric_array the previous sample is saved in
 * cb->last and a new one is stored in cb->now.  If the metric has a
 * non-negative value_threshold and the change between the two samples is at
 * least that large, or if the metric's metadata_last_sent is 0,
 * group->next_send is set to 0 so the group is sent immediately.
 *
 * group - the collection group to sample
 * now   - the current time; used to schedule group->next_collect
 */
void
Ganglia_collection_group_collect( Ganglia_collection_group *group, apr_time_t now)
{
  int i;

  /* Collect data for all the metrics in the groups metric array */
  for(i=0; i< group->metric_array->nelts; i++)
    {
      Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];

      debug_msg("\tmetric '%s' being collected now", cb->name);
      cb->last = cb->now;
      if (cb->multi_metric_index == CB_NOINDEX) 
          cb->now = cb->cb();
      else
          cb->now = cb->cbindexed(cb->multi_metric_index);

      /* Check the value threshold.  If passed.. set this group to send immediately. */
      if( cb->value_threshold >= 0.0 )
        {
          debug_msg("\tmetric '%s' has value_threshold %f", cb->name, cb->value_threshold);
          switch(cb->info->type)
            {
            case GANGLIA_VALUE_UNKNOWN:
            case GANGLIA_VALUE_STRING:
              /* do nothing for non-numeric data */
              break;
            /* The integer deltas are computed in double precision: a native
             * 32-bit unsigned subtraction wraps around and a signed one can
             * overflow before abs() ever sees the value. */
            case GANGLIA_VALUE_UNSIGNED_SHORT:
              if( fabs( (double)cb->last.uint16 - (double)cb->now.uint16 ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            case GANGLIA_VALUE_SHORT:
              if( fabs( (double)cb->last.int16 - (double)cb->now.int16 ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            case GANGLIA_VALUE_UNSIGNED_INT:
              if( fabs( (double)cb->last.uint32 - (double)cb->now.uint32 ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            case GANGLIA_VALUE_INT:
              if( fabs( (double)cb->last.int32 - (double)cb->now.int32 ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            case GANGLIA_VALUE_FLOAT:
              if( fabsf( cb->last.f - cb->now.f ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            case GANGLIA_VALUE_DOUBLE:
              if( fabs( cb->last.d - cb->now.d ) >= cb->value_threshold )
                  group->next_send = 0; /* send immediately */
              break;
            default:
              break;
            }
        }
      /* If metadata_last_sent has been set to 0 then a request 
       *  to resend the metadata has been received. Send the group 
       *  immediately */
      if (cb->metadata_last_sent == 0) 
        {
          group->next_send = 0;
        }
    }

  /* Set the next time this group should be collected */
  group->next_collect = now + (group->collect_every * APR_USEC_PER_SEC);
}

/* Send the current value of every metric in a collection group.
 *
 * For each metric the value message is filled in from cb->now according to
 * the metric type; metrics of unknown type are skipped.  A full metadata
 * packet is sent first when metadata_last_sent is 0 or when
 * send_metadata_interval is non-zero and has elapsed since the last
 * metadata send.  The value packet is then XDR-encoded and sent.  When a
 * value packet is sent without errors, group->next_send is advanced by the
 * group's time_threshold.
 *
 * The function returns early, without sending the remaining metrics, if a
 * Ganglia_metric cannot be created for the metadata packet.
 *
 * group - the collection group to send
 * now   - the current time
 */
void
Ganglia_collection_group_send( Ganglia_collection_group *group, apr_time_t now)
{
    int i;
    
    /* This group needs to be sent */
    for(i=0; i< group->metric_array->nelts; i++)
      {
        XDR x;
        int len, errors;
        char metricmsg[max_udp_message_len];
        Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];
        
        /* Build the message */
        switch(cb->info->type)
          {
          case GANGLIA_VALUE_UNKNOWN:
            /* The 2.5.x protocol doesn't allow for unknown values. :(  Do nothing. */
            continue;
          case GANGLIA_VALUE_STRING:
            cb->msg.Ganglia_value_msg_u.gstr.str = cb->now.str; 
            break;
          case GANGLIA_VALUE_UNSIGNED_SHORT:
            cb->msg.Ganglia_value_msg_u.gu_short.us = cb->now.uint16;
            break;
          case GANGLIA_VALUE_SHORT:
            cb->msg.Ganglia_value_msg_u.gs_short.ss = cb->now.int16;
            break;
          case GANGLIA_VALUE_UNSIGNED_INT:
            cb->msg.Ganglia_value_msg_u.gu_int.ui = cb->now.uint32;
            break;
          case GANGLIA_VALUE_INT:
            cb->msg.Ganglia_value_msg_u.gs_int.si = cb->now.int32;
            break;
          case GANGLIA_VALUE_FLOAT:
            cb->msg.Ganglia_value_msg_u.gf.f = cb->now.f;
            break;
          case GANGLIA_VALUE_DOUBLE:
            cb->msg.Ganglia_value_msg_u.gd.d = cb->now.d;
            break;
          default:
            continue;
          }

        /* Send the full metadata packet if the specified interval has elapsed or a
         *  request has been received to resend the metadata.  In this case the 
         *  metadata_last_sent field will be 0.  No need to send the full data 
         *  with every value update.
         */
        if (!cb->metadata_last_sent || (send_metadata_interval && 
            (cb->metadata_last_sent < (now - apr_time_make(send_metadata_interval,0))))) 
          {
            Ganglia_metric gmetric = Ganglia_metric_create((Ganglia_pool)global_context);
            char *val, *type;
            apr_pool_t *gm_pool;

            if(!gmetric)
              {
                /* no memory */
                return;
              }

            /* Only touch the metric's pool once we know the metric exists */
            gm_pool = (apr_pool_t*)gmetric->pool;
        
            val = apr_pstrdup(gm_pool, host_metric_value(cb->info, &(cb->msg)));
            type = apr_pstrdup(gm_pool, host_metric_type(cb->info->type));
        
            errors = Ganglia_metric_set(gmetric, cb->info->name, val, type,
                        cb->info->units, cstr_to_slope( cb->info->slope),
                        cb->info->tmax, 0);

            if (errors) 
              {
                err_msg("Error %d setting the modular data for %s\n", errors, cb->name);
              }
            else 
              {
                Ganglia_metadata_add(gmetric, "TITLE", cb->title);
                Ganglia_metadata_add(gmetric, "DESC", cb->info->desc);

                /* Add the rest of the metadata here by iterating through 
                 *  the metadata table of the metric_info structure */
                if (cb->info->metadata) 
                  {
                    int j;
                    const apr_array_header_t *arr = apr_table_elts((apr_table_t*)cb->info->metadata);
                    const apr_table_entry_t *elts = (const apr_table_entry_t *)arr->elts;

                    /* add all of the metadata to the packet */
                    for (j = 0; j < arr->nelts; ++j) 
                      {
                        if (elts[j].key == NULL)
                            continue;
                        Ganglia_metadata_add(gmetric, elts[j].key, elts[j].val);
                      }
                  }

                debug_msg("\tsending metadata for metric: %s", cb->name);

                ganglia_scoreboard_inc(PKTS_SENT_METADATA);
                errors = Ganglia_metadata_send(gmetric, udp_send_channels);
                if (errors) 
                  {
                    err_msg("Error %d sending the modular data for %s\n", errors, cb->name);
                    debug_msg("\tsent message '%s' with %d errors", cb->name, errors);
                    ganglia_scoreboard_inc(PKTS_SENT_FAILED);
                  }
                else 
                  {
                    cb->metadata_last_sent = now; /* mark the metadata as sent */
                  }
              }

            Ganglia_metric_destroy(gmetric);
          }

        /* Send the updated value packet every time it is collected */
        xdrmem_create(&x, metricmsg, max_udp_message_len, XDR_ENCODE);
        xdr_Ganglia_value_msg(&x, &(cb->msg));
        len = xdr_getpos(&x); 
        errors = send_message( metricmsg, len );
        debug_msg("\tsent message '%s' of length %d with %d errors", cb->name, len, errors);
        ganglia_scoreboard_inc(PKTS_SENT_VALUE);
        ganglia_scoreboard_inc(PKTS_SENT_ALL);

        if(!errors)
          {
            /* If the message send ok. Schedule the next time threshold. */
            group->next_send = now + (group->time_threshold * APR_USEC_PER_SEC);
          }
        else
            ganglia_scoreboard_inc(PKTS_SENT_FAILED);
      }
}
 
/* Run one scheduling pass over all collection groups.
 *
 * Every group whose next_collect time has arrived is collected, then every
 * group whose next_send time has arrived is sent, and finally the earliest
 * upcoming collect/send time across all groups is computed.
 *
 * now - the current time
 *
 * Returns the time of the next event.  If that time is earlier than 'now'
 * (or there are no groups), one second after 'now' is returned instead.
 *
 * TODO: It might be necessary in the future to use a heap for the collection groups.
 * Running through an array should suffice for now */
apr_time_t
process_collection_groups( apr_time_t now )
{
  int i;
  apr_time_t next = 0;

  /* Run through each collection group and collect any data that needs collecting... */
  for(i=0; i< collection_groups->nelts; i++)
    {
      Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
      if(group->next_collect <= now)
        {
          Ganglia_collection_group_collect(group, now);
        }
    }

  /* Run through each collection group and send any data that needs sending... */
  for(i=0; i< collection_groups->nelts; i++)
    {
      Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
      if( group->next_send <= now )
        {
          Ganglia_collection_group_send(group, now);
        }
    }

  /* Run through each collection group and find when our next event (collect|send) occurs */
  for(i=0; i< collection_groups->nelts; i++)
    {
      apr_time_t min;
      Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
      min = group->next_send < group->next_collect? group->next_send : group->next_collect;

      /* Seed from the first group by index rather than testing next == 0:
       * 0 is a legitimate next_send value ("send immediately"), and
       * treating it as "unset" would let a later group overwrite it. */
      if(i == 0 || min < next)
        {
          next = min;
        }
    }

  /* make sure we don't schedule for the past */
  return next < now ? now + 1 * APR_USEC_PER_SEC: next;
}

/* Print one line per registered metric callback to stdout.
 *
 * Each line holds the metric name followed by its description.  When the
 * callback belongs to a module and the module's metrics_info table has an
 * entry whose name matches (case-insensitively), the line reads
 * "<desc> (module <module_name>)"; otherwise a placeholder is printed.
 */
static void
print_metric_list( void )
{
  apr_hash_index_t *hi;
  void *val;

  for(hi = apr_hash_first(global_context, metric_callbacks);
      hi;
      hi = apr_hash_next(hi))
    {
      Ganglia_metric_callback *cb;
      Ganglia_25metric *metric_info;
      const char *desc = NULL;
      const char *module_name = NULL;

      apr_hash_this(hi, NULL, NULL, &val);
      cb = val;
      metric_info = NULL;

      if (cb->modp) 
        {
          int i;

          metric_info = (Ganglia_25metric *)cb->modp->metrics_info;
          for (i = 0; metric_info[i].name != NULL; i++) 
            {
              if (strcasecmp(cb->name,  metric_info[i].name) == 0) 
                {
                  desc = metric_info[i].desc;
                  module_name = cb->modp->module_name;
                  break;
                }
            }
        }

      /* Print straight to stdout instead of formatting into a fixed-size
       * stack buffer, so a long description cannot overflow anything. */
      if (desc != NULL) 
        {
          fprintf(stdout, "%-15s\t%s (module %s)\n", cb->name, desc, module_name);
        }
      else
        {
          fprintf(stdout, "%-15s\t%s\n", cb->name, "<no description available>");
        }
    }
}

/* Expire stale hosts and stale metrics from the hosts hash.
 *
 * A host is removed (and its pool destroyed) when host_dmax is non-zero and
 * the host has not been heard from for more than host_dmax seconds.  For
 * hosts that are kept, each full-metadata metric with a non-zero dmax that
 * has not been heard from for more than dmax seconds is removed from the
 * host's metrics and gmetrics hashes and its pool is destroyed.
 *
 * pool - scratch pool used for the hash iterators; cleared before returning
 * now  - the current time
 */
static void
cleanup_data( apr_pool_t *pool, apr_time_t now)
{
  apr_hash_index_t *host_iter;

  /* Walk the host hash */
  for(host_iter = apr_hash_first(pool, hosts);
      host_iter != NULL;
      host_iter = apr_hash_next(host_iter))
    {
      apr_hash_index_t *metric_iter;
      void *host_entry;
      Ganglia_host *host;
      int host_expired;

      apr_hash_this(host_iter, NULL, NULL, &host_entry);
      host = host_entry;

      host_expired = host_dmax && (now - host->last_heard_from) > (host_dmax * APR_USEC_PER_SEC);
      if( host_expired )
        {
          /* older than dmax: unlink it from the hash first, then release its memory */
          debug_msg("deleting old host '%s' from host hash'", host->hostname);
          apr_hash_set( hosts, host->ip, APR_HASH_KEY_STRING, NULL);
          apr_pool_destroy( host->pool);
          continue;
        }

      /* The host stays, but some of its gmetric data may be stale */
      for( metric_iter = apr_hash_first( pool, host->metrics );
           metric_iter != NULL;
           metric_iter = apr_hash_next( metric_iter ))
        {
          void *metric_entry;
          Ganglia_metadata *metric;
          int dmax;

          apr_hash_this( metric_iter, NULL, NULL, &metric_entry );
          metric = metric_entry;

          /* only full metadata entries carry a dmax; anything else is unexpected */
          if(metric == NULL || metric->message_u.f_message.id != gmetadata_full)
              continue;

          dmax = metric->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.dmax;

          /* a dmax of zero means no expiry; otherwise keep it while still fresh */
          if( !dmax || (now - metric->last_heard_from) <= (dmax * APR_USEC_PER_SEC) )
              continue;

          /* stale gmetric: drop it from both hashes, then free its memory */
          debug_msg("deleting old metric '%s' from host '%s'", metric->name, host->hostname);
          apr_hash_set( host->metrics, metric->name, APR_HASH_KEY_STRING, NULL);
          apr_hash_set( host->gmetrics, metric->name, APR_HASH_KEY_STRING, NULL);
          apr_pool_destroy( metric->pool );
        }
    }

  apr_pool_clear( pool );
}

void initialize_scoreboard()
{
    ganglia_scoreboard_init(global_context);
    ganglia_scoreboard_add(PKTS_RECVD_ALL, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_RECVD_FAILED, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_RECVD_IGNORED, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_RECVD_METADATA, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_RECVD_VALUE, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_RECVD_REQUEST, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_SENT_ALL, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_SENT_METADATA, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_SENT_VALUE, GSB_READ_RESET);
    ganglia_scoreboard_add(PKTS_SENT_REQUEST, GSB_READ_RESET);
}

/* Shutdown flag: the main loop keeps running while this is zero. */
int done = 0;

/* Signal handler that requests shutdown by raising the 'done' flag.
 * The signal number is not used. */
void
sig_handler(int i)
{
    (void)i;
    done = 1;
}

/* gmond entry point.
 *
 * Parses the command line, handles the one-shot modes (config conversion,
 * default configuration dump, metric listing, bandwidth estimate), then
 * sets up metric callbacks, listen/send channels and collection groups and
 * enters the main loop.  The loop alternates between waiting for incoming
 * data (or sleeping when deaf), periodic cleanup of stale data, and
 * collecting/sending this host's metrics, until 'done' is set by the
 * SIGINT handler.
 *
 * Returns 0 on normal shutdown; the one-shot modes exit directly.
 */
int
main ( int argc, char *argv[] )
{
  apr_time_t now, next_collection, last_cleanup;
  apr_pool_t *cleanup_context;

  if (cmdline_parser (argc, argv, &args_info) != 0)
      exit(1) ;

  if(args_info.convert_given)
    {
      exit (print_ganglia_25_config( args_info.convert_arg ));
    }

  /* Create the global context */
  global_context = (apr_pool_t*)Ganglia_pool_create(NULL);

  /* Create the cleanup context from the global context */
  cleanup_context = (apr_pool_t*)Ganglia_pool_create((Ganglia_pool)global_context);

  /* Mark the time this gmond started */
  started = apr_time_now();

  /* Builds a default configuration based on platform */
  build_default_gmond_configuration((Ganglia_pool)global_context);

  if(args_info.default_config_flag)
    {
      /* Write the text verbatim; it must never be used as a printf format
       * string because any '%' in it would be interpreted as a directive. */
      fputs(default_gmond_configuration, stdout);
      fflush( stdout );
      exit(0);
    }

  process_configuration_file();

  if(args_info.metrics_flag)
    {
      load_metric_modules();
      initialize_scoreboard();
      setup_metric_callbacks();
      print_metric_list();
      fflush( stdout );
      exit(0);
    }

  if(args_info.bandwidth_flag)
    {
      double bytes_per_sec;
      /* NOTE(review): unlike the metrics branch above, no metric modules are
       * loaded before the callbacks are set up here -- confirm this is intended. */
      setup_metric_callbacks();
      bytes_per_sec = setup_collection_groups();
      fprintf(stdout, "%f bytes/sec\n", bytes_per_sec);
      exit(0);
    }

  if(args_info.location_given)
    {
      host_location = args_info.location_arg;
    }

  load_metric_modules();
  daemonize_if_necessary( argv );

  if (args_info.pid_file_given)
    {
      update_pidfile (args_info.pid_file_arg);
    }
  
  /* Collect my hostname */
  apr_gethostname( myname, APRMAXHOSTLEN+1, global_context);

  apr_signal( SIGPIPE, SIG_IGN );
  apr_signal( SIGINT, sig_handler );

  initialize_scoreboard();

  /* This must occur before we setuid_if_necessary() particularly on freebsd
   * where we need to be root to access /dev/mem to initialize metric collection */
  setup_metric_callbacks();

  setuid_if_necessary(); 

  process_deaf_mute_mode();

  if(!deaf)
    {
      setup_listen_channels_pollset();
    }

  /* even if mute, a send channel may be needed to send a request for metadata */
  udp_send_channels = Ganglia_udp_send_channels_create((Ganglia_pool)global_context, 
                                                       (Ganglia_gmond_config)config_file);
  if(!udp_send_channels)
    {
      /* if there are no send channels defined, we are equivalent to mute */
      mute = 1;
    }
  if(!mute)
    {
      setup_collection_groups();
    }

  if(!listen_channels)
    {
      /* if there are no listen channels defined, we are equivalent to deaf */
      deaf = 1;
    }

  /* Create the host hash table */
  hosts = apr_hash_make( global_context );

  /* Initialize time variables */
  last_cleanup = next_collection = now = apr_time_now();

  /* Loop */
  for(;!done;)
    {
      /* Make sure we never wait for negative seconds (shouldn't happen) */
      apr_interval_time_t wait = next_collection >= now ? next_collection - now : 1;
      if(!deaf)
        {
          /* Pull in incoming data */
          poll_listen_channels(wait, now);
        }
      else
        {
          /* Sleep until next collection */
          apr_sleep( wait );
        }

      /* only continue if it's time to process our collection groups */
      now = apr_time_now();
      if(now < next_collection)
          continue;

      if(!deaf)
        {
          /* cleanup the data if the cleanup threshold has been met */
          if( (now - last_cleanup) > apr_time_make(cleanup_threshold,0))
            {
              cleanup_data( cleanup_context, now );
              last_cleanup = now;
            }
        }

      if(!mute)
        {
          /* collect data from collection_groups */
          next_collection = process_collection_groups( now );
        }
      else
        {
          /* we're mute. nothing to collect and send. */
          next_collection = now + 60 * APR_USEC_PER_SEC;
        }
    }

  return 0;
}

Generated by  Doxygen 1.6.0   Back to index