【MIT6.824】lab2C-persistence, lab2D-log compaction 实现笔记

引言

lab2C的实验要求如下

Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.

lab2D的实验要求如下

Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g, operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).

总体而言, lab2C需要我们实现关键数据的持久化,lab2D需要我们通过快照实现日志的压缩。代码可以在https://github.com/slipegg/MIT6.824中得到。所有代码均通过了1千次的测试。

lab2C 实现

在实验时测试2C时,测试代码将会尝试将某些节点从网络中断开,然后一段时间后再依据这些断开的节点的持久化的信息重新生成一个新的节点并加入到网络中,测试代码将会检测加入这个节点后是否与预期相同。

在初始化节点的时候,会传入一个Persister对象,这个对象充当一个硬盘的角色,用于持久化数据,后续在测试重新生成节点时,就需要传入旧节点的Persister对象,以便新节点能够从硬盘中读取旧节点的数据进行复原。

参考raft论文,我们需要持久化的数据有:

  • currentTerm
  • votedFor
  • log entries

在raft.go中,我们需要实现persist和readPersist函数,用于持久化和读取数据。

// persist saves Raft's persistent state to stable storage,
func (rf *Raft) persist() {
	rf.persister.Save(rf.encodeState(), rf.persister.snapshot)
}

func (rf *Raft) encodeState() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	return w.Bytes()
}
// readPersist restores previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	var currentTerm int
	var votedFor int
	var logs []LogEntry
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	if d.Decode(&currentTerm) != nil ||
		d.Decode(&votedFor) != nil ||
		d.Decode(&logs) != nil {
		Debug(dError, "S%v failed to read persist", rf.me)
	} else {
		Debug(dInfo, "S%v read persist successfully", rf.me)
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.logs = logs
		rf.lastApplied = rf.getFirstIndex()
		rf.commitIndex = rf.getFirstIndex()
	}
}

然后我们需要在每次修改了持久化数据的地方调用persist函数,然后在初始化节点时调用readPersist函数来读取持久化数据,整体难度不大。

lab2D 实现

在实验时测试2D时,测试代码在接收到apply的命令id为9结尾时,就会调用节点的Snapshot函数进行快照,将日志压缩。代码需要做到在压缩日志后,仍然能够准确地运行。

首先需要完成快照生成的函数,如下所示,每次会传入需要快照到的日志index,以及当这个节点为止的状态机的快照数据,系统保证传入的日志index一定是已经apply过的。由于已经将状态机的内容放入到了snapshot中,所以其实包括index在内的前面的所有日志都可以删除了,但是由于在同步日志信息时,需要上一个日志的term信息,所以我们会单独保留id为index的日志的id和term信息,放在logs的第一位。

func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if index <= rf.getFirstIndex() {
		Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex())
		return
	}

	rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...)
	rf.persister.Save(rf.encodeState(), snapshot)
	Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs))
}

由于快照的引入,现在logs中的第一个日志可能不再是0了,所以之前代码中所有从logs中依据日志index获取日志的代码都要修改为:rf.logs[index-rf.getFirstIndex()]

同时快照的引入还会导致在leader与follower进行日志同步时,需要的同步的日志可能已经没有了,所以这时候需要直接将整个日志发送给对方。

需要发送的快照请求如下:

func (rf *Raft) genInstallSnapshotRequest() *InstallSnapshotRequest {
	return &InstallSnapshotRequest{
		Term:             rf.currentTerm,
		LeaderId:         rf.me,
		LastIncludeIndex: rf.getFirstIndex(),
		LastIncludeTerm:  rf.logs[0].Term,
		Data:             rf.persister.ReadSnapshot(),
	}
}

follower接收到快照请求后,需要进行如下处理,主要就是检查这个快照有没有过期,是不是真的比自己当前commit的日志还要新,如果是的话,就将自己的日志全部删除,只保留快照中给的最后一个日志,作为logs中的第一个日志,然后再唤起applyCond进行快照的apply。

