首页 > 程序开发 > 综合编程 > 其他综合 >

zookeeper session tracker机制分析

2012-11-08

说到zookeeper session管理 ,免不了要问什么是session?session id/session是如何产生的?session 信息如何存储?本文以session tracker线程【详见SessionTrackerImpl】的运行机制作为主线,并尝试解答一些...

说到zookeeper session管理 ,免不了要问

什么是session?

session id/session是如何产生的?

session 信息如何存储?

本文以session tracker线程【详见SessionTrackerImpl】的运行机制作为主线,并尝试解答一些相关问题


1)session基础

在介绍session tracker线程之前先回答几个问题


1.1) 什么是session?

zookeeper中session意味着一个物理连接,客户端connect成功之后,会发送一个connect型请求,此时就会有session 产生(下面会具体讲)


1.2)sessionid是如何产生的?

在SessionTrackerImpl实例化的时候就会调用下面的函数【详见SessionTrackerImpl.initializeNextSession】

123456 public static long initializeNextSession(long id) { long nextSid = 0; nextSid = (System.currentTimeMillis() << 24) >> 8; nextSid = nextSid | (id <<56); return nextSid; }

产生的值会存入nextSessionId属性,以后一旦有新的连接(session)产生,就会nextSessionId++

1.3)session是如何产生的?


接到一个连接类型的请求【详见ZooKeeperServer.processConnectRequest】

12345678910111213141516171819202122232425262728 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don&#39;t want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); long sessionId = connReq.getSessionId(); if (sessionId != 0) { long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); }


1.3.1)确定session的timeout和id
【详见SessionTrackerImpl.createSession】

1234 synchronized public long createSession(int sessionTimeout) { addSession(nextSessionId, sessionTimeout); return nextSessionId++; }

可见产生session需要两个元素,一个是sessionid,一个是timeout

timeout由客户端确定,但必须在服务器规定的最大的timeout(ticktime*20)和最小的timeout(ticktime*2)之间

如果客户端没有指定sessionid,那么就会产生一个新的session【详见ZooKeeperServer.createSession】,否则会reopen【详见ZooKeeperServer.reopenSession】

sessionid的产生上面解释过了

1.3.2)实例化session及相关关系存放
【详见SessionTrackerImpl.addSession】


12345678910 sessionsWithTimeout.put(id, sessionTimeout); if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Adding session 0x" + Long.toHexString(id) + " " + sessionTimeout); } }

一个重要的数据结构sessionsWithTimeout存放sessionid和timeout的映射

另一个重要的数据结构sessionsById存放sessionid和SessionImpl实例的映射

1.3.3)确定session实例的tickTime及sessionSets关系维护
【详见SessionTrackerImpl.touchSession】


12345678910111213141516 long expireTime = roundToInterval(System.currentTimeMillis() + timeout); if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s);


根据当前时间和timeout计算本session 的expireTime即tickTime

一个重要的数据结构sessionSets 存放过期时间和一组session实例(相同过期时间)的映射的建立及维护

session实例的tickTime的确定

2)session tracker线程的机制


在zookeeper服务体系中,专门有一个线程(session tracker)维护session【详见SessionTrackerImpl.run】,重要代码如下

1234567891011121314 currentTime = System.currentTimeMillis();if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue;}SessionSet set;set = sessionSets.remove(nextExpirationTime);if (set != null) { for (SessionImpl s : set.sessions) { sessionsById.remove(s.sessionId); expirer.expire(s); }}nextExpirationTime += expirationInterval;


可见SessionTrackerImpl这个线程会一直轮询的清除过期session


每次轮询都会比较currentTime和nextExpirationTime,如果还未到nextExpirationTime,就等,否则往下走

将sessionSets中的以nextExpirationTime为key的那组session移出

遍历session,从sessionsById移除session,并调用相关的过期处理(下面会讲)

调整下载比较的时间,即nextExpirationTime += expirationInterval;


3) session维护相关问题

3.1)清除session如何实现?

【详见ZooKeeperServer.close】

123 private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); }

3.1.1)构造一个Request实例
3.1.2)调用PrepRequestProcessor.processRequest放入submittedRequests队列
3.1.3)PrepRequestProcessor线程的处理
12345678910111213141516171819202122232425262728293031323334 request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type); switch (type) { //省略N行代码...... case OpCode.closeSession: // We don&#39;t want to do this check since the session expiration thread // queues up this operation without being the session owner. // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); HashSet<String> es = zks.getZKDatabase() .getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { for (ChangeRecord c : zks.outstandingChanges) { if (c.stat == null) { // Doing a delete es.remove(c.path); } else if (c.stat.getEphemeralOwner() == request.sessionId) { es.add(c.path); } } for (String path2Delete : es) { addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path2Delete, null, 0, null)); } zks.sessionTracker.setSessionClosing(request.sessionId); } LOG.info("Processed session termination for sessionid: 0x" + Long.toHexString(request.sessionId)); break;


设置request.hdr,这个很重要,

在FinalRequestProcessor.processRequest会有相应的处理

123456 if (request.hdr != null) { TxnHeader hdr = request.hdr; Record txn = request.txn; rc = zks.processTxn(hdr, txn); }

&#8203;

一旦某个session关闭,与session相关的EPHEMERAL类型的节点都得清除


并且通过调用sessionTracker.setSessionClosing将session设置为关闭,使得后续此session上的请求无效

3.1.4)SessionTrackerImpl相关数据结构的清理
【详见SessionTrackerImpl.removeSession】


123456789101112 synchronized public void removeSession(long sessionId) { SessionImpl s = sessionsById.remove(sessionId); sessionsWithTimeout.remove(sessionId); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId)); } if (s != null) { sessionSets.get(s.tickTime).sessions.remove(s); }}

分别对sessionsById、sessionsWithTimeout、sessionSets进行处理


3.2)session owner咋回事?
如果不是在集群环境,即没有LearnerHandler线程,session 的owner就是一个常量实例ServerCnxn.me


3.3)sessionsWithTimeout这个数据结构的用途?

sessionsWithTimeout存放的是sessionid和timeout,此数据结构会和ZKDatabase中相通,会被持久化


如果某个session timeout为60s,如果空闲了30s,意味着还能空闲30s,此时服务重启,那么此session的timeout又变为60s


3.4)touch session是干吗的?
每次一旦该session有请求,就会touch,意味着session的过期时间变为(基本等于当前时间+timeout)

具体算法为

1234 private long roundToInterval(long time) { // We give a one interval grace period return (time / expirationInterval + 1) * expirationInterval; }

time为System.currentTimeMillis() + timeout

expirationInterval默认为ticktime

3.5)check session是干吗的
基本上所有的事务型操作,都会调用用来验证当前请求的session是否关闭,owner是否正确


4)小结
SessionTrackerImpl作为一个单独的线程专门处理过期session

SessionTrackerImpl有3个重要的数据结构sessionsById、sessionSets、sessionsWithTimeout,其中sessionsWithTimeout会被持久化

SessionTrackerImpl提供了几个常用的API

createSession

addSession

touchSession

removeSession

checkSession

setOwner

dumpSessions

相关文章
最新文章
热点推荐