首页 > 系统 > Linux >

Linux C++ 实现时间轮 优化超时检测机制

2016-07-25

参考资料: http: www ijilei com 8357 https: www zhihu com question 38427301 https: www ibm com developerworks cn linux l-cn-timers http: www cnblogs com processakai archive 2012 04 11


我参考java代码的思路来编写的C++代码 ,说来其实也没啥技术难度,把代码分享下供后来的技术人使用

我使用c++ 实现的时间轮主要是用于检测超时,是tcp会话的超时与否
所以定义一个sessionKey作为唯一的主键key.

因为我使用了C++11中的unordered_map,所以需要重载operator == 操作符,并自己编写hash函数,也就是SessionHash中的函数

class Sessionkey{
public:
    Sessionkey(){srcIp=dstIp=srcPort=dstPort = 0;};
    Sessionkey(const Sessionkey& skey);
    Sessionkey(uint32_t src,uint32_t dst,uint16_t sp,uint16_t dp):srcIp(src),dstIp(dst),srcPort(sp),dstPort(dp){}

    bool operator == (const Sessionkey &skey) const;

public:
    uint32_t srcIp;
    uint32_t dstIp;
    uint16_t srcPort;
    uint16_t dstPort;
};

class SessionHash{
public:
    size_t operator()(const Sessionkey& sk) const;
};
#include "sessionKey.h"
#include "murmurHash.h"

Sessionkey::Sessionkey(const Sessionkey& sk)
{
    srcIp   = sk.srcIp;
    dstIp = sk.dstIp;
    srcPort = sk.srcPort;
    dstPort = sk.dstPort;
}

bool Sessionkey::operator==(const Sessionkey& sk) const
{
    return  (srcIp == sk.srcIp) && (dstIp == sk.dstIp)
        && (srcPort == sk.srcPort) && (dstPort == sk.dstPort);
}

/*通过Sessionkey的全部成员构造hash值,切勿随意在Sessionkey类中添加成员变量*/
size_t SessionHash::operator()(const Sessionkey& sk) const 
{
    ull64_t ul64 = murmurHash64A(&sk, sizeof(sk), 0xee6b27eb);
    return ul64;
}

哈希值生成算法采用的是

ull64_t murmurHash64A ( const void * key, int len, ull64_t seed ){
    //const uint64_t m = BIG_CONSTANT(0xc6a4a7935bd1e995);
    const ull64_t m = 0xc6a4a7935bd1e995;
    const int r = 47;

    ull64_t h = seed ^ (len * m);

    const ull64_t * data = (const ull64_t *)key;
    const ull64_t * end = data + (len/8);

    while(data != end)
    {
        ull64_t k = *data++;

        k *= m; 
        k ^= k >> r; 
        k *= m; 

        h ^= k;
        h *= m; 
    }

    const unsigned char * data2 = (const unsigned char*)data;

    switch(len & 7)
    {
    case 7: h ^= ull64_t(data2[6]) << 48;
    case 6: h ^= ull64_t(data2[5]) << 40;
    case 5: h ^= ull64_t(data2[4]) << 32;
    case 4: h ^= ull64_t(data2[3]) << 24;
    case 3: h ^= ull64_t(data2[2]) << 16;
    case 2: h ^= ull64_t(data2[1]) << 8;
    case 1: h ^= ull64_t(data2[0]);
        h *= m;
    };

    h ^= h >> r;
    h *= m;
    h ^= h >> r;

    return h;
}
#ifndef __TIME_WHEEL_H__
#define __TIME_WHEEL_H__

#include 
#include 
#include 

#include 
#include 
#include 

#include 

#include 
#include 
#include "lfds611.h"

using namespace std;

/****************************全局函数定义区**********************************/
#define TIMEOUT_SESSKEY_CNT  1000 //default
#define SESSKEY_BUFFER_CNT  2000

void *tickStepThreadGlobal(void* param);

typedef std::unordered_map tDurationMap;

/*定义槽类Slot*/
class Slot
{
public:
    Slot();
    Slot(int nId);
    ~Slot();
    void addElement(Sessionkey& key,int num);
    void removeElement(Sessionkey& key);

public:
    /*unordered_map的int是预留字段*/
    tDurationMap slotDurationMap;//sessionKey和int
    int id;
};

typedef std::unordered_map tRelationMap;

/*时间轮类*/
class CTimeWheel
{
public:
    CTimeWheel();
    ~CTimeWheel();

    /*tickDuration:一个tick持续时间    ticksPerWheel 一轮的tick数(会话超时时间)       timeUnit:时间单位,如毫秒*/
    //构造函数中开启tick step线程
    CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit);
public:
    /*添加元素,返回新加入元素的timeout时间*/
    long addElement(Sessionkey& key,int num);
    bool removeElement(Sessionkey& key);

    /*开启线程步进tick数*/
    void tickStepRun();

private:
    void waitForNextTick();
    int getPreviousTickIndex();
    bool checkAdd(Sessionkey& key);
    void notifyExpired(int idx);//此处有疑问
    /*返回值以秒为单位*/
    long getCurrentTime();

public:
    /*读写加锁*/
    mutex mtx;

    /*时间轮,元素是Slot类型*/
    std::vector wheel;

    /*维护sessionKey和slot槽对应关系的哈希表*/
    tRelationMap keyToSlotMap;

    /*存储超时会话的SessionKey*/
    struct lfds611_queue_state *timeoutSessionQueue;

    Sessionkey  *sessKeyPool;