func (rf *Raft) InstallSnapshot(request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	Debug(dSnap, "S%v {term: %v, commitIndex: %v}, received from S%v with InstallSnapshotRequest {%v} ", rf.me, rf.currentTerm, rf.commitIndex, request.LeaderId, request)
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	if request.Term < rf.currentTerm {
		return
	}
	if request.Term > rf.currentTerm {
		rf.currentTerm = request.Term
		rf.votedFor = -1
		rf.persist()
	}
	rf.changeState(Follower)

	if request.LastIncludeIndex <= rf.commitIndex {
		return
	}

	rf.persister.Save(rf.encodeState(), request.Data)
	rf.commitIndex = request.LastIncludeIndex
	rf.logs = []LogEntry{{request.LastIncludeIndex, request.LastIncludeTerm, nil}} //2D遇到的bug所在
	Debug(dSnap, "S%v installs snapshot from S%v, now the commitIndex is %v", rf.me, request.LeaderId, rf.commitIndex)

	rf.waitApplySnapshotRequest = *request
	rf.applyCond.Signal()
}

如果leader接收到回复表示快照已经更新成功了,那么就更新这个节点的nextIndex和matchIndex。

func (rf *Raft) handleInstallSnapshotReply(peer int, request *InstallSnapshotRequest, reply *InstallSnapshotReply) {
	if reply.Term > rf.currentTerm {
		rf.changeState(Follower)
		rf.currentTerm = reply.Term
		rf.votedFor = -1
		rf.persist()
		Debug(dWarn, "S%v found higher term %v in InstallSnapshotReply %v from S%v, changes to follower", rf.me, reply.Term, reply, peer)
	} else {
		rf.nextIndex[peer] = request.LastIncludeIndex + 1
		rf.matchIndex[peer] = request.LastIncludeIndex
		Debug(dLog, "S%v has installed snapshot to S%v, now the S%v's nextIndex is %v", rf.me, peer, peer, rf.nextIndex[peer])
		rf.updateCommitIndexForLeader()
	}
}

注意为了能够有序地进行快照的apply,对原本的applier函数进行了修改,同时增加了waitApplySnapshotRequest来记录最新需要apply的快照请求。

其主要思想是每次唤起applyCond时,先检查是否有新的快照请求,即waitApplySnapshotRequest的Term是否为-1,如果不为-1,那么就进行快照的apply,快照apply了之后再把waitApplySnapshotRequest的Term设置为-1。如果没有新的快照请求,那么就进行日志的apply。

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		if rf.waitApplySnapshotRequest.Term != -1 {
			if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex {
				rf.mu.Unlock()

				rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly?
					SnapshotValid: true,
					Snapshot:      rf.waitApplySnapshotRequest.Data,
					SnapshotTerm:  rf.waitApplySnapshotRequest.LastIncludeTerm,
					SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex,
				}

				rf.mu.Lock()
				rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndex
				Debug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)

			}
			rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1}
			rf.mu.Unlock()
		} else {
			commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
			if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
				Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex())
				rf.mu.Unlock()
				continue
			}
			entries := make([]LogEntry, commitIndex-lastApplied)
			Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
				rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
			copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
			rf.mu.Unlock()

			for _, entry := range entries {
				rf.applyCh <- ApplyMsg{
					CommandValid: true,
					Command:      entry.Command,
					CommandIndex: entry.Index,
					CommandTerm:  entry.Term,
				}
			}

			rf.mu.Lock()
			Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
				rf.me, lastApplied+1, len(entries), rf.lastApplied)
			rf.lastApplied = commitIndex
			rf.mu.Unlock()
		}
	}
}

问题记录

当时写的时候也感觉不是特别复杂,但是后面测试的时候发现这里还是有很多需要注意的点,容易导致错误。快照的引入导致的一个重要的问题是我们现在有两种方式来更新状态机的数据,一种是通过日志的apply,一种是通过快照的apply。

一开始的写法是在接收到快照请求进行InstallSnapshot的处理的时候新起了一个go协程来直接对快照进行apply,但是这会导致一系列的问题。

