etcdv3.6源码笔记----put流程
这边建议copy代码块到本地阅读,因为宽屏阅读体验感更好。
主记录,次分享,不美观
etcdserver.EtcdServer.Put #客户端调用Put grpc会走到这里,客户端都是通过调grpc来和服务器通信,不同的请求对应不同的rpc函数
#因为put会修改集群数据,所以写入数据库之前要走一遍raft流程来同步日志
etcdserver.EtcdServer.raftRequest(pb.InternalRaftRequest{Put: r}) #把普通请求封装到raft请求中,准备走raft流程
#这个Put:r表示把当前请求放到raft请求的Put字段,这个put字段属于raft请求的数据字段(data)
#当然,还有Delete/Range/Txn/Compact字段,
#但是同一个请求有且只有一个字段不为空即一个请求只做一件事
#raft同步日志过程中,只关心日志的元信息比如日志索引、任期,不关心不使用这个数据字段
#这个data字段有两个作用:
#1:持久化wal日志时作为日志的一部分被写入磁盘,从而做好了数据备份,
#当崩溃时数据不会丢失,重启时重放一遍日志就行
#2:apply的时候被解析,data字段是一个完整的请求,包含了数据和请求类型等元信息,
#apply阶段从data字段解析出请求r后就根据Put字段不为空从而判断出这是一个put请求,
#从而可以正确分发请求(调用不同函数)
#所以不管是什么请求,这个raft同步日志的流程都是通用的,因为raft流程完全不涉及data字段,
etcdserver.EtcdServer.raftRequestOnce #执行raft流程以及等待raft的结果
etcdserver.EtcdServer.processInternalRaftRequestOnce #干三件事:1:检测是否接受该请求。如果达到阈值就停止接受新请求
#2:准备好raft所需的日志数据(把客户端请求(put/delete/txn等)
#序列化以后封装到raft日志的data字段)
#3:进行propose然后等到其他线程返回propose结果。这个propose所干的事情就是同步日志
etcdserver.EtcdServer.getAppliedIndex #获取已经应用的日志的索引(即appliedIndex),应用成功表示数据已经写入数据库
etcdserver.EtcdServer.getCommittedIndex #获取已经提交日志的索引(即commitedIndex),提交成功表示该数据日志已经同步到其他节点了
#把一条数据写入etcdf服务器需要两步:
#1:同步日志到其他节点,一条日志包含两部分(日志元信息,原始请求);
#2:写数据到本机bbolt数据库,etcd底层用的是bbolt数据库存储数据
#(原始请求中包含数据)
#同步日志:即commit操作即把日志成功同步到过半etcd节点,同步成功后该日志就变成commited了
#写数据:即apply操作即成功把数据写入底层boltdb数据库
#!!!apply操作是各搞各的,leader与follower之间不会为此进行通信和同步,
#不用担心崩溃恢复的问题,原因如下:
#apply一条日志之前该日志必定是commited,即成功写入了本地磁盘并且同步到了过半节点上,
#所以崩溃后日志还是在的,数据还是在的
#崩溃重启时,只要从0开始重新apply一遍所有日志(相当于mysql redo),就能使数据库恢复到一个确定的状态
if appliedIndex+limits<commitedIndex #如果apply一条数据花费的时间长,那么就可能导致appliedIndex远远滞后于commitedIndex
#一旦超过一个阈值,就拒绝本次put即此时不再接收新请求
#每条日志都有一个索引表示这是第几条日志,etcd中如果index=x的索引还没处理完就不会处理index=x+1的日志
#整个日志索引空间分两大块:持久化了的和unstable,unstable表示这部分日志还在内存,断电会丢失
#日志索引分3大段,applied(已经可以访问即写到了数据库),
#commited(已经提交了日志但还不可以访问),uncommited(还未提交日志)
#applied:[0,x] commited:[0,z-1] uncommited: [z,+∞),x必定小于等于z-1即必须先提交后应用
#还可以分为stable(持久化到了磁盘),unstable:还在内存,断电丢失
#stable:[0,k-1],unstable:[k,+∞),如果allowUseUnstable(可配置,即允许commite和apply未持久化的日志)
#那么x/z都可以大于等于k,反之都必须小于k
#有个InProgress表示正在进行持久化的数据即inprogress之前的已经持久化了,
#之后的分两部分:正在持久化和还未开始持久化
#还有个applying表示正在应用中的日志即applying:[x+1,applying],applying要小于等于commited
#!!applying和commiting时日志可以还没有持久化(可配置allowUseUnstable),
#因为持久化和commited/apply操作都是异步的
#不过一般来说commitedIndex要小于unstable的第一条日志的索引(默认配置是这样)
#关于日志还有另一种分法,即snapshot,memtable,unstable
#snapshot[0,a]:这部分日志已经持久化了,并创建了对应的snapshot文件,
#该文件记录了当前的appliedIndex值即a,重启后会从a开始重放
#memtable[a+1,b]:这部分日志已经持久化了,但不属于快照,会在内存创建一个对应的数组储存
#ustable[b+1,+∞):这部分还没持久化
return #达到了处理阈值,所以直接拒绝新请求
idutil.Generator.Next #为该请求生成一个唯一id,会把这个id和一个chan对应起来,当其他线程处理完后会通过这个chan通知此线程
etcdserverpb.InternalRaftRequest.Marshal #序列化raft请求的数据部分,后续会被放到一个msg对象中,然后准备发往其他etcd节点
wait.list.Register #为前面生成的id生成一个chan,假设叫做ch,会这个ch加到的内部的map中即id->chan
#当前线程会阻塞在这个ch chan上以等待put结果,当put完成的时候其他线程会写这个chan来通知当前线程put完成
#收到结果后就返回给用户了
raft.node.Propose #调用node接口来通知底层的node进行propose,就是写一个chan,通知另一个线程进行propose,然后当前线程就返回
#就是写raft.node.propc chan这个来通知node.run线程来处理propose
#从收到一个请求到commited一条日志,包括两个阶段:propose和commited
#propose就是leader通知各个follower来了一条新日志,你们给我保存一下
#commitd就是leader检测到一些日志可以提交了,然后本地提交并发消息通知各个follower,可以提交这些日志了
raft.node.stepWait(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) #发送MsgProp消息,
#不管什么请求(put/delete/txn/compact等)
#都会作为data字段
raft.node.stepWithWaitOption #这个函数只处理Propose消息,对于其他类型的消息则直接发到recvc chan,由对应线程来处理,然后直接返回
if m.Type != pb.MsgProp:
case n.recvc <- m #非Propose消息直接转发
return
raft.node.propc <- msg(pb.MsgProp) #把包含了日志的Propse消息发往指定chan,即raft.node.run这个线程去异步propose
result <- #阻塞,直到当前线程收到node.run返回propose的结果
another thread 1 {
#线程1:即node.run函数,处理propose消息。run函数是一个死循环,
#每次循环可以做多件事,但本次只处理prose这个事件
#propose处理流程分2步:
#1:leader先在本机上把该日志持久化(实际是异步操作,只不过会默认持久化成功,然后更新相关变量);
#2:发送MsgApp消息通知follower节点保存该日志
#之所以本机上也做是因为leader本身也算作一个节点,最终投票时leader本身也要投一票,
#所以leader本身也要做follower所做的事
#向follower节点发送的MsgApp日志同步消息会放到发送缓冲区由其他线程发送,
#并乐观的更新leader看到的follower节点视图(即pr.next)
#即消息还没发给follower,就默认follower会成功。就算失败也没关系,
#因为检测的是pr.Match这个字段,如果指定时间没有过半节点
#的pr.match字段达到日志对应的索引,那么leader就知道这个失败了,就会进行相应处理
#补充一下:
#关于etcd中etcdserver、Node、raftNode、raft这四个结构体之间的关系如下:
#etcdserver结构体包含两部分:1:一堆收发消息的chan;2:一个raftNode,用来进行raft操作即同步日志
#为了解耦,etcd把这堆chan和这个raftNode封装到同一个node结构体中,然后再把这个node变量放到raftNode内部
#这样就相当于在node和raftNode之间搭建了一个双向通信的桥梁
#这样etcdserver写这些chan实际会透传到raftNode,raftNode写applyc,也可以透传给etcdserver,
#因为他们三个使用的chan实际上是同一个变量
#raftNode可以做多种事情比如持久化wal日志、和follower节点通信、通知上层apply
#node和raftNode共同组成raft层,raft层只做一件事:就是同步日志
#所以整个etcd又可以分为两大层:etcdserver层和raft层,etcdserver负责接收客户端请求,
#然后调用raft层同步日志后,etcdserver再把数据写入bbolt数据库
#node.run里面的死循环相当于raft的大脑即controller,raftNode.run里面的死循环相当于raft的worker
#node.run可以从多个来源检测数据是否ready,一旦ready就通知raftNode 数据ready,
#然后raftNode会根据rd的各个字段来进行不同的处理,比如发消息到peer、持久化wal日志和snap日志、通知readIndex
#举例:当raft.Node在收到server发来的请求同步的日志时,node就填充数据到ready中,然后通知raftNode 发起propose
#比如当raftNode处理完node发来的消息的时候就写相关chan,通知node当前操作已经处理完了
raft.node.run #在这个函数里会有一个死循环select这个raft.node.propc
for{
...略去其他代码,在another thread 2里会细讲...
select:
case <-raft.node.propc #当收到etcdserver发来的日志同步请求时执行这里
#做两件事:1:调用StepLeader完成propose;2:阻塞,直到收到propose的结果,然后把这个结果发给
raft.raft.Step #节点有三种角色(leader/candidate/follower),对应stepLeader/stepCandidate/stepFollower函数,根据角色调用对应的函数
#这里以leader为例,因为etcd只允许leader发起propose,所以当前节点必定是leader,如果是follow,那么会把propose请求转发给leader
raft.stepLeader: #leader propose主要执行两个操作:1:把日志写入unstabel;2:发送MsgApp消息给follower,日志会放在MsgApp消息中
case MsgProp: #这里处理propose消息,leader的日志追加操作对应MsgProp消息,而非leader的日志追加操作则对应MsgApp消息,因为只有leader才可以propose
raft.raft.appendEntry #本地添加日志,只是添加到内存,并不会立即持久化,是由另一个线程异步进行持久化
#!!!不用担心还没持久化就崩溃导致数据丢失。在关闭allowUseUnstable的情况下etcd只会commite已经持久化的日志,所有还在unstable
#中的日志必定是还没有持久化的,还没有持久化的日志必定不会commite和apply,所以必定不会返回成功的结果给客户端,所以客户端肯定也知道
#本次请求还没有成功,所以客户端可以选择稍后重试直到etcd集群恢复正常,所以断电以后,unstable中的日志丢失就丢失了,完全不用担心
raft.raftLog.lastIndex #获取最后一条日志的索引,然后本次新增的n条日志的索引就依次为lastIndex+1、lastIndex+2,...lastIndex+n
#可以把整个日志空间看成一个逻辑数组,第0个元素的日志索引就是0,第x条日志的索引就是x,每次追加日志都是追加到数组末尾
#然后unstable就是这个逻辑数组中位置x到+∞的这部分索引对应的日志,即unstable就是整个数组的后半段
raft.unstable.maybeLastIndex #返回unstanble的最后一条日志的index
return unstableOffset+len(unstableEntries) #offset表示unstable数组第一条日志在这个逻辑数组中的位置,index=unstable起始偏移量+unstable日志的条数
raft.raft.increaseUncommittedSize #计算未提交数据总大小,如果算上本次准备提交的日志数据字节数超过了系统允许的未提交数据的字节数
#那么就拒绝本次提交,以免未提交数据太多,一旦断电,影响太大
#比如由1G的数据还没提交,然后断电,丢失,又要花时间花资源重做这1G数据,代价太大
raft.raftlog.append #这里把日志添加到unstable数组的末尾,由其他线程异步持久化到磁盘
if after := ents[0].Index - 1; after < l.committed : #如果新增日志的索引号小于已提交最后一条日志的索引号,所以发生了不可修复错误,直接打印panic日志
log(panicLevel,xxx)
raft.unstable.truncateAndAppend #这里把日志添加到unstable数组的末尾
raft.raft.send(pb.MsgAppResp) #走到这里说明leader已经在本地把日志添加到了unstable,表示本机已经追加成功了,所以自己给自己发一个msgAppResp消息
#App为append的缩写,leader收到一条msgAppResp消息代表有一个节点日志追加成功
#raft本质就是一个状态机,然后不断在各节点间同步日志,同步日志就是一个追加操作
#leader节点也是集群的一份子,所以在一次propose中leader也需要做一遍和投一票
append(raft.msgAfterAppend) #发送给自己的msgAppResp消息会被放到msgAfterAppend数组中,
#msgAfterAppend中的对象在后面会转存到StepsOnAppend数组中
#StepsOnAppend用来追踪尚未持久化的数据,一旦持久化,就会清空StepsOnAppend中的对应数据
#在advance中处理StepsOnAppend
#node.run有一个死循环,每次循环开始都会把msgAfterAppend和msgs打包放到Ready结构中
raft.raft.bcastAppend #bcastAppend的意思是broadcastAppend,即广播需要追加的数据到所有peer节点即广播MsgApp消息
#bcastAppend发过去的消息要干两件事:第一件事是告知peer节点要追加哪些日志,第二件事是通知peer节点更新状态
#MsgApp消息包含两个部分,第一个部分是需要append的日志,第二个是peer节点的新状态(比如commitedIndex)
#leader拥有所有节点的视图(即progress对象),然后leader会更新这些progress对象
#更新完后通知peer节点把他们自己的状态更新到leader所看到的这个状态,比如leader的commited到达x了
#peer节点也需要把commited索引更新到x。
tracker.ProgressTracker.Visit #遍历所有节点,etcd用id来代表集群,visit接受一个lambda函数,该函数中如果目的节点是自己,那么就会忽略该节点
raft.raft.sendAppend #发送MsgApp发到指定节点
raft.raft.maybeSendAppend #!!!在发送给follower节点的MsgApp消息内还会带上leader视角下follower此刻的状态信息(任期、Match、Next)
#即leader认为follwer此刻应该提交到这里了,follower收到后就会按照leader的命令用这些信息更新本机的状态
#!!假设当前收到的请求对应的日志为x,这个maybesend并不是就发送这条日志x,相反的是maybesend函数并不关心当前收到的日志
#而是会检测状态,凡是满足要求的日志都会被发送,所以发来一条请求,收到一条日志,仅仅是激活send操作
#send操作发送哪些日志并不是由收到的日志决定,而是由当前状态决定
pr := tracker.ProgressTracker.Progress[to] #根据目的节点id获取目的节点的相关信息,pr表示leader视角下follower节点的状态
#pr即progress,即进度的意思,即leader视角下follower节点的日志复制进度
#pr.Match:表示follower节点leader节点之间日志的匹配索引
#即leader认为follower已经把leader上[0,pr.Match]内日志都复制到follower上
#pr.Next: 表示leader期望赋值给follower的下一条日志的索引
#pr即struct tracker.Progress,前面说的发送哪些日志,就是由这个progres状态变量来决定
raft.raftLog.entries(pr.Next) #获取leader上日志索引号大于pr.Next的所有日志,pr表示该某个follower节点上日志复制进度
#如果leader上存在日志索引号大于pr.Next的日志,说明leader有些日志该follower没有,即follower落后了
#所以这里就是一次性把follower缺少的日志都发送给follower,而不是仅仅发送收到的那条日志x
#所以这个mabesend叫做maybe,如果不存在就说明没有日志要发送,如果存在说明有日志要发送
#所以说日志x只是激活这个maybesend操作,而具体发送哪些日志由实际状态来决定
#当然,如果日志太多了,一次发送不了,那么就只会发送一部分,剩下的等到下一次maybesend被激活再来发送
tracker.Progress.UpdateOnEntriesSend #更新follower节点对应的pr对象的索引(即pr.Next)并添加到Inflights,inflights用来限制发消息速度
#这里在实际发送前就默认本次发给follower的MsgApp消息会被成功处理,所以不等结果就直接更新对象索引
#这是一种乐观的方法,先更新索引,如果失败了,后续再处理
tracker.Progress.OptimisticUpdate #更新pr.Next,即pr.Next=最后一条日志的Index+1,OptimisticUpdate是乐观更新的意思
#这里正常情况下pr.Next都等于leader lastIndex+1,如果follower落后于leader,那么next<lastIndex+1
#在follower报告给leader的heartbeat消息里,follower会发送自己的实际状态
#leader会据此更新pr.Next
pr.Next+=len(entries)
tracker.Inflights.Add #表示传输中的消息窗口,用于限制消息的数量和带宽,如果满了会检测到,从而导致本次发送取消。
raft.raft.send(pb.Message{ #把leader视角下follower的状态都填充到这个MsgApp消息里
To: to, #消息接收节点对应的id
Type: pb.MsgApp, #消息类型
Index: lastIndex, #接受此批日之前follower节点上最后一条日志的索引号应该为lastIndex
#如果对不上,follower就会忽略这套MsgApp消息
LogTerm: lastTerm, #接受此批日之前follower节点上最后一条日志对应的leader的任期应该为lastTerm
#如果对不上,follower就会忽略这套MsgApp消息
Entries: ents, #本次leader发送给follower的日志
Commit: r.raftLog.committed, #告诉follower,leader上的日志已经提交到这里了
}) #把leader视角下follower的状态都填充到这个MsgApp消息里
#给peer节点发送一条msgApp消息,节点收到一条msgapp消息表示收到leader节点发过来的一条日志同步消息
append(raft.msgs) #msgApp消息会放到msgs数组,会有另一个专门的发送线程不断轮询这个数组,如果发现msgs不为空就发送
#即node.run里有一个死循环,每次循环开始都会检测msgAfterAppend和msgs,有数据则表示ready
result <- res #走到这里表明propose流程已经结束,这里把propose结果发回给raft.node.stepWithWaitOption函数,唤醒并让他进入下一步
#propose只需要把对应日志加到本机的unstable数组以及把消息成功发出去就算成功,至于有没有过半节点成功则是下一步线程的事情
}
} //another thread 1
ch <- #走到这里还只是成功propose,但是数据还没有apply,所以此时还不能返回结果给客户端,需要阻塞到这里
#我们前面为每个请求都生成了一个 reqid->ch chan
#当该日志被成功apply的时候其他线程会填充这个ch来唤醒当前线程返回结果给客户端。
another thread 2 { #线程2:仍然是node.run函数,和上面的是同一个函数,但是这是下一次循环了,所以我这里就把笔记挪到后面来了
#这些代码原本是在select case <-raft.node.propc之前的
#线程2就是检测是否有数据redy了,如果有就把这些数据放到一个结构里,然后填充指定chan来通由数据到了
raft.node.run
for{
if raft.RawNode.HasReady #此函数是node.run用来检测是否有数据需要处理,如果有就会激活本次循环,如果无则本次循环会阻塞在某个chan上,数据有多种,来源也有多种
#0:msgs和msgAfterAppend是否为空,若不为空说明要处理,还有许多其他条件,如:
#1:软状态、硬状态是否和之前的状态一致?不一致则需要处理
#软状态包括当前leader编号、当前节点是什么角色这两个不会持久化道磁盘的信息
#硬状态b包括term、commitedIndex、vote,这个hard state会在持久化wal日志时一起写入磁盘
#2:是否还有未持久化的snap数据、日志数据?有则需要持久化到磁盘上,这些数据都是存放在unstable结构体中
#这里的snap数据是指follower严重落后,然后leader直接发一个备份数据snapshot过来
#然后follower收到snapshot后会把他放到usntable的snapshot变量里,表明这个备份数据还没有写到磁盘,写到磁盘后会置空这个变量
#4:是否有已提交但是还未applied且也不是正在applying的数据
#5:ReadStats数组是否为空?不为空则说明有ReadIndex请求已经有过半节点确认当前leader是有效地,可以唤醒该请求
#这个hasReady操作在node.run中是在每次循环开始的时候判断,因为上一次循环中处理MsgProp消息时写入了msgs和msgAfterAppen
#所以本轮循环,hasReady会返回true,这些数据会在本次循环中处理,因为我的笔记大致是按处理流程来记录的,所以这里我就把他的挪到这里来了
raft.RawNode.readyWithoutAccept #准备好本次要处理的数据,会把所有数据封装到一个raft.Ready结构中
#因为etcd所有操作基本都是异步的,所以同一时刻三种数据都会存在:
#unstable:需要持久化的数据、commited:已经提交的数据,需要apply、msgs:发往指定节点的消息(如heartbeat/MsgApp/MsgAppResp等)
#正因为如此,所以每次迭代时需要同时进行三种操作即持久化unstable、apply已经commited的数据、发送msgs到peers节点
#也就是说同一轮迭代可以处理不同的数据
append(messages,msgs) #1:把msgs和msgsAfterAppend统一打包到一个message数组中,统一发送
append(messages,msgsAfterAppend)
raft.raftlog.nextUnstableEnts #2:获取所有未持久化的日志,这些日志准备写入磁盘,一条日志包括日志元信息和数据,这个写操作就是持久化wal(预写日志)
raft.unstable.nextEntries
return unstable[inprogress+1:]#inprogress之前的数据表示之前已经在持久化过程中或者已经持久化了,只不过还没完成
raft.raftlog.nextCommittedEnts #3:获取一批已经commited,需要apply的日志
raft.raftlog.maxAppliableIndex #如果allowUseUnstable,那么可以允许提交未持久化的日志,反之不允许
hi := l.committed
if !allowUnstable { #如果配置不允许使用未提交的数据,则只允许提交已持久化的日志,offset-1表示最后一条持久化的日志的index
#如果不允许使用unstable的日志,那么这样就可以确保用户一旦收到写入完成的响应,那么数据对应的wal日志必定已经写入了磁盘
#此后即使服务器崩溃,数据不会丢失
#如果开启,就说明还没有持久化的日志可以被apply,这样断电以后,有可能数据丢失
hi = min(hi, l.unstable.offset-1)
}
raft.raftlog.slice(applying+1,hi+1,maxSize) #[applied,applying]区间内的数据已经在应用中了
#本次commite[applying+1,hi+1)之间的日志,至多处理到hi,并且至多处理maxSize字节
raft.raftlog.nextUnstableSnapshot #4:获取未持久化的leader发来的快照数据(即备份数据)
if raft.RawNode.asyncStorageWrites #如果开启异步写入磁盘即把消息发给专门的写线程由该线程去处理而不是立即写入磁盘,不过默认是没有开启的,可以忽略此处
raft.needStorageAppendMsg #如果存在需要写入磁盘的Append消息
raft.newStorageAppendMsg #构造一个消息,该消息的目的地设置为本地磁盘线程,当异步刷盘线程收到消息并刷盘成功后会给leader自己发送一个MsgStorageAppendResp
append(rd.Messages) #把消息添加到raft.Ready结构中,异步写入和立即写入都是把数据封装到一个消息中,两者的唯一区别只有消息目的地不同
select #这个select和上面的select case <-raft.node.propc是同一个select
case readyc <- rd: #当node处理好数据后就通知底层raftNode进行相应处理,因为底层raftnode会监听这个readyc
#上面填充了rd结构的msgs和msgAppAfter结构,这里把Ready结构填充到raft.node.readyc chan
#填充完毕后会通知raft.raftNode.start里的case rd := <-r.Ready(),通知他持久化日志、发送消息、通知etcdserver apply数据
raft.RawNode.acceptReady #此case主要就是一个发送的目的,一旦发送完成就设置一些回调函数,当收到ready完成的消息的时候就会触发这些回调
#这里回调函数的意思是先准备好消息,一旦收到处理完ready的消息就把这些消息发送给自己
if !asyncWrite: #leader也是一个节点,所以leader也需要完成本机的wal日志的持久化、日志追加、apply成功以后同样自己给自己分别返回一个
#MsgStorageAppendResp、MsgAppResp、MsgApplyResp
#MsgStorageAppendResp:m.Index+1表示在此之前的所有日志都已经刷到磁盘了,所以需要从unstable数组删除这部分日志,
#默认是每次都刷全部unstable日志,即在此之后unstable数组置为空
#MsgAppResp:表示日志已经成功追加,因为这三个消息都是在another thread 3里面调用advance通知以后在another thread 6里面执行
#在thread3里会把所有unstable日志刷盘,所以当处理MsgAppResp消息的时候本次put的数据对应的日志必定成功持久化到了磁盘(关闭allowUnstable的情况下),
#所以可以认为leader本次put的数据持久化成功,至于其他节点有没有成功,在处理其他节点发来的MsgAppResp时候通过Maybe处理
#MsgApplyResp表示日志成功写入bbolt数据库
#因为!asyncWrite表示同步,那么执行到后面case <-advance的时候日志的持久化、日志追加、apply这三个动作必定已经实现,所以这里直接添加消息
#如果开启异步写入,那么会有专门的写入线程来发送这三个消息。
#对于MsgAppResp消息,因为后面是通过消息来驱动的,所以即使本机还没有持久化完成,但是当leader收到其他节点发来的MsgAppResp消息时
#他会检测一下是否可以提交,如果过半就可以更新相关变量,无需等待leader自身完成
if raft.needStorageAppendRespMsg #如果此处有要append的数据就添加一个MsgStorageAppendResp消息
#表示当本机unstable数据成功刷盘后给自己发送一个MsgStorageAppendResp消息
#这里就是先准备好消息,一旦收到advance chan就知道这些操作已经完成了,然后就立即把个数组中的消息发送给自己
newStorageAppendRespMsg
append(stepsOnAdvance,MsgStorageAppendResp) # 把消息添加到stepsOnAdvance
}
if needStorageApplyRespMsg(rd) { #同上,一个是append,一个是apply
m := newStorageApplyRespMsg
append(stepsOnAdvance,MsgStorageApplyResp)
append(raft.RawNode.stepsOnAdvance,msgAfterAppend) #把前面自己发给自己的MsgAppResp消息转存到stepsOnAdvance
raft.raftlog.acceptUnstable #更新inprogress(一个索引值),即[unstable,inprogress-1]区间的数据都表示正在持久化过程中
raft.unstable.acceptInProgress
offsetInProgress=entryLastIndex+1 #即此次持久化的最后那条entry的index+1
raft.raftlog.acceptApplying #同理,更新索引applying的值
applying=commitedEntryLastIndex+1 #即此次准备apply的最后那条entry的index+1
}
}
another thread 3 {
#在thread2的select中我们通过case readyc<-rd 把准备好的ready结构填充到了raft.node.readyc这个chan,底层raftNode会监听这个chan
raft.raftNode.start #也会启动一个死循环,通过select从chan接受消息
for{
select
case rd := <-r.Ready() #rd不为空时,raftNode就知道node又发命令来了,要开始实际执行同步日志的操作了
#监听的是raft.node.readyc这个chan,上一步node.run的case readyc <- rd这个操作会激活这个case
#!!!因为传过来的rd中同时包含了unstable(准备持久化的数据)/commited(准备apply的数据/msg(准备发送的消息)
#所以这个case也会同时干这三件事sync(unstable)、apply(commited)、send(msg)
#这个case一共做五件事:
#1:检查软状态是否有改变(比如集群是不是有新leader了,集群是否有leader),如果有则调用相关函数
#2:如果readStateC数组不为空,则说明此刻appliedIndex大于等于这个readState所等待的commitedIndex,需要通知readyC
#3~5:sync(unstable)、apply(commited)、send(msg)
if rd.SoftState != nil: #1:检查集群leader
etcdserver.EtcdServer.setLeader #设置集群leader
func(){ #表示这里调用的是一个lambda函数
if leader=节点自己:
v3compactor.Periodic.Resume #如果leader是节点自己,那么重新开启compact
#因为compact会改变集群数据状态,所以etcd只有leader才可以发起compact,进行compact之前要先走raft同步一条compact日志
#compact日志中包含本次要要压缩哪些版本的数据
if leader!=节点自己:
v3compactor.Periodic.Pause #如果leader节点不是自己,那么就暂停compact
notify.Notifier.Notify #通知其他线程集群leader变了
#我们在并发进行线性读的时候因为要确认是否有过半节点有效,所以函数requestCurrentIndex会阻塞
#直到节点确认过半节点承认节点是当前集群的leader之后才会去读取当前的appliedInex
#如果requestCurrentIndex阻塞的时候收到集群leader改变的消息
#那么他就会立刻放弃所有此前还在等待的ReadIndex请求,然后返回错误
}
if len(rd.ReadStates) != 0 {
r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1] #2:如果readStattes不为空,说明此刻有ReadIndex准备好了,所以需要唤醒getCurrentIndex函数
etcdserver.updateCommittedIndex #设置server上的commitedIndex。etcdserver的commitedIndex变量取决于底层的raft日志的commitedIndex
#可以这样理解:server有一层状态,然后server底层的raft日志也有一层状态
#raft日志的commited表示日志已经同步到这里了,对于server来说,这些日志都是可以commited的
#即etcdserver的commitedIndex小于等于raft的commitedIndex
idx:=commitedEntryLastIndex #本次待apply的最后一条日志的index
etcdserver.EtcdServer.setCommittedIndex(idx) #设置server上的commitedIndex
raftNode.applyc <- toApply{commitedEntry,notifyc chan} #3:apply(commited)
#把准备apply的数据填充到apply chan中,
#raftNode写这个chan会直接透传给etcdserver
#etcdserver.run会监听这个applyc chan
#commitedEntry表示本次要apply的日志,notify用来同步etcdserver.run里的apply操作
#因为etcdserver.run里apply函数在成功写入bbolt数据库后会调用trrigerSnapshot做一个快照
#而本函数也会处理快照,所以为了安全,需要一个notify chan来同步
if isLeader: #4:把待发送的消息发送给follower节点(put流程中此处是MsgApp消息)
etcdserver.raftNode.processMessages #根据实际情况重新设置消息的某些字段,主要就是丢弃自己发给自己的消息
#r.msgs中的消息就是thread 1 中填充的准备发往peer节点的pb.MsgApp消息和发给自己的raftpb.MsgAppResp消息
#这里将MsgAppResp的目的地是自己的消息的目的地设置为0表示丢弃
rafthttp.Transport.Send #把r.msgs中的消息发送给peer节点
rafthttp.peer.Send #把msg写入一个chan,然后由监听这个chan的其他线程去异步发送请求给peer节点
writerch=rafthttp.peer.pick #获取对应的输出chan,支持 streamAppV2、streamMsg、pipeline
writerch<-msgs #把msgs填充到chan中,这个chan会有对应的线程x在监听,一旦收到chan就会把消息发往指定节点
#也就是说发送线程从raftnode.ready chan(实际是node.readyc)接受数据
#然后把收到的响应填充到raftNode.recv chan(实际是node.recvc)
if !raft.IsEmptySnap(rd.Snapshot)
storage.storage.SaveSnap(rd.Snapshot) #5.1:持久化快照点。如果rd的snap字段不为空说明leader发来了snap,需要对应的snap数据持久化到磁盘上,以供后续回放
#说明这个节点当前的角色为follower
#注意:这里说的snap就是本文后面apply完成之后triggersnap函数创建的snap日志文件和snap日志
#这个snap点相当于mysql里的checkpoint点,snapshot操作也是必须得leader发起
snap.Snapshotter.SaveSnap #保存snap的数据部分到一个单独的文件中,数据部分指的是集群状态、term、index,并不是bbolt数据库中的数据,别搞混了
wal.Wal.SaveSnapshot #保存一条snap日志到wal文件中,snap日志和其他raft日志都是一样的只是日志的类型不同而已
storage.storage.Save #5.2:刷盘wal日志,即持久化unstable部分对应的所有日志!!是所有usntable日志以及当前的硬状态(term/vote/commit)
#unstable之前的日志肯定也已经写入磁盘,即那些准备返回结果给用户的数据对应的日志肯定已经写入了磁盘
#因为etcd会限制未提交的数据大小,所以一旦请求太多,导致unstable的日志超过阈值,那么集群就会拒绝后续所有新增put请求
#当完成这一步的时候可以确保目前leader上所有unstable的日志都已经持久化到了磁盘,此后崩溃也不怕了
#leader发送MsgApp消息给follower节点和leader刷盘wal日志是一个并发操作
#刷盘的时候是按日志的index来刷盘的,即index=a的日志如果刷盘失败,那么index>a的日志肯定不会在存储中
#下面的为个人随笔,并非etcd的实现,:
#可以简单理解为一条日志刷入磁盘的日志状态有四种取值:prepare、commited、applied、abort
#prepare状态:这是初始状态,已经给follower节点发出MsgApp消息了,但是还没有收集到足够多的选票
#abort状态: 指定时间内没有收到足够多的选票、被其他节点拒绝了,那么就把prepare状态的日志标记为abort
#commited状态:收集到了足够多的选票,则从prepare转变为commited
#applied状态:commited状态的日志发布成功则变为applied状态
#突然断电后重启后通过重放日志来恢复:
#prepare:可以简单的选择直接丢弃或者重试,全在个人取舍
#abort状态:直接丢弃
#commited:再次apply就行
#applied:直接使用
#OLAP型数据库starrocks1.9版本的事务处理流程就是与上面的流程几乎一模一样
#但是etcd实际不是这么处理的,etcd判断一条日志是不是有效,是通过集群协商来获取的,而不是直接记录日志的状态
#崩溃重启后,etcd节点会获取到集群leader,然后以leader为准,对于index=a的日志,如果leader上有,且index、term都对的上,
#那么这条日志对于该etcd节点来说就是有效地,相当于上面所说的处于commited状态的日志,如果这条日志leader上没有或者冲突了,
#那么这条日志对于该节点来说就是prepare的,直接丢弃,etcd这里没有abort,因为所有日志都是以leader为准,比较霸道
#如果leader发现一条日志一直follower节点返回reject,那么该leader就会强制用自己的日志去覆盖该follower的日志
#如果leader发现一条日志一直收不到过半节点返回响应,如果是节点宕机或者网络出错,那么leader在heatbeat部分就会检测到该节点失效
#并且会标记该节点并返回失败
wal.Wal.Save #是storage.storage.Save函数里调用wal.Wal.Save
wal.Wal.saveEntry #保存日志,此处还没有强制刷盘
wal.Wal.saveState #保存硬状态,包括
#Term:表示当前的任期;Vote:表示当前节点最后投票给的候选人的ID;Commit:已经被大多数节点确认过的最高索引号。
sync=raft.MustSync #判断是否需要强制刷盘,如果写入的entry不为空或者硬状态不等于旧状态则必须强制刷盘
if curOff < SegmentSizeBytes: #如果一个段没有写满(默认64MB,不一定是64MB,因为最后一条日志大小不一定刚好凑满64MB,可能超出一点点)
if sync:
wal.Wal.sync #强制刷盘
wal.Wal.Cut #超过了指定的文件大小,所以先把文件切割成两部分(不会把一条日志保存到两个文件),然后分别刷盘
if !raft.IsEmptySnap(rd.Snapshot)
raft.MemoryStorageApplySnapshot #应用快照,就是按照发来的snap来设置memtable的相关字段
storage.storage.Release #释放掉旧的snap文件的锁,以便purgeFile线程可以异步删除过时的文件。之前加锁是为了防止文件被其他人其他线程意外删除
raft.MemoryStorage.Append #把日志条目追加到memoryTable中,此时已经写好wal了,所以断电也不怕丢失
#wal日志会被最新的快照点分为两部分,快照点之前的日志都是保存在磁盘上,快照点之后的所有日志,etcd还会把他加载到内存中
notifyc <- struct{}{} #这个notifyc是上面toApply chan内部的一个chan,etcdserver.run在处理applyc chan过程中会等待这个notifyc
#是snap相关
raft.node.Advance #发消息通知道node.run可以进行下一步了,处理完本次ready后通知node往下走,
#主要是处理stepsOnApp中leader自己发给自己的的MsgAppResp消息
raft.node.advancec <- struct{}{} #node.run会监听这个advancec chan
}
}
another thread 4 {
#前面说过,整个etcd分为两大部分,etcdserver层和raft层,这两层之间通过chan来通信
#etcdserver写数据之前会先通过raft层同步一下日志,当raft层同步完时,
#raftNode会写这个applyc chan,然后这个applyc会透传给etcdserver,
#当etcdserver收到消息的时候就知道这些数据对应的日志已经同步并且持久化了,所以etcdserver下一步就是把数据写入数据库
#本线程就是负责把底层raftNode发来的可以apply的日志应用到etcdserver即提交到bolt数据库。
etcdserver.EtcdServer.run
for
select:
case ap <- applyc
schedule.NewJob #把applyAll封装到一个job里面,然后丢到异步fifo队列去执行,
#注意,fifo是先进先出的意思,fifo队列会等到事务x-1完成才会开始事务x
#这样就保证了一定是按日志顺序apply的
#一个事务对应一个appliedIndex,所以代码里也是用revision来作为事务id。
#另一个方面,之所以开一个异步队列是因为applyc chan的容量只有1,
#如果顺序执行如果一直本次apply非常慢,那么就会导致无法接受新的apply请求,从而导致发送方也阻塞,然后整个系统都阻塞了
schedule.Schedule.schedule #把这个job丢到一个fifo队列里,然后一个work线程不断从fifo队列取元素,这里略,这里直接就转到applyAll函数
etcdserver.EtcdServer.applyAll #干两件事:1:把数据写入bbolt; 2:写reqid对应的chan从而通知etcdserver 本次客户端请求已经完成了,可以返回了
#!!!一路都是串行的,即直到把数据写入bbolt数据库才会返回,即上一个事物还没结束,下一个事务必定不会开始
etcdserver.EtcdServer.applySnapshot #应用snapshot。就是把相关变量重置为snapshot文件中指定的相关信息
storage.OpenSnapshotBackend #打开指定目录下的xx.snap.db文件,这个是真的备份文件,
#etcdctl --endpoints xx snapshot save命令会从--endpoints指定的server上拉取数据并保存到本地
#当使用etcdctl snapshot restore的时候就会去读取指定的文件然后从该文件恢复数据库
#更多详情见snapshot那一章
mvcc.watchableStore.Restore #读取指定文件(xx.snap.db),然后恢复数据,略
etcdserver.EtcdServer.applyEntries #根据日志,把结果写入数据库
if firsti > ep.appliedi+1 #必须按日志顺序apply,如果x-1这条日志还没有applied,那么x就不能apply
#ep.applied表示leader上applied的最后一条日志,firsti表示本次准备apply的第一条日志的索引
panic
curRev=etcdserver.Etcdserver.apply #应用数据,也就是把数据成功写入etcd,
#当写入一条数据后会更新当前集群的revision字段,
#apply这些操作是每个节点自己进行的,无需同步,因为日志已经同步好了,
#按顺序apply就可以保证不会出差错。revision字段是该节点上所有操作都共用的一个字段
#即下一个事务的revison就等于curRev+1
cindex.consistentIndex.SetConsistentApplyingIndex #设置底层一致性存储的applyingIndex,这一块还不懂,之所以还搞一个这个,看代码注释说是为了安全
case raftpb.EntryNormal: #put数据对应的entry是EntryNormal,put完之后还要更新applied索引
etcdserver.EtcdServer.applyEntryNormal: #干两件事:apply一条日志;通过ch<-x通知其他线程本次put操作已结束
etcdserver.EtcdServer.applyInternalRaftRequest
apply.uberApplier.Apply #执行链CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend
apply.authApplierV3.Apply #apply之前判断一下是否允许apply
applierV3backend.Apply
apply.uberApplier.dispatch #根据req类型来调用对应的方法
#!!!raft层的作用只是同步一下日志,会把具体的请求内容封装在日志的data字段
#因为raft层只同步日志,完全不关心data字段
case r.Put!=nil: #put请求则调用对应的put方法,执行链 auth->quota->backend
#raft请求里面有多个指针比如put/delete/txn等,这些指针有且只有一个不为空
apply.authApplierV3.Put #判断是否允许put
apply.quotaApplierV3.Put
storage.BackendQuota.Available #检查磁盘是否还有足够空间写入数据,backend可以简单理解为磁盘
apply.applierV3backend.Put
txn.Put
leases.lessor.Lookup #如果key提前设置了租约,则租约必须存在,如果没有,则忽略
mvcc.watchableStore.Write #创建Mvcc Txn Writer
txn.put #mvcc put,这是通用流程,txn里面的put后面也会转到这里
mvcc.metricsTxnWrite.Put
mvcc.storeTxnWrite.Put #mvcc操作
mvcc.storeTxnWrite.put #干两件事:1:把数据写入bbolt;2:更新treeIndex
rev := tw.beginRev + 1 #rev即revision
#etcd有个版本概念的:revision(修订版本号)
#revision是相对于整个集群的,即每次put(假设一个事务只包含一次put)操作都是一个全新的revision版本
#revision有两个字段,main和sub,main表示事务操作的主版本号
#同一事务中发生变更的key共享同一main版本号,sub表示同一事务中发生变更的次数
#举个例子:假设自启动以来进行了三次put,分别是put a 1和put b 2以及put a 3
#那么put a 1 时(main=1)
#那么put b 2 时(main=2)
#那么put a 3 时(main=3)
#put b时 b的版本是2,不是1,因为每次put操作都算一个全新的版本
#版本号虽然单调递增但是是64位,目前是不可能用完的
#注意:别把日志索引(index)和数据版本(revision)混起来了
backend.batchTxBuffered.UnsafeSeqPut #写入数据库
#key=(revision),value=(key-value,someRevisionInfo
backend.batchTxBuffered.UnsafeSeqPut
backend.batchTx.UnsafeSeqPut
backend.batchTx.unsafePut #把数据存入bbolt
bbolt.Tx.Bucket #获取对应的桶
bbolt.Bucket.Put #把数据存放到桶里
backend.txWriteBuffer.putSeq #还没看懂,不过不是在这里写入,貌似和查询有关
mvcc.treeIndex.Put #把(key,revision)写入内存里的treeIndex,key=key,value=[]generation{revision},
#保存了所有历史版本
#这个treeindex是启动时重建的,etcd会把所有key都放到内存,这限制了etcd支持的数据大小
#generation是代的意思,一个key从创建到删除是一个generation
#generation记录key从创建到删除中间所有变更的版本信息。
#当key被删除后再次被操作,会创建新的generation来记录版本信息。
wait.list.Trigger #数据已经写入数据库了即apply成功,所以这里把applyInternalRaftRequest的结果写入reqid对应的chan,
#从而通知processInternalRaftRequestOnce已完成put操作
#processInternalRaftRequestOnce收到结果后就返回给用户了
ch <- x
close(ch) #这里是关闭chan
etcdserver.EtcdServer.setAppliedIndex #更新apply索引(在etcdserver.EtcdServer.applyEntryNormal之后)
etcdserver.EtcdServer.setTerm #更新任期
backend.VerifyBackendConsistency #验证后端存储一致性,暂时忽略
wait.timeList.Trigger(appliedIndex) #唤醒所有阻塞在appliedIndex之前的读请求。
#etcd ReadIndex(线性读):当读请求到来时会记录下此刻的commitedIndex值,并保存到confirmIndex变量中,
#只有当appliedIndex>=confirmIndex的时候读请求才会解除阻塞,才会去数据库读数据,
#这里已经apply到appliedIndex了,所以在此之前的读请求都可以被唤醒了
<- apply.notifyc #在上一步所有wal日志刷盘以后才会通知etcdserver进行快照
etcdserver.EtcdServer.triggerSnapshot #创建快照。v3版本创建快照只是在member/snap目录下创建一个xx.snap快照文件,不是真的数据快照,而是集群状态快照。
#存在的意义就是记录一个checkpoint点,这个checkpoint点包括当前得集群成员信息,以及一个term,一个index
#index的意思就是标记一下当前已经应用到了这条日志,重启的时候直接从这条日志开始重做
#在etcdserver.EtcdServer.run里会初始化applied为快照中记录的index,如果没有快照,那么就从0重新开始
#!!!v3版本不会对数据做快照处理!!因为etcd v3不再需要快照。etcd v3会把每一次写操作的结果都落到数据库里
#而数据库里的数据的格式为key=revesion,value=(elemkey,elemvalue),也就是说数据库里记录了所有版本的数据
#相当于数据库里保存了所有数据相关的wal日志
#v2版本的snapshot操作被v3版本的compact操作(即对于同一个elemkey,只保留最新版本)所替代,所以v3版本代码里
#triggersnapshot操作只持久化当前集群状态,不再转储数据,即v3版本里,triggersnapshot相当于一个checkpoint点
#如果要转储数据,应该用etcdctl snapshot,这个命令才会真正地对数据做转储,
#这个转储操作也是直接去读这个bbolt数据库,不需要读wal日志,因为bbolt数据库里保存了所有版本数据
#当其他节点以snapshot启动的时候也是直接把这个snapshot文件里的数据直接放到bbolt数据库里面
#重放wal日志的目的在哪里?
#这是因为之前之前的数据库里保存的是值,没有保存版本号,所以你不知道数据库里的数据应用到哪里了
#你只能先创建一个快照,表示数据库应用到这里了,然后再去重放wal日志
#比如说你再index=10的时候创建了一个快照,表示数据库至少apply到了index=10这条日志,假设有一条数据x=998
#你不知道x=998这条数据是哪条日志写的,所以只能用一个保险的办法,从index=10这里开始重放所有日志,
#重放完所有日志后就能使数据库到达一个确定的状态,重放后x=xxx便是一个确定的状态
#但是v3不同,v3保存了所有版本,我们读出v3数据库里数据的最大版本号x,然后我们就知道数据库已经apply到版本号x了
#节点重启肯定会加入集群,加入集群那么日志就要和leader同步,而不是自己重放
#因为有些日志可能被leader覆盖
#假设wal中文件中包含索引为[1,2,3,4,5,6]的日志其中[1,2]放在wal文件1,[3,4]放在wal文件2,[5,6]放在文件3
#然后已经把索引为[1,2,3,4,5]的日志都应用到db文件中了
#如果我们在应用完索引=4的日志后创建一个xx.snap文件,其中记录一条信息,表示我们的数据库里的数据已经应用到了4这里
#也就是说4及4之前的日志都可以不要,也就是说我们可以把这些日志删除,比如我们可以立即删除文件1和文件2
#这样就可以减少wal日志文件的总大小,所以etcd专门开了一个purgeFile线程,去定时删除这些不需要的wal文件,
#删除原理就是根据xx.snap文件判断哪些wal文件是可以删除的。
etcdserver.EtcdServer.shouldSnapshot #如果当前最新的appliedIndex减去当前快照的最后一条日志的Index超过了阈值就触发快照,否则跳过,默认是1w条
etcdserver.EtcdServer.snapshot #创建快照
etcdserver.GetMembershipInfoInV2Format #获取成员信息
mvcc.store.Commit #提交当前的所有事务
s.mu.Lock #暂不清楚为什么这里提交要加写锁,而batchTx对应的异步定时提交线程并不要求加写锁
#我猜测是因为这里的快照的意思是到当前位置所有事务都要完成之后才能创建快照
#因为所有新事务都会加读锁即调用s.mu.RLock,这里加写锁,就阻止了一边提交事务一边不断到来写事务
go func(){
raft.MemoryStorage.CreateSnapshot #更新memtable里的snapmetadata
storage.storage.SaveSnap #把集群状态相关信息(成员、term、index)写入一个新创建的xx.snap文件,同时在wal日志中写一条snapshot的日志
#之后由异步的purge线程在下一次启动的时候根据这个xx.snap文件中的配置信息去执行删除操作
#(比如把所有index<x的wal日志文件都删掉)
#同时如果节点重启,那么节点会把applideIndex设置为最新的快照中的index,然后从此处开始重新apply日志
storage.storage.Release #清理member/snap目录下过时的以.snap.db结尾的文件
wal.Wal.ReleaseLockTo #释放过时的wal文件的锁,否则这些文件会被etcd锁住,purgeFile线程无法打开
snap.Snapshotter.ReleaseSnapDBs #删除掉member/snap目录下以.snap.db结尾的文件(因为我们已经在applyEntries之前applySnapShot过了,所以可以删除)
}
}
another thread 5{ #处理follower节点发过来的消息,put流程则对应MsgAppResp即leader发MsgApp给follwer,follower返回MsgAppResp
raft.node.run
for{
select
case m := <-n.recvc: #我们通过bcastAppend消息给peer节点的时候是把消息写到一个chan,由发送线程异步发送
#当发送线程收到响应的时候就会把消息填充到n.recvc chan
raft.raft.Step #下面的流程和thread 6里面的流程一模一样,详细注释就写在thread6里面了。
#threa5和thread6的区别在于1:thread5的MsgAppResp是其他节点发过来的,而thread6的MsgAppResp是leader自己发给自己的
raft.stepLeader
select
case pb.MsgAppResp:
if tracker.Progress.MaybeUpdate
raft.raft.maybeCommit
tracker.ProgressTracker.Committed
raft.raftlog.maybeCommit #尝试提交日志即更新commitedIndex变量
raft.raftlog.commitTo
raft.raft.bcastAppend #向其他节点广播最新的commitedIndex
}
}
another thread 6{ #node.run里处理advance
raft.node.run
for{
select
case <- advancec: #node检测到Ready的时候就会通知raftNode干事,当raftNode干完以后就会通知node可以走下一步了
raft.RawNode.Advance #遍历raft.RawNode.stepsOnAdvance中的所有msg,对每个msg调用raft.raft.Step
raft.raft.Step
raft.stepLeader
select
case pb.MsgAppResp: #处理日志append成功的消息,主要操作是修改leader的commitedIndex
if tracker.Progress.MaybeUpdate #判断的同时更新消息来源节点对应的progress对象的Match和Next字段
#m.index表示该条MsgAppResp消息中日志的索引
#MsgAppResp表示成功追加了一条日志,而raft不允许日志空洞,
#因此收到一条日志索引字段为10的MsgAppResp消息就表明10之前的所有日志,
#该节点上都有,所以直接把令pr.Match=m.index,pr.Next=m.index+1
raft.raft.maybeCommit #判断是否可以进行一次提交操作,即是否有过半的节点已经成功追加日志
#节点对应的progress对象的Match字段用来表示该节点上日志已经复制到哪个位置了(关闭allowUnstable时match<unstable.offset)
#收到一个MsgAppResp消息的时候我们在MaybeUpdate里会更新消息来源节点的progress对象的Match字段
#就是说leader有一个progerss数组,然后每收到一个MsgAppResp消息就通过MaybeUpdate更新一个pregress对象的Match
#更新完之后就调用maybeCommite判断一次,判断是否有过半节点的progress对象的Match字段已经大于等于leader的commitedIndex
#就是先对所有match字段升序排序,然后取n - (n/2 + 1)这个位置的match,如果大于leader的commitedIndex说明已经有过半节点成功追加
#日志(即[commitedIndex,Matchs[n-(n/2+1)]这个区间内的日志是新增的可以commited的日志)
#etcd他是按顺序把日志写入wal文件,wal文件中日志的状态是不知道的,所以etcd是通过一些索引变量来判断该日志是什么状态的
#这些索引变量就是applied/commited/unstable,如果一条日志的索引即index>commited就说明该日志还没有commited
#因为wal日志已经持久化了,就算崩溃了也没事,因为wal日志中每条记录都包含了(term/index/type/data四个字段)
#集群启动的时候,applied/unstable这些变量的值都是可以自动生成
#commited会作为硬状态的一部分写入磁盘
tracker.ProgressTracker.Committed #获取排序后n-(n/2+1)这个位置的progrees对象的match字段
raft.raftlog.maybeCommit #比较一下中间那个match是否大于当前leader的commitedIndex,如果大于就说明有日志可以提交
raft.raftlog.commitTo #把leader节点的commiteIndex更新到n-(n/2+1)这个位置对应的match值
raft.raft.bcastAppend #这次会发一条不带任何日志的MsgApp消息给所有其他节点通知他们已经提交到这里了
#bcastAppend发过去的MsgApp消息两部分:待append的日志和peer节点新状态
#因为代码是通过MsgAppResp消息来激活检测,而通过maybeCommite中检测时是直接检测所有集群的日志提交情况
#并不是检测日志是否更新到了MsgAppResp中的日志所对应的index,即收到MsgAppResp只是告知程序需要检测一下集群提交状态
#如果有新日志可以提交,就通知所有节点更新commitedIndex
#比如三个节点的集群(leader,follwer1,follwer2),commitedindex分别为(x,x,x),然后经过一段时间运行
#三个节点的日志分别已经追加到了(x+3,x+1,x+2),此时收到了一个迟来的MsgAppResp,这个MsgAppResp中的消息对应的日志索引为x-1,
#然后这个消息就会激活检测,leader检测到此时x+2及x+2以前的日志都是可以提交的((n-(n/2+1)=1,排序后match[1]=x+2)
#leader就会通知他们把commitedIndex更新到x+2,更新完以后commitedIndex分别是是(x+2,x+1,x+2)
#因为过半就行,所以leader和follower2的commitedIndex都可以更新到x+2,而follower1的日志最多只能到x+1,所以只更新到x+1
#这种情况就是follower1滞后了,没关系,后面赶上就行了,目前集群还是有过半节点正常的,可以正常运行
case pb.MsgStorageAppendMsg: #处理日志追加成功即wal日志持久化成功,就是更新leader的unstable.offset和缩减unstable数组
raft.raftlog.stableTo(m.Index)
raft.unstable.stableTo
unstable.offset=m.Index+1
raft.unstable.shrinkEntriesArray #把unstable数组中日志索引在m.index+1之前的所有日志都删掉(copy到一个新的数组)
#因为这部分已经持久化到磁盘上了
case pb.MsgStorageApplyMsg: #处理日志apply成功,主要是是修改appliedIndex变量(这些变量都可以重启后重建,无需持久化)
raft.raftlog.appliedTo(max(applied,m.Index)) #更新applied索引,applied表示当前applied值,哪个大就取哪个值
}
}
<-ch #等待put操作完成,在上面wait.list.Trigger中会填充这个chan,这个chan就是前面和reqid对应的那个chan,然后返回结果给客户端
follower:
stepFollower
case: pb.MsgApp #leader发来的MsgApp消息
raft.raft.handleAppendEntries #添加日志到unstable,并更新commitedIndex
if m.index<commited #如果是已经提交的数据则直接返回最新的的已提交数据的索引
raft.raft.send(pb.MsgAppResp,commited) #告知leader处理完了
else
raft.log.maybeAppend #如果是新数据则追加到unstable,并更新commitedIndex,注意,此时还没有持久化wal日志到磁盘,断电可能丢失,不过没关系,raft协议
#允许follower节点还没有持久化wal日志到磁盘就更新commitedIndex,但是当关闭allowUnstable时,leader不能还没持久化就更新commitedIndex
#leader发过来的commtiedIndex对应的日志必定是leader上已经持久化了的日志,这样就至少有一个副本持久化了到磁盘
#不过如果此时leader崩溃了,并且follower节点也同时崩溃了,所有follower都恰好没持久化该日志
#如果该leader节点不再重启,那么整个集群就永久的丢失了该日志,也就是数据丢失了
#速度与一致性的取舍吧(频繁刷盘性能肯定低,而且也导致该日志同步延时长)
#leader只会控制follower的commitedIndex,其他比如wal日志刷盘、apply数据,这些操作都是follower自动进行的,无需leader参与
#leader除了commitedIndex外还会控制follower节点对外提供的ReadIndex,即client不会通过follower节点看到leader还没批准的数据
#在没有配置read-replica(允许读请求不经过leader)的时候,follower节点收到读请求的时候会去询问leader当前数据是否是最新的,反之直接在follower上读
#也就是说follower节点当前的readIndex一定是leader提供的,一定是有效地,但不一定保证是最新的,不是最新的就表示可能有安全隐患
#速度和一致性的取舍吧(不经过leader可以少一次网络请求)
raft.raftlog.Append #追加到unstable
raft.raftlog.commitTo(min(m.commitedIndex,newIndex)) #m.commited是消息中代表的leader已经要求提交到这个位置,newIndex加上新增日志后本地日志索引
#因为m.commited的值是leader节点上follower节点的progress对象的match值,
#这个值是过半节点都具有的日志的最大索引值,这行代码的意思就是取leader和follower都有的日志的commited索引,
#如果m.commited<newIndex表明follower日志多于commited,那么此follower的commitedIndex就设置为m.commited
#如果newIndex<m.commited表明leader日志多于follower,那么只能设置为follower日志最大值
#注意,此时日志还只是放到了unstable,可能还没有持久化,如果断电,那么这些日志会丢失,不过没关系
#假设[x,y,z]是本次append(即追加)的日志,假设进行完所有操作后,本follower节点的commitdIndex设置为z
#然后x、y日志已经由另一个线程持久化了,然后此follower节点突然断电,导致z还未持久化就丢失了,此时其他节点是正常运行
#因为wal日志才是核心,而commiteIndex等索引值都是计算出来的,所以当此follower节点重新上线的时候就走一个崩溃恢复流程
#我猜的,还未验证,因为treeIndex和崩溃恢复那一块逻辑还没看:此follower节点启动后查看日志得到commitedIndex=y
#(commitedIndex会作为硬状态的一部分写入磁盘)
#然后会在发给leader的心跳信息中包含本节点的最新状态:commitedIndex=y,leader一看,就知道此节点落后了,
#然后就启动一个追赶流程,把他缺少的日志发送给他(即z日志)
#因为对于leader来说,在关闭allowUseUnstable的情况下
raft.raft.send(pb.MsgAppResp,commited) #告知leader处理完了
一些杂记:
笔记:apply索引更新后数据就可以查
笔记:case pb.MsgStorageAppendResp:
处理异步写入
会创建streamReader,会把node的recev、propoc都存到streamReader,当读到数据的时候就会自动填充到该chan
match:先更新follower,然后再更新leader
stepsOnAdvance只有leader在用
maybecommited用来更新和提交leader上的日志信息
apply应该就是自己要做的事情了,只要已经commited,那么它就会自动去apply,从而不需要受leader控制
stepFollower会忽略掉MsgAppResp消息
raftindex是已提交的
put的时候消息类型是prop
主从之间可以通过heartbeat消息来同步index
日志没有持久化之前都不算成功
go run ./etcdctl/main.go --endpoints node1_IP:2379,node2_IP:2379 endpoint status --write-out=table #查看各节点状态
go run ./etcdctl/main.go --endpoints node1_IP:2379 put x z
这段代码是 etcd 中的 Progress 结构体的定义,它表示了领导者视角下跟随者节点的进度。以下是结构体中各字段的解释:
Match:表示领导者和跟随者之间日志的匹配索引,即领导者认为跟随者已经复制了的最高日志条目的索引。
Next:表示领导者预期发送给跟随者的下一个日志条目的索引。当领导者向跟随者发送日志条目时,会以 Next 作为起始索引。
State:表示领导者与跟随者之间的交互状态。有三种状态: StateProbe:在此状态下,领导者每个心跳间隔最多只发送一次复制消息,用于探测跟随者的实际进度。 StateReplicate:在此状态下,领导者会在发送复制消息后乐观地增加 Next 到最新发送的日志条目的索引,以快速复制日志条目给跟随者。
StateSnapshot:在此状态下,领导者已经发送了快照给跟随者,并停止发送任何复制消息。这个snap只是一个类似checkpoint的文件,真正的快照文件通过etcdctl snapshot save复制或者手动复制
PendingSnapshot:在 StateSnapshot 状态下使用,跟踪领导者在确定需要快照时的最后一个索引。当有待处理的快照时,复制到跟随者的操作会暂停。
RecentActive:表示进度是否最近活跃。如果收到来自对应跟随者的任何消息,则表示进度是活跃的。选举超时后,RecentActive 可以重置为 false。
MsgAppFlowPaused:表示是否暂停了向节点发送 MsgApp 流。在 StateProbe 或 StateReplicate 且 Inflights 饱和时,会暂停 MsgApp 流。
Inflights:表示传输中的消息窗口,用于限制消息的数量和带宽。每个传输中的消息可能包含一个或多个日志条目。当 Inflights 饱和时,不应发送更多消息。领导者发送消息时,应将最后一个条目的索引添加到 Inflights 中。当领导者收到回复时,应该通过调用 inflights.FreeLE 方法释放之前的传输中的消息。
IsLearner:表示是否跟踪了一个学习者节点。
Progress 结构体主要用于跟踪领导者与跟随者之间的状态以及复制日志的进度,确保复制过程的有效进行,并根据不同状态进行相应的操作。 `
启动代码,通过xx.snap文件中的元信息来确定appliedIndex,在此之后的日志都会重做?
ReadIndex
崩溃恢复
!!会保存commited
因为channel都是no-buff的,所以多个函数同时调用的话,是串行的、并且是1-1对应
Raft协议日志条目本身存在index且全局单调递增,可以通过index判断WAL是否已经执行过了,但还不够安全,etcd引入consistent index来存储已经执行过的日志条目索引,实现幂等