private:
    uint32_t tickDuration;
    uint32_t ticksPerWheel;
    uint32_t currentTickIndex;
    pthread_t tickThread;

    long startTime;
    long tick;
};


#endif /* __STW_TIMER_H__ */
#include "timeWheel.h"

void *tickStepThreadGlobal(void* param)
{
    CTimeWheel* pThis = (CTimeWheel*)param;
    pThis->tickStepRun();

    return NULL;
}

Slot::Slot()
{
    id = 0;
}

Slot::Slot(int nId)
{
    id = nId;
}

Slot::~Slot()
{

}

void Slot::addElement(Sessionkey& key,int num)
{
    if(0 == key.dstPort || 0 == key.srcPort)
    {
        VLOG(4)<<"addElement ERROR! 8888888888888888888888";
    }

    slotDurationMap.insert(make_pair(key,num));
}

void Slot::removeElement(Sessionkey& key)
{
    slotDurationMap.erase(key);
}

/************************CTimeWheel类实现**********************************/

CTimeWheel::CTimeWheel()
{
    tick = 0;
    currentTickIndex = 0;
}
CTimeWheel::~CTimeWheel()
{
    /*释放申请的slot资源*/
    for(uint32_t i=0;iticksPerWheel;i++)
    {
        Slot* tmp = new Slot(i);
        wheel.push_back(tmp);
    }

    sessKeyPool = new Sessionkey[SESSKEY_BUFFER_CNT];

    /*申请无锁队列存储超时sessionkey*/
    lfds611_queue_new(&timeoutSessionQueue,TIMEOUT_SESSKEY_CNT);

    /*开启线程,传递this指针*/
    if(pthread_create(&tickThread,NULL,tickStepThreadGlobal,this)!=0)
    {
        LOG(ERROR) <<"create tickStepThreadGlobal thread failed!";
    }
}

void CTimeWheel::tickStepRun()
{
    //获取当前时间
    startTime = getCurrentTime();

    //设置tick为1
    tick = 1;

    //1.获取当前tick指针的slot
    for(int i=0;;i++)
    {
        if(i == wheel.size())
        {
            i=0;
        }

        //加锁
        mtx.try_lock();

        currentTickIndex = i;

        //解锁
        mtx.unlock();

        //2.对当前slot所有元素进行timeout处理(重要,暂时没完成)
        notifyExpired(currentTickIndex);

        //3.等待下一次tick到来
        waitForNextTick();
    }
}

void CTimeWheel::waitForNextTick()
{
    while(1)
    {
        long currentTime = getCurrentTime();
        long sleepTime = tickDuration * tick - (currentTime - startTime);//单位

        /*这块的值可能过大,加调试信息*/
        //VLOG(3)<<"tick step Thread sleepTime is "<"<"<dstPort;

        VLOG(4)<<"sessionKey enqueue enqueue enqueue";

        //删除记录
        removeElement(key);

        if(++index  == SESSKEY_BUFFER_CNT)
        {
            index = 0;
        }   
    }
}

bool CTimeWheel::checkAdd(Sessionkey& key)
{
    //检测集合中是否存在,如存在则删除slot槽中元素,删除keyToSlotMap对应表中元素
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    //删除keyToSlotMap关系表key对应元素
    tRelationMap::iterator iteGot = keyToSlotMap.find(key);
    if(iteGot == keyToSlotMap.end())
    {
        iteGot = keyToSlotMap.find(reverseKey);
        if(iteGot == keyToSlotMap.end())
        {
            VLOG(3)<<"checkAdd function:sessionkey is not in keyToSlotMap";
            return false;
        }
        else
        {
            Slot* pSlot = iteGot->second;
            if(NULL == pSlot)
            {
                VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
                return false;
            }

            //4.删除wheel slot中的元素
            pSlot->removeElement(reverseKey);
            VLOG(3)<<"Erase key from wheel slot!";

            //5.删除keyToSlotMap关系表中元素,便于后续添加和更新
            keyToSlotMap.erase(reverseKey);
            VLOG(3)<<"Erase key from keyToSlotMap!";

            return true;
        }
    }
    else
    {
        Slot* pSlot = iteGot->second;
        if(NULL == pSlot)
        {
            VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
            return false;
        }

        //6.删除wheel slot中的元素
        pSlot->removeElement(key);
        VLOG(3)<<"Erase key from wheel slot!";

        //7.删除keyToSlotMap关系表中元素,便于后续添加和更新
        keyToSlotMap.erase(key);
        VLOG(3)<<"Erase key from keyToSlotMap!";

        return true;
    }

    return true;
}

int CTimeWheel::getPreviousTickIndex()
{   
    //加锁
    mtx.try_lock();

    int cti = currentTickIndex;
    if(0 == cti)
    {
        return ticksPerWheel - 1;//4
    }

    return cti - 1;

    //解锁
    mtx.unlock();
}

long CTimeWheel::getCurrentTime()
{
    struct timeval tv;
    gettimeofday(&tv,NULL);
    return tv.tv_sec  + tv.tv_usec / 1000000;
}
相关文章
最新文章
热点推荐