一开始我们对这两者的并发做什么限制,那么这就有可能出现下面这种情况:

  1. follower节点接受到快照同步请求,并且开启一个协程开始进行快照的apply
  2. 在快照的apply之前,follower节点接收到下一个日志的同步的请求,开始进行日志的apply

这两个apply的顺序其实是不确定的,很有可能就会出现先进行日志的apply,然后再进行快照的apply,这样就会导致状态机的数据不一致,所以需要控制在快照进行apply的时候,不允许进行日志的apply。

然后我采用的方法是控制节点的lastApplied值,即在开启协程进行快照的apply前将lastApplied值设置为-1,然后在快照的apply结束后再将lastApplied设置为快照的index值,然后在日志进行apply的时候,对lastApplied进行判断,如果lastApplied值为-1,那么就进行锁等待,直到lastApplied值不为-1,然后再进行日志的apply。但是这种方法在测试的时候会发现,进行1000次测试大约会有0~3次的可能出现错误,错误的原因是在进行日志的apply的时候,需要apply的日志已经在logs中没有了,导致了取值的错误,也就是并发控制没有成功,在进行了快照的apply后,日志的apply依旧在进行。

经过debug发现这是由于出现了如下这种情况:

  1. followe节点接收到日志同步的请求,开启一个协程进行日志的apply
  2. leader节点已经进行了快照,然后由于超时又给该follower节点发送了日志同步的请求
  3. follower节点接收到快照同步的请求,设置lastApplied为-1,然后开启一个协程进行快照的apply
  4. follower节点结束了日志的apply,将lastApplied设置为日志的index,然后follower节点继续检查,发现lastApplied不为-1,且lastApplied小于commitIndex,所以继续进行日志的apply,然后在logs中取日志时发现该日志已经没有了,导致错误。

所以通过lastApplied进行并发控制并不可行,最后采用的方法是添加了snapApplyCount变量,每次在进行快照的apply时,将snapApplyCount加1,快照的apply结束后将snapApplyCount减1,然后在进行日志的apply时,如果snapApplyCount不为0,那么就进入锁等待。

注意在完成快照的apply后,有可能节点已经接收到了leader同步来的其他日志,所以需要在结束后检查是否有新的日志需要apply,如果需要就唤起日志的apply。最后处理快照同步请求的代码如上述的InstallSnapshot所示,日志apply的代码如下:

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		for rf.snapApplyCount != 0 || rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		commitIndex, lastApplied := rf.commitIndex, rf.lastApplied
		if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 {
			rf.mu.Unlock()
			continue
		}
		entries := make([]LogEntry, commitIndex-lastApplied)
		Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)",
			rf.me, lastApplied, rf.getFirstIndex(), commitIndex)
		copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()])
		rf.mu.Unlock()

		for _, entry := range entries {
			rf.applyCh <- ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: entry.Index,
				CommandTerm:  entry.Term,
			}
		}

		rf.mu.Lock()
		Debug(dInfo, "S%v finishes applying log entries(startId: %v, length: %v), now rf.lastApplied = %v",
			rf.me, lastApplied+1, len(entries), rf.lastApplied)
		rf.lastApplied = commitIndex
		rf.mu.Unlock()
	}
}

但是上述方法后面经过测试发现也还是有少量的bug,bug的主要原因在于如下这种情况:

  1. follower节点接收到最后日志为x的快照同步请求,开启一个协程进行快照的apply
  2. follower节点又接收到最后日志为x+10的快照同步请求,开启一个协程进行快照的apply
  3. follower先完成了x+10的快照的apply,然后才完成了x的快照的apply,但是这时候它会将lastApplied设置为x,同时apply的顺序也出现了错误。

纵观上面的问题的一大根源在于我们出现了多个apply的协程,而没有对协程进行很好的并发控制,所以最后采取了上述的发型,将所有的apply都放在一个协程中进行,优先进行快照的apply,进测试可以准确地通过。

实验结果

最终对lab2中所有的测试进行了1000次的测试,全部通过。

请添加图片描述

总结

