通过写代码学习AWS DynamoDB (3)- 一致性hash

简介

在本文中,我们将简单介绍一致性hash(consistent hash)的概念,以及一致性hash可以解决的问题。然后我们将在模拟的DDB实现中实现一个简单版本的基于一致性harsh实现的partition。

问题

在《通过写代码学习AWS DynamoDB (2)》中我们的DDB使用了最朴素的hash算法来分配一个key/value存储的partition。也就是使用

key % <partition count>

的方法。当我们需要改变partition的数量来达到scale out和scale in的目的时,我们发现使用这种方法算出来的新的partition会和之前的partition有很大的区别。例如,如果我们把partition的数量从10增加到11时,新分配的partition和以前分配的partition会如下表所示:

Key

Existing Partition

New Partition

1567

7

5

2354

4

0

2888

8

6

8971

1

6

Key所对应的partition的改变意味着我们需要把该partition的数据转移到新的partition上。这是一种很大的开销。而在partition数量改变时,一致性hash可以帮助我们减少数据所对应的partition发生改变的次数。

一致性hash介绍

一致性hash的介绍文章在网上很多,大家可以自行参看。这里仅仅做一个简单的介绍。首先,我们介绍下面几个和hash有关的概念:

Hash空间:Hash空间是指全部可能的hash key组成的空间。比如说,我的hash key是用“x % 1000”产生的,那么我的hash空间就是0到999,因为所有可能的hash key都处在这个范围内,并且这个空间的每一个值也都可能是一个hash key。

Hash key:Hash key是用hash函数产生的key。对于每一个需要映射到hash空间的对象都需要使用hash函数产生出一个hash key。

Hash server:这里Hash server是指将一个server或者host通过某种方法映射到Hash空间。比如,使用IP地址来进行映射。

下面的图描述了一个一致性hash的实现原理。

图中的A,B,C是3个hash server,它们已经被映射到了hash 空间上。Kate,John,Steve,Bill,和Jane是5个element,它们也被映射到了hash空间上。现在我们规定每一个element将被保存到与它相邻的前面的hash server中。这样,John,Steve被保存到C中,Bill被保存到B中,Kate和Jane被保存到A中。通过这种方式,我们可以看到如果需要增加或者减少Hash server,理论上只会有1/N的元素被移动到新的hash server里,N是全部hash server的数量。而一般性的hash实现方法几乎是全部数据需要被移动,所以这是一个很大的改进。

这种普通的consistent hash的实现仍然具有一些缺点。例如当我们分配element给hash server时,可能会出现非常不均匀的情况。某些hash server可能会保存比其它hash server多很多的element。这是我们试图避免的。所以,又产生了Virtual node的概念。如下图所示:

现在映射到hash空间的都是每一个hash server的virtual node。每一个hash server可以有很多的virtual node。比如图中每一个hash server有3个virtual node。现在每一个hash空间里的element是被分配到邻近的virtual node上,而通过virtual node又可以找到实际的hash server。通过将每一个hash server映射出多个virtual node的方式,我们可以大大减小element分配不均匀的问题。

实现一致性hash

我们在这里实现的是最基础的一致性hash,但是基于virtual node的实现本身也不复杂很多,只是多了一层virtual node到hash server的mapping。首先,我们将partition的管理从DDB.Table里移出来,并且定义了一个类Consistent_hash_manager,由它管理partition。这个类不仅提供了对key/value的增删改操作,同时提供了scale_out_partition()和scale_in_partition()两个操作。通过这两个操作我们可以增加和删除partition。这样DDB.Table就不再需要知道partition的细节,而仅仅需要通过Consistent_hash_manager的接口直接对数据和partition进行操作。修改之后的DDB和Consistent_hash_manager如下所示:

DDB 类的实现:

from consistent_hash_manager import Consistent_hash_manager

# class to provide DDB public APIs
# - support partitions based on hash value of key;
class DDB:
    def __init__(self):
        self.tables = {}

    def create_table(self, table_name):
        self.tables[table_name] = self.Table(table_name)

    def list_table(self):
        for table in self.tables.values():
            table.describe()

    def delete_table(self, table_name):
        self.tables.pop(table_name)

    def get_table(self, table_name):
        return self.tables[table_name]
    
    class Table:
        def __init__(self, name, partition_count=1):
            self.name = name
            self.consistent_hash_manager = Consistent_hash_manager(partition_count)

        def put_item(self, key, value):
            self.consistent_hash_manager.put_item(key, value)

        def update_item(self, key, value):
            self.consistent_hash_manager.put_item(key, value)

        def get_item(self, key):
            return self.consistent_hash_manager.get_item(key)
        
        def delete_item(self, key):
            self.consistent_hash_manager.delete_item(key)

        def describe(self):
            item_count = self.consistent_hash_manager.get_item_count()
            print("Table name: {}, item size: {}".format(self.name, item_count))

        def scale_out(self):
            self.consistent_hash_manager.scale_out_partition()

        def scale_in(self):
            self.consistent_hash_manager.scale_in_partition()

