高并发幂等计数器的设计与实现

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁
🦄 博客首页——猫头虎的博客🎐
🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐
🌊 《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大小厂~💐

🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥

文章目录

  • 高并发幂等计数器的设计与实现
    • 摘要
    • 引言
    • 问题描述:
    • 依赖组件
    • 实现思路
    • Go 代码示例
    • Java 代码示例
      • 简单代码:
      • 详细代码:
      • 思路解析
      • 优化问题
      • 解决方法
        • 解决方案一(不使用Redis):
        • 方案二: 使用Redis
    • Python 代码示例
    • 拓展:
      • 1. 大量请求同时到来
        • 扩展性:
        • 限流:
        • 缓存:
        • 异步处理:
      • 2. 合适的过期时间
      • 如果不用redis呢?
      • 1. 数据库唯一索引
      • 2. 应用内存
      • 3. 分布式锁
      • 4. 消息队列
      • 5. 文件系统
    • 总结
  • 原创声明

高并发幂等计数器的设计与实现

在这里插入图片描述

摘要

本文探讨了如何实现一个高并发、幂等的计数器服务,该服务用于处理外部的 inc 请求以增加特定视频的播放计数。考虑到网络延迟和重试等因素,该服务需要确保每个请求至少被处理一次,同时避免重复计数。我们使用了 MySQL 用于持久化存储计数数据,并用 Redis 进行幂等性检查。本文通过 Go、Java 和 Python 三种编程语言展示了具体的实现代码,并对核心逻辑进行了详细解释。Java 代码部分更是进行了全流程的展示,包括幂等性检查、数据库更新和已处理请求的记录。这样的设计不仅确保了高并发处理能力,还实现了请求的幂等性。

引言

在分布式系统中,高并发和幂等性是两个非常关键的问题。本文将探讨如何实现一个高并发、幂等的计数器服务。该服务接受外部的 inc 请求,用于增加特定视频的播放计数。由于网络延迟和请求重试等原因,多个相同或不同的 inc 请求可能并发到达服务。因此,服务需要确保每个请求至少被处理一次(at least once),同时避免重复计数。我们将使用 Go、Java 和 Python 来分别演示这一实现。

问题描述:

高并发幂等计数器题目
问题描述:
1.实现一个计数器服务
2.服务接收外部的 inc 请求,每个请求具有全局唯一 request id 和视频 id
3.因为网络和重试的原因,请求可能会重复的到达
4.时序上,多个重复的请求可能并发达到,两次重复请求之间的间隔不可预期
5.需要保证 at least once ,计数值不能丢失
6.可以依赖一些外部组件, mysql redis

依赖组件

  • MySQL: 用于持久化存储计数器的数据。
  • Redis: 用于高速缓存和临时存储已经接收到的 request id。

实现思路

  1. 接收请求: 使用 Web 框架接收 inc 请求,并提取其中的 request_idvideo_id
  2. 幂等检查: 使用 Redis 查询该 request_id,如果已存在,则该请求已被处理。
  3. 队列或缓存: 如果是新的 request_id,则将其存入 Redis,并进行数据库更新操作。
  4. 计数逻辑: 从 MySQL 中获取当前计数,然后加 1,并更新回数据库。

Go 代码示例

// 导入相应的包
import (
	"github.com/go-redis/redis/v8"
	"database/sql"
	// 其他必要的包
)

func incHandler(requestID string, videoID string) string {
	if isProcessed(requestID) {
		return "Already Processed"
	}

	// 更新数据库
	updateCounter(videoID)
	
	return "OK"
}

func isProcessed(requestID string) bool {
	// Redis 检查
	val, _ := redisClient.Get(ctx, requestID).Result()
	return val != ""
}

func updateCounter(videoID string) {
	// MySQL 更新
	// 省略具体实现
}

Java 代码示例

简单代码:

import redis.clients.jedis.Jedis;
import org.springframework.jdbc.core.JdbcTemplate;
// 其他必要的导入

@RestController
public class CounterController {

	@Autowired
	Jedis jedis;

	@Autowired
	JdbcTemplate jdbcTemplate;

	@RequestMapping("/inc")
	public String inc(@RequestParam String requestId, @RequestParam String videoId) {
		if (isProcessed(requestId)) {
			return "Already Processed";
		}

		// 更新数据库
		updateCounter(videoId);

		return "OK";
	}