整个lab2中感觉难度最大的还是lab2B,因为需要实现的功能比较多,需要多多参考raft论文中的论文,最为印象深刻的就是lab2D中的并发问题了,这种问题确实在一开始实现的时候比较难想到,需要通过实验发现,而这种1000次测试才出现一两次错误的问题就更加难发现了,需要有全面的日志记录和多次重复实验的系统才行,后面有机会也分享一下有关日志记录和重复实验相关的内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/557913.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

el-table使用show-summary合计,但只需要合并某一列

el-table使用show-summary合计&#xff0c;但只需要合并某一列 这里有两种方法&#xff0c;一种是网上的&#xff0c;我会引用他的链接给你们看。 一种是我自己看源码发现的 方法一 这个就是方法一的链接 点击我跳转方法一 方法二 不需要计算的列 去掉prop 然后用插槽显示即可…

比特币叙事大转向

作者&#xff1a;David Lawant 编译&#xff1a;秦晋 要理比特币解减半动态&#xff0c;最关键的图表是下面这张&#xff0c;而不是价格图表。它显示了自 2012 年以来&#xff0c;矿业总收入与比特币现货交易量的比例&#xff0c;并标注了三个减半日期。 虽然矿工仍然是比特币生…

Python八股文:基础知识Part2

1. Python中变量的保存和访问 Python中的变量实际上是一个指向对象的引用&#xff0c;每个对象都有一个唯一的标识符&#xff08;即内存地址&#xff09;。对于一些不可变对象&#xff0c;如字符串和整数&#xff0c;因为它们的值不可更改&#xff0c;所以当多个变量引用相同的…

OSPF大型实验

OSPF大型实验 实验拓扑图 实验思路 1、R4为ISP&#xff0c;其上只配置IP地址&#xff1b;R4与其他所直连设备间均使用公有IP&#xff1b; 2、R3-R5、R6、R7为MGRE环境&#xff0c;R3为中心站点&#xff1b; 3、整个OSPF环境IP基于172.16.0.0/16划分&#xff1b;除了R12有两…

Java精品项目--第8期基于SpringBoot的宠物用品商城的设计分析与实现

项目使用技术栈 SpringBootThymeleafMyBatisMySQLAopJavajdk1.8 项目介绍 项目截图

JAVA 集合框架(一) Collection集合详解和常用方法

Java集合框架&#xff08;Java Collections Framework&#xff09;是一个强大的、高度灵活的数据结构库&#xff0c;它为Java应用程序提供了组织、存储和操作对象集合的标准方法。 集合类体系结构 接口类&#xff1a; Collection: 是单例集合类的顶层接口&#xff0c;包括Lis…

移动端适配之viewport

目录 盒模型&#xff1a;widthcontent&#xff08;padding border&#xff09; class"content-box"内容盒模型&#xff08;W3C盒&#xff09; class"border-box"边框盒模型&#xff08;IE 盒&#xff09; scroll滚动 window浏览器视窗&#xff1a;包括…

MySQL高级(索引-性能分析-explain执行计划)

explain 或者 desc 命令获取 MySQL 如何执行 select 语句的信息&#xff0c;包括在 select 语句执行过程中表如何连接和连接的顺序。 -- 直接在 select 语句之前加上关键字 explain / desc explain select 字段列表 from 表名 where 条件 &#xff1b; explain select * …

自养号测评:如何提高国际站,敦煌网的店铺销量?

随着互联网技术的迅猛进步&#xff0c;电子商务已经成为现代商业领域中不可或缺的一环。敦煌网&#xff0c;作为专注于中国文化艺术产品的电商平台&#xff0c;成功吸引了大量消费者的目光。然而&#xff0c;对于商家而言&#xff0c;如何进一步提升敦煌网的销售业绩&#xff0…

用这些工具搭建企业内部知识库,原来这么轻松

在快速发展和信息爆炸的时代&#xff0c;为企业构建一个内部知识库变得十分重要。它不仅有助于保存关键信息&#xff0c;促进知识传承&#xff0c;还能提高企业的整体效率和响应能力。今天&#xff0c;我们将探讨三款非常适合搭建企业内部知识库的工具&#xff0c;它们各具特色…