Consistent_hash_manager的实现:

from random import randrange
import functools

from partition import Partition

class Consistent_hash_manager:
    
    def __init__(self, partition_count=3):
        self.hash_space = 10
        self.available_hash_keys = list(range(self.hash_space))
        
        self.partitions = list()
        for _ in range(partition_count):
            self.partitions.append(Partition(self.available_hash_keys[randrange(len(self.available_hash_keys))]));
            self.available_hash_keys.remove(self.partitions[-1].get_id())

        # sort by id so partitions build a consistent hash ring
        self.partitions.sort(key=lambda x : x.get_id())

    # add one partition
    def scale_out_partition(self):

        if len(self.available_hash_keys) == 0:
            print("No available hash space for scale out")
            return

        self.partitions.append(Partition(self.available_hash_keys[randrange(len(self.available_hash_keys))]))
        self.available_hash_keys.remove(self.partitions[-1].get_id())
        print("scale out one new partition with ID {}".format(self.partitions[-1].get_id()))

        partition_key = self.partitions[-1].get_id()
        self.partitions.sort(key=lambda x : x.get_id())

        for i in range(len(self.partitions)):
            if self.partitions[i].get_id() == partition_key:
                new_partition = self.partitions[i]
                if i + 1 < len(self.partitions):
                    next_partition = self.partitions[i+1]
                else:
                    next_partition = self.partitions[0]
                break
        
        print("move elements from the next partition to the new partition")
        for key in next_partition.get_items():
            partition = self.get_partition(key)
            if partition == new_partition:
                print("move {} from partition {} to {}".format(key, next_partition.get_id(), new_partition.get_id()))
                new_partition.put_item(key, next_partition.get_items()[key])
        
        for key in new_partition.get_items():
            next_partition.delete_item(key)


    # remove one partition
    def scale_in_partition(self):
        if len(self.partitions) == 1:
            print("Only one partition now. Cannot delete")
            return
        
        deleted_partition_index = randrange(len(self.partitions))
        deleted_partition = self.partitions[deleted_partition_index]
        if deleted_partition_index == len(self.partitions) - 1:
            next_partition = self.partitions[0]
        else:
            next_partition = self.partitions[deleted_partition_index + 1]

        print("Delete partition with ID {}. Move elements from the deleted partition to the next partition.".format(deleted_partition.get_id()))
        for key,value in deleted_partition.get_items().items():
            print("move {} from partition {} to {}".format(key, deleted_partition.get_id(), next_partition.get_id()))
            next_partition.put_item(key, value)
        
        self.available_hash_keys.append(deleted_partition.get_id())
        del(self.partitions[deleted_partition_index])


    def put_item(self, key, value):
        partition = self.get_partition(key)
        print("save {} to partition {}".format(key, partition.get_id()))
        partition.put_item(key, value)

    def get_item(self, key):
        partition = self.get_partition(key)
        print("get {} from partition {}".format(key, partition.get_id()))
        return partition.get_item(key)
    
    def delete_item(self, key):
        partition = self.get_partition(key)
        print("delete {} from partition {}".format(key, partition.get_id()))
        partition.delete_item(key)

    def get_item_count(self):
        return functools.reduce(lambda x, y : x + y.get_item_count(), self.partitions, 0) 
    
    def get_hash_key(self, key):
        return self.my_hash(key) % self.hash_space
        
    def my_hash(self, text:str):
        hash=0
        for ch in text:
            hash = ( hash*281  ^ ord(ch)*997) & 0xFFFFFFFF
        return hash
    
    def get_partition(self, key):
        hash_key = self.get_hash_key(key)

        if len(self.partitions) == 1 or hash_key <= self.partitions[0].get_id() or hash_key > self.partitions[-1].get_id() :
            return self.partitions[0]

        left = self.partitions[0].get_id()
        for partition in self.partitions[1:]:
            if hash_key <= partition.get_id() and hash_key > left:
                return partition
            left = partition.get_id()
        
        return None

现在我们使用下面的代码对我们的DDB和Consistent_hash_manager进行测试:

from ddb import DDB

ddb = DDB()

table_name = "test_table"

ddb.create_table(table_name)
ddb.list_table()
ddb_table = ddb.get_table(table_name)
ddb_table.put_item("a", "value_of_a")
ddb_table.put_item("b", "value_of_b")
ddb_table.put_item("c", "value_of_c")
ddb_table.put_item("d", "value_of_d")

ddb_table.scale_out()

print(ddb_table.get_item("a"))
print(ddb_table.get_item("b"))
print(ddb_table.get_item("c"))
print(ddb_table.get_item("d"))