	public boolean isProcessed(String requestId) {
		// Redis 检查
		return jedis.exists(requestId);
	}

	public void updateCounter(String videoId) {
		// MySQL 更新
		// 省略具体实现
	}
}

详细代码:


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;

@RestController
public class CounterController {

    @Autowired
    private Jedis jedis;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查
        if (isProcessed(requestId)) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        if (updateCounter(videoId)) {
            // Step 3: 将 requestId 存入 Redis 以保证幂等性
            jedis.set(requestId, "true");
            return "OK";
        }
        
        return "Failed";
    }

    private boolean isProcessed(String requestId) {
        // 使用 Redis 检查 requestId 是否已处理
        return jedis.exists(requestId);
    }

    private boolean updateCounter(String videoId) {
        // 使用 JdbcTemplate 和 MySQL 更新计数器
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            // 如果视频尚未有计数,初始化为 1
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            // 如果视频已有计数,加 1
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }

        return true;
    }
}

思路解析

幂等性检查: 使用 Redis 进行了幂等性检查。如果请求已经被处理过(即在 Redis 中有记录),我们就直接返回。

数据库更新: 我们使用 Spring 的 JdbcTemplate 来与 MySQL 进行交互。如果这是一个全新的 video_id,我们将其添加到数据库并初始化计数为 1;否则,我们找到当前计数并加 1。

记录处理过的请求: 最后,我们将处理过的 request_id 添加到 Redis 中,以便进行未来的幂等性检查。

这样,我们就得到了一个高并发、幂等的计数器服务。

优化问题

如果两个重复的请求,进入到Step1 都没有查到,不是就都进入step2

解决方法

如果两个相同的 request_id 几乎同时到达,并且都通过了 Step 1 的幂等性检查,那么它们都会进入 Step 2,从而违反了我们想要的幂等性。

在不使用 Redis 或其他外部锁服务的情况下,这个问题变得更加复杂。一种可能的解决方案是使用数据库的原子操作和唯一约束来确保幂等性。具体来说,您可以使用数据库事务来解决这个问题。

解决方案一(不使用Redis):

  1. 创建一个新表(比如叫做 processed_requests)用于存储已经处理过的 request_id。该表有一个唯一约束在 request_id 上。

  2. 在处理一个新请求之前,尝试将 request_id 插入到 processed_requests 表中。如果插入成功,说明这是第一次处理该请求,然后再去更新视频的播放计数。如果插入失败(因为唯一约束),则说明该请求已经被处理过,直接返回。

  3. 这一切都应在一个数据库事务中完成,以确保操作的原子性。

这里是更新后的 Java 代码片段:

import org.springframework.transaction.annotation.Transactional;

// ...

@RestController
public class CounterController {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查和插入
        try {
            jdbcTemplate.update("INSERT INTO processed_requests(request_id) VALUES(?)", requestId);
        } catch (DuplicateKeyException e) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        updateCounter(videoId);

        return "OK";
    }

    private void updateCounter(String videoId) {
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }
    }
}

这样,在并发环境中,由于数据库的唯一约束和事务,我们可以确保每个 request_id 只会被处理一次。这解决了您提到的问题。

方案二: 使用Redis

如果使用 Redis,一种常见的方法是利用 Redis 的原子操作来解决这个问题。具体来说,可以使用 SETNX (SET if Not eXists)命令来设置 request_id 的值,并检查操作是否成功。这个命令只有在键不存在时才会设置值,并返回一个标识(通常是 1 表示成功,0 表示失败)。因为 Redis 的这个操作是原子性的,所以即使有多个相同的请求几乎同时到达,也只有一个能够成功设置该 request_id

下面是 Java 代码中这部分逻辑的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;

@RestController
public class CounterController {

    @Autowired
    private Jedis jedis;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GetMapping("/inc")
    public String inc(@RequestParam String requestId, @RequestParam String videoId) {
        // Step 1: 幂等性检查
        if (!setIfNotExists(requestId)) {
            return "Already Processed";
        }

        // Step 2: 更新数据库
        updateCounter(videoId);
        
        return "OK";
    }

    private boolean setIfNotExists(String requestId) {
        return jedis.setnx(requestId, "true") == 1;
    }