Python教学入门:流程控制

条件语句&#xff08;if 语句&#xff09;&#xff1a; 条件语句用于根据条件的真假执行不同的代码块。 x 10if x > 0: # 如果 x 大于 0print("x 是正数") # 输出&#xff1a;x 是正数 elif x 0: # 如果 x 等于 0print("x 是零") else: # 如果以…

第07-1章 计算机网络相关概念

7.1 本章目标 了解网络协议的概念了解网络体系结构熟悉ISO/OSI参考模型以及每一层的功能掌握TCP/IP模型各层的主要协议及其功能熟练掌握IP地址、子网规划等相关内容 7.2 网络协议的概念 7.2.1 概念介绍 &#xff08;1&#xff09;网络协议&#xff1a;计算机网络和分布系统中…

Java 笔试强训篇- Day1

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 点击消除 1.1 解题思路一 1.2 解题思路二 2.0 在两个数组中找出相同的数 2.1 解题思路 笔试强训说明&#xff1a;有一些题目提供不了原题。 1.0 点击消除 该题链…

初学python记录:力扣1883. 准时抵达会议现场的最小跳过休息次数

题目&#xff1a; 给你一个整数 hoursBefore &#xff0c;表示你要前往会议所剩下的可用小时数。要想成功抵达会议现场&#xff0c;你必须途经 n 条道路。道路的长度用一个长度为 n 的整数数组 dist 表示&#xff0c;其中 dist[i] 表示第 i 条道路的长度&#xff08;单位&…

Redis之路系列(2)纸上得来终觉浅(上)

02 纸上得来终觉浅(上) 文章内容基于redis6&#xff0c;本章节介绍了redis的实际应用&#xff0c;主要包含&#xff1a;大量键值对保存的案例场景&#xff0c;海量key时的聚合计算、排序计算、状态统计、基础统计的应用 大量键值对保存 场景案例 有这么一个需求场景&#xff…

OpenSearch的几种认证

在Amazon OpenSearch Service中&#xff0c;主用户的配置可以通过三种方式进行&#xff1a;用户名和密码组合、IAM角色&#xff0c;以及通过第三方联合登录。这样的配置授权主用户在OpenSearch仪表板上进行内部用户、角色和角色映射的创建。需要注意的是&#xff0c;OpenSearch…

【nginx代理和tengine的启动-重启等命令】

在nginx成功启动后[任务管理器有nginx.exe进程]&#xff0c;运行vue项目&#xff0c;在浏览器访问http://localhost:10001/&#xff0c;提示&#xff1a;访问拒绝&#xff08;调试中network某些地址403&#xff09;&#xff1b; 解决方案&#xff1a; localhost改为ip&#xff…

【论文笔记 | 异步联邦】Asynchronous Federated Optimization

论文信息 Asynchronous Federated Optimization&#xff0c;OPT2020: 12th Annual Workshop on Optimization for Machine Learning&#xff0c;不属于ccfa introduction 背景&#xff1a;联邦学习有三个关键性质 任务激活不频繁&#xff08;比较难以达成条件&#xff09;&…

怎么配置python

右键点击“计算机”&#xff0c;选择“属性”。 在左侧栏找到“高级系统设置”。 点击“环境变量”。 在系统变量中&#xff0c;双击“Path”。 在字符串的末尾&#xff0c;加一个分号; 然后再输入你安装python的路径&#xff0c;如图所示&#xff1a; 点击“确定”&#xff0…

Python中的迭代器:深入理解与实用指南

文章目录 1. 迭代器的基本概念2. Python中的迭代器实例3. 自定义迭代器3.1 例子3.2 详细过程 4. 迭代器的高级应用5. 常见问题与解答 迭代器是Python中非常核心的概念之一&#xff0c;在面试中也会被问到。下面我会详细介绍什么是迭代器&#xff0c;使用方法&#xff0c;以及使…
最新文章