ddb_table.scale_in()

ddb_table.delete_item("a")

ddb_table.describe()

我们首先创建了一个DDB表,表的默认partition只有一个。然后向其中插入了4个key/value,key分别是"a", "b", "c", "d"。然后我们给该表增加一个partition,此时我们应该会看到某些key/value被移动到新的partition里。然后我们查询key,观察它们是否是从正确的partition里查询的。然后我们删除掉一个partition,并且删除掉一个key,此时我们会发现有些数据会从被删除的partition移动到其它的partition里。最后我们查询一下目前表中key/value的数量。

我们测试结果如下:

# 创建表
Table name: test_table, item size: 0

# 添加key/value到partition 9中
save a to partition 9
save b to partition 9
save c to partition 9
save d to partition 9

# scale out一个partition 7
scale out one new partition with ID 7

# 三条数据被移动到新的partition里
move elements from the next partition to the new partition
move b from partition 9 to 7
move c from partition 9 to 7
move d from partition 9 to 7

# 读取key并且验证它们是从正确的partition中读取的
get a from partition 9
value_of_a
get b from partition 7
value_of_b
get c from partition 7
value_of_c
get d from partition 7
value_of_d

# scale in一个partition 9. partition 9里的数据被移动到partition 7里.
Delete partition with ID 9. Move elements from the deleted partition to the next partition.
move a from partition 9 to 7

# 删除一个key.并且查询目前表中key的数量.
delete a from partition 7
Table name: test_table, item size: 3

问题扩展

首先大家可以尝试在现在的实现基础上实现virtual node。我们在这里讨论另一个问题。如果我们有很多的partition,现在我们有一个key,我们如何快速的找到它对应的hash server(或者virtual node)呢?在我们目前的实现里我们是线性查找的,时间复杂度是O(n)。我们可以考虑使用二叉搜索树(binary search tree)来改善时间复杂度。二叉搜索树是一种二叉树,每一个node的左孩子节点的value都小于它,右孩子节点的value都大于它。这样我们搜索的效率就可以改善为O(lg(n))。

其次,我们还应该想到真正的产品实现会比我们的实现复杂的多。每一次scale out和scale in的操作都需要同步来保证数据的增删改读的可用性和正确性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/390527.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

.NET Core MongoDB数据仓储和工作单元模式实操

前言 上一章节我们主要讲解了MongoDB数据仓储和工作单元模式的封装&#xff0c;这一章节主要讲的是MongoDB用户管理相关操作实操。如&#xff1a;获取所有用户信息、获取用户分页数据、通过用户ID获取对应用户信息、添加用户信息、事务添加用户信息、用户信息修改、用户信息删除…

适合新手注册google谷歌账号详细步骤截图演示,记录解决 “此电话号码无法用于进行验证” 问题

目录 手机号码验证问题解决方法具体注册步骤演示1、谷歌浏览器改语言2、点击注册3、选择个人用途4、输入姓名5、输入年月日6、创建登录账号7、设置登录密码8、手机号码验证&#xff08;重点&#xff09;9、手机号码验证成功界面10、关联邮箱备用登录账号设置 手机号码验证问题 …

Java与JavaScript同源不同性

Java是目前编程领域使用非常广泛的编程语言&#xff0c;相较于JavaScript&#xff0c;Java更被人们熟知。很多Java程序员想学门脚本语言&#xff0c;一看JavaScript和Java这么像&#xff0c;很有亲切感&#xff0c;那干脆就学它了&#xff0c;这也间接的帮助了JavaScript的发展…

2.1.1 摄像头

摄像头 更多内容&#xff0c;请关注&#xff1a; github&#xff1a;https://github.com/gotonote/Autopilot-Notes.git 摄像头是目前自动驾驶车中应用和研究最广泛的传感器&#xff0c;其采集图像的过程最接近人类视觉系统。基于图像的物体检测和识别技术已经相当成熟&#…

wayland(xdg_wm_base) + egl + opengles——dma_buf 作为纹理数据源(五)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、EGL dma_buf import 相关的数据结构和函数1. EGLImageKHR2. eglCreateImageKHR()3. glEGLImageTargetTexture2DOES()二、egl 中 import dma_buf 作为纹理的代码实例1. egl_wayland_dmabuf_…

Crypto-RSA2

题目&#xff1a;&#xff08;BUUCTF在线评测 (buuoj.cn)&#xff09; 已知e,n,dp/(dq),c求明文: 首先有如下公式&#xff1a; dp ≡ d mod (p-1) &#xff0c;ed ≡ 1 mod φ(n) &#xff0c;npq &#xff0c;φ(n)(p-1)(q-1) python代码实现如下&#xff1a; import libnu…

[UI5 常用控件] 09.IconTabBar,IconTabHeader,TabContainer