    private void updateCounter(String videoId) {
        String query = "SELECT count FROM video_counter WHERE video_id = ?";
        Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);

        if (count == null) {
            jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
        } else {
            jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
        }
    }
}

这里,setIfNotExists 方法使用 Redis 的 SETNX 命令来尝试设置一个 request_id。如果设置成功(返回 1),则继续更新数据库。如果设置失败(返回 0,表示该 request_id 已经存在),则直接返回 “Already Processed”,实现幂等性。

这样,在高并发环境下,即使多个相同的 request_id 几乎同时到达,也只有一个会被实际处理,其他的都会在 setIfNotExists 阶段被拦截,从而确保了幂等性。

Python 代码示例

from flask import Flask, request
import redis
import mysql.connector

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
conn = mysql.connector.connect(user='root', password='password', database='test')

@app.route('/inc', methods=['POST'])
def inc():
    request_id = request.form['request_id']
    video_id = request.form['video_id']

    if r.exists(request_id):
        return 'Already Processed'

    r.set(request_id, 1)
    
    cursor = conn.cursor()
    # 更新 MySQL
    # 省略具体实现

    return 'OK'

拓展:

面试官提问:1.我就是同一个时间段大量的请求到来了怎么办?
2.还有这个合适的过期时间怎么合适

1. 大量请求同时到来

当大量请求在同一时间段到来时,一般有以下几种应对方式:

扩展性:

  1. 横向扩展(水平扩展): 通过增加更多的服务器来分摊负载。
  2. 垂直扩展(纵向扩展): 通过增加单个服务器的计算资源(例如,CPU、RAM)。

限流:

  1. 令牌桶或漏桶算法: 控制进入到系统的请求速率。
  2. 优先级队列: 为不同类型的请求设置不同的优先级。

缓存:

  1. 读缓存: 对常读不变或少变的数据进行缓存。
  2. 写缓存(Write-Through、Write-Back): 缓存写操作,批量或异步地更新到数据库。

异步处理:

  1. 消息队列: 使用如 RabbitMQ、Kafka 等消息队列服务将处理请求和更新数据库的操作解耦。

2. 合适的过期时间

设置合适的过期时间主要依赖于业务需求和系统资源。以下几点可能有助于确定一个合适的过期时间:

  1. 业务逻辑: 如果请求是短暂的(比如秒杀活动),那么短的过期时间可能更合适。

  2. 资源限制: 如果 Redis 或其他缓存资源有限,较短的过期时间可以更快地回收资源。

  3. 数据一致性: 长的过期时间可能导致数据不一致,短的过期时间虽然可以减少这种风险,但会增加系统的复杂性。

  4. 网络延迟和重试策略: 如果网络延迟和重试可能导致长时间的重复请求,应设置相对长一些的过期时间。

综合以上因素,过期时间的设置通常需要在性能和准确性之间找到一个平衡点。这通常需要通过实验和性能测试来不断调整和优化。

如果不用redis呢?

如果不使用 Redis,我们有几种其他方法也可以实现高并发和幂等性。

1. 数据库唯一索引

可以在数据库中为 request_id 设置唯一索引。当重复请求到来时,尝试插入相同的 request_id 将会触发数据库错误,从而达到幂等性的目的。

private boolean isProcessed(String requestId) {
    try {
        jdbcTemplate.update("INSERT INTO request_ids(request_id) VALUES(?)", requestId);
    } catch (DuplicateKeyException e) {
        return true;
    }
    return false;
}

2. 应用内存

在应用级别维护一个已处理 request_id 的集合。这样不需要外部存储,但这种方式不适合分布式系统,因为每个实例都有自己的内存。

private Set<String> processedRequestIds = Collections.synchronizedSet(new HashSet<>());

private boolean isProcessed(String requestId) {
    if (processedRequestIds.contains(requestId)) {
        return true;
    }
    processedRequestIds.add(requestId);
    return false;
}

3. 分布式锁

使用数据库或者其他分布式锁机制确保同一时间只有一个请求在处理。这种方法可以确保幂等性但可能影响系统性能。

private boolean isProcessed(String requestId) {
    if (acquireLock(requestId)) {
        // 进行处理
        releaseLock(requestId);
        return false;
    }
    return true;
}

4. 消息队列

使用消息队列(如 Kafka、RabbitMQ 等)确保消息的幂等处理。大多数现代消息队列都提供了这种机制。

