Feature #1661 ยป autofp_ippair.patch
src/tmqh-flow.c 2015-09-06 23:02:51.569366770 +0100 | ||
---|---|---|
Packet *TmqhInputFlow(ThreadVars *t);
|
||
void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
|
||
void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
|
||
void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
|
||
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
|
||
void *TmqhOutputFlowSetupCtx(char *queue_str);
|
||
... | ... | |
} else if (strcasecmp(scheduler, "hash") == 0) {
|
||
SCLogInfo("AutoFP mode using \"Hash\" flow load balancer");
|
||
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
|
||
} else if (strcasecmp(scheduler, "ippair") == 0) {
|
||
SCLogInfo("AutoFP mode using \"ippair\" flow load balancer");
|
||
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
|
||
} else {
|
||
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
|
||
"for autofp-scheduler in conf. Killing engine.",
|
||
... | ... | |
(void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
|
||
(void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
|
||
}
|
||
} else {
|
||
qid = ctx->last++;
|
||
if (ctx->last == ctx->size)
|
||
ctx->last = 0;
|
||
}
|
||
(void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
|
||
PacketQueue *q = ctx->queues[qid].q;
|
||
SCMutexLock(&q->mutex_q);
|
||
PacketEnqueue(q, p);
|
||
SCCondSignal(&q->cond_q);
|
||
SCMutexUnlock(&q->mutex_q);
|
||
return;
|
||
}
|
||
/**
|
||
* \brief select the queue to output based on IP address pair.
|
||
*
|
||
* \param tv thread vars.
|
||
* \param p packet.
|
||
*/
|
||
void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
|
||
{
|
||
int16_t qid = 0;
|
||
uint32_t addr_hash = 0;
|
||
int i;
|
||
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
|
||
/* if no flow we use the first queue,
|
||
* should be rare */
|
||
if (p->flow != NULL) {
|
||
qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
|
||
if (qid == -1) {
|
||
if (p->src.family == AF_INET6) {
|
||
for (i = 0; i < 4; i++) {
|
||
addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
|
||
}
|
||
} else {
|
||
addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
|
||
}
|
||
/* we don't have to worry about possible overflow, since
|
||
* ctx->size will be lesser than 2 ** 31 for sure */
|
||
qid = addr_hash % ctx->size;
|
||
(void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
|
||
(void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
|
||
}
|
||
} else {
|
||
qid = ctx->last++;
|
||