文章目录 前言1. IconTabBar1.1 简介1.2 基本结构1.3 用法1.3.1 颜色&#xff0c;拖放&#xff0c;溢出1.3.2 Icons Only , Inner Contents1.3.3 showAll,Count,key,IconTabSeparator 1.3.4 Only Text1.3.5 headerMode-Inline1.3.6 design,IconTabSeparator-icon1.3.7 DensityM…

云计算基础-虚拟机迁移原理

什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器&#xff08;或主机&#xff09;迁移到另一个物理服务器&#xff08;或主机&#xff09;的过程&#xff0c;而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移&#xff1a;开机状态下迁移 冷迁…

ChatGPT高效提问—prompt实践(健康助手)

ChatGPT高效提问—prompt实践&#xff08;健康助手&#xff09; ​ 随着社会经济的发展&#xff0c;人们的生活条件不断改善&#xff0c;人们对身体健康也日益重现。让ChatGPT作为健康助手也是一件不错的事。开始之前&#xff0c;还是老样子&#xff0c;先设置角色。 ​ 输入…

HTML | DOM | 网页前端 | 常见HTML标签总结

文章目录 1.前端开发简单分类2.前端开发环境配置3.HTML的简单介绍4.常用的HTML标签介绍 1.前端开发简单分类 前端开发&#xff0c;这里是一个广义的概念&#xff0c;不单指网页开发&#xff0c;它的常见分类 网页开发&#xff1a;前端开发的主要领域&#xff0c;使用HTML、CS…

基于Springboot的社区物资交易互助平台(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的社区物资交易互助平台&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系…

游戏安全组件运行时发生异常1-0-0

可能是这个服务&#xff0c;可能被禁用了。 如果是文件缺少直接修复游戏

15-k8s控制器资源-deployment/部署控制器

一、deployment部署控制器概念 在学习rc和rs控制器资源时&#xff0c;我们指导&#xff0c;这两个资源都是控制pod的副本数量的&#xff0c;但是&#xff0c;他们两个有个缺点&#xff0c;就是在部署新版本pod或者回滚代码的时候&#xff0c;需要先apply资源清单&#xff0c;然…

云计算基础-网络虚拟化

虚拟交换机 什么是虚拟交换机 虚拟交换机是一种运行在虚拟化环境中的网络设备&#xff0c;其运行在宿主机的内存中&#xff0c;通过软件方式在宿主机内部实现了部分物理交换机的功能&#xff0c;如 VLAN 划分、流量控制、QoS 支持和安全功能等网络管理特性 虚拟交换机在云平…

java8-用流收集数据-6

本章内容口用co1lectors类创建和使用收集器 口将数据流归约为一个值 口汇总:归约的特殊情况 数据分组和分区口 口 开发自己的自定义收集器 我们在前一章中学到&#xff0c;流可以用类似于数据库的操作帮助你处理集合。你可以把Java8的流看作花哨又懒惰的数据集迭代器。它们…

[office] Excel设置打印表头 #学习方法#学习方法#微信

Excel设置打印表头 有时候需要在每页上都显示表头&#xff0c;这样好查看数据&#xff0c;下面我们来看一个练习; 1、启动Excel 1)点击“开始-所有程序-Microsoft-Microsoft Office Excel 2003"; 2)出现一个满是格子的空白窗口&#xff0c;这就是一张电子表格了&#x…

Simulink模块说明

Simulink库 Commonly Used Blocks Bus Creator 将一组输入元素合并成一条总线**输入&#xff1a;**要包含在总线中的输入元素。**输出&#xff1a;**由合并的输入元素组成的输出总线&#xff0c;指定为虚拟或非虚拟总线。 Bus Selector Bus Selector 模块输出您从输入总线选…

CTFshow web(文件上传158-161)

web158 知识点&#xff1a; auto_append_file 是 PHP 配置选项之一&#xff0c;在 PHP 脚本执行结束后自动追加执行指定的文件。 当 auto_append_file 配置被设置为一个文件路径时&#xff0c;PHP 将在执行完脚本文件的所有代码后&#xff0c;自动加载并执行指定的文件。 这…

物流快递管理系统

文章目录 物流快递管理系统一、系统演示二、项目介绍三、13000字论文参考四、系统部分页面展示五、部分代码展示六、底部获取项目源码和万字论文参考&#xff08;9.9&#xffe5;带走&#xff09; 物流快递管理系统 一、系统演示 校园物流快递管理系统 二、项目介绍 主要技术…

2024.2.16日总结(小程序开发8)

数据监听器 监听对象属性的变化 数据监听器支持监听对象中单个或多个属性的变化 纯数据字段 什么是纯数据字段 纯数据字段指的是哪些不用于页面渲染的data字段 应用场景:例如有些情况下&#xff0c;某些 data 中的字段既不会展示在界面上&#xff0c;也不会传递给其他组件…