5. 文件系统

在文件系统中为每个 request_id 创建一个唯一文件。通过检查文件存在与否,来确定是否处理了该请求。但这种方法不适合高并发场景。

每种方法都有其优点和局限性。选择哪种方法取决于具体需求、可用资源以及你愿意接受的复杂性。

总结

通过使用 Redis 进行幂等性检查,以及使用 MySQL 进行持久化存储,我们成功地实现了一个高并发、幂等的计数器服务。这种设计能够在高并发条件下保证 at least once 语义,同时也实现了幂等性。

该设计还有进一步优化和扩展的空间,例如,可以加入更多的负载均衡和高可用性特性,或者使用消息队列来进一步解耦生产者和消费者。

感谢您的阅读,希望本文能为您提供有用的信息和启示。如有任何问题或建议,请随时留言。


作者: [猫头虎]
发布时间: [2023.08.30]

在这里插入图片描述

原创声明

======= ·

  • 原创作者: 猫头虎

作者wx: [ libin9iOak ]

学习复习

本文为原创文章,版权归作者所有。未经许可,禁止转载、复制或引用。

作者保证信息真实可靠,但不对准确性和完整性承担责任

未经许可,禁止商业用途。

如有疑问或建议,请联系作者。

感谢您的支持与尊重。

点击下方名片,加入IT技术核心学习团队。一起探索科技的未来,共同成长。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/96031.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Prompt GPT推荐社区

大家好&#xff0c;我是荷逸&#xff0c;这次给大家带来的是我日常学习Prompt社区推荐 Snack Prompt 访问地址&#xff1a;http://snackprompt.com Snack Prompt是一个采用的Prompts诱导填空式的社区&#xff0c;它提供了一种简单的prompt修改方式&#xff0c;你只需要输入关…

【工作笔记-0038】mongodb mongorestore 命令行导入 bson.gz数据

1. 导出的集合文件格式如下&#xff08;也就是导出的表文件&#xff09;&#xff1a; 例如&#xff1a; D:\Files\xxxx集合名称.bson.gz 怎样导出&#xff0c;这里不做介绍&#xff0c;用 mongodb compass 或者 studio 3t 都可以 2. 下载命令行导入工具&#xff1a; 官方…

APP UI自动化测试思路总结

首先想要说明一下&#xff0c;APP自动化测试可能很多公司不用&#xff0c;但也是大部分自动化测试工程师、高级测试工程师岗位招聘信息上要求的&#xff0c;所以为了更好的待遇&#xff0c;我们还是需要花时间去掌握的&#xff0c;毕竟谁也不会跟钱过不去。接下来&#xff0c;一…

设计模式之工厂模式(万字长文)

文章目录 概述工厂模式的优点包括工厂模式有几种主要的变体看一个具体需求使用传统的方式来完成传统的方式的优缺点 简单工厂模式基本介绍使用简单工厂模式简单工厂模式的优缺点优点&#xff1a;缺点&#xff1a; 工厂方法模式看一个新的需求思路 1思路 2工厂方法模式介绍工厂方…

Python提取JSON文件中的指定数据并保存在CSV或Excel表格文件内

本文介绍基于Python语言&#xff0c;读取JSON格式的数据&#xff0c;提取其中的指定内容&#xff0c;并将提取到的数据保存到.csv格式或.xlsx格式的表格文件中的方法。 JSON格式的数据在数据信息交换过程中经常使用&#xff0c;但是相对而言并不直观&#xff1b;因此&#xff0…

【LeetCode】290. 单词规律

这里写自定义目录标题 2023-8-30 09:34:23 290. 单词规律 2023-8-30 09:34:23 这道题目&#xff0c;我是根据 205. 同构字符串 的思路一样&#xff0c;都转化为另外一个第三方的字符串&#xff0c;在比较翻译过后的语句是不是一样的。 class Solution {public boolean wordP…

《Flink学习笔记》——第三章 Flink的部署模式

不同的应用场景&#xff0c;有时候对集群资源的分配和占用有不同的需求。所以Flink为各种场景提供了不同的部署模式。 3.1 部署模式&#xff08;作业角度/通用分类&#xff09; 根据集群的生命周期、资源的分配方式、main方法到底在哪里执行——客户端还是Client还是JobManage…

Pillow:Python的图像处理库(安装与使用教程)

在Python中&#xff0c;Pillow库是一个非常强大的图像处理库。它提供了广泛的图像处理功能&#xff0c;让我们可以轻松地操作图像&#xff0c;实现图像的转换、裁剪、缩放、旋转等操作。此外&#xff0c;Pillow还支持多种图像格式的读取和保存&#xff0c;包括JPEG、PNG、BMP、…

白嫖idea

白嫖idea 地址 https://www.jetbrains.com/toolbox-app/

基于java Swing 和 mysql实现的购物管理系统(源码+数据库+说明文档+运行指导视频)

一、项目简介 本项目是一套基于java Swing 和 mysql实现的购物管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过…

使用WSL修改docker文件存储位置

按照以下说明将其重新定位到其他驱动器/目录&#xff0c;并保留所有现有的Docker数据。 首先&#xff0c;右键单击Docker Desktop图标关闭Docker桌面&#xff0c;然后选择退出Docker桌面&#xff0c;然后&#xff0c;打开命令提示符&#xff1a; wsl --list -v您应该能够看到&a…

Linux之Shell(一)

Linux之Shell Shell概述Linux提供的Shell解析器bash和sh的关系Centos默认的解析器是bash Shell脚本入门脚本格式第一个脚本脚本常用的执行方式 变量系统预定义变量自定义变量特殊变量$n$#\$*、\$$? 运算符条件判断流程控制(▲)if判断case语句for循环while循环 read读取控制台输…

如何判断一个java对象还活着

引用计数算法 引用计数器的算法是这样的&#xff1a;在对象中添加一个引用计数器&#xff0c;每当有一个地方引用它时&#xff0c;计数器值就加一&#xff1b;当引用失效时&#xff0c;计数器值就减一&#xff1b;任何时刻计数器为零的对象就是不可能再被使用的。 缺点&#x…

el-select范围选择框

1、html <el-select v-model"searchForm.hour" :class"searchForm.hour?.length>1?edit-tag-hour:keep-tag-hour" filterable multiple clearable :multiple-limit"2" remove-tag"removeChange" change"hourChange"…

Vscode画流程图

1.下载插件 Draw.id Integration 2.桌面新建文件&#xff0c;后缀名改为XXX.drawio 在vscode打开此文件 &#xff0c;就可以进行绘制流程图啦

音频基本知识

声音传播方式: 1)声音的传播需要介质,在真空中不能传播; 2)声波属于纵波,即如下图传播方向与振动方向一致; 声音速度: 1)常温常压下,一般空气速度为340m/s; 2)温度越高,声速越大; 3)液体、固体的传播速度比空气快; 人耳可接收到的频域范围: 1)通常范围…

Spring boot中调用C/C++(dll)

添加JNA依赖 <dependency><groupId>net.java.dev.jna</groupId><artifactId>jna</artifactId><version>5.5.0</version> </dependency>准备C代码/C代码 如下是C代码&#xff0c;文件名&#xff1a;xizi.c #include <std…

Python实战之数据表提取和下载自动化

在网络爬虫领域&#xff0c;动态渲染类型页面的数据提取和下载自动化是一个常见的挑战。本文将介绍如何利用Pyppeteer库完成这一任务&#xff0c;帮助您轻松地提取动态渲染页面中的数据表并实现下载自动化。 一、环境准备 首先&#xff0c;确保您已经安装了Python环境。接下来…

阔别线下三年的BIRTV影视盛会:有哪些变革式创新应用?

2023年8月26日&#xff0c;以“融合创新 面向未来”为主题的第三十届北京国际广播电影电视展览会&#xff08;BIRTV 2023&#xff09;收官。这是一场阔别线下三年的行业顶尖盛会&#xff0c;展馆处处人潮涌动。 接下来盘点一下&#xff0c;本次BIRTV的一些特色应用&#xff1a…

《vue3实战》通过indexOf方法实现电影评价系统的模糊查询功能

目录 前言 一、indexOf是什么&#xff1f;indexOf有什么作用&#xff1f; 含义&#xff1a; 作用&#xff1a; 二、功能实现 这段是查询过程中过滤筛选功能的代码部分: 分析&#xff1a; 这段是查询用户和性别功能的代码部分&#xff1a; 分析&#xff1a; 三、最终效…
最新文章