Kafka(四)消费者消费消息

文章目录

  • 如何确保不重复消费消息?
  • 消费者业务逻辑重试
  • 消费者提交
  • 自定义反序列化类
  • 消费者参数配置及其说明
    • 重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id
    • 增加消费者的吞吐量
    • 消费者消费的超时时间和poll()方法的关系
  • 消费者消费逻辑
  • 启动消费者
  • 关闭消费者
  • 配置listener
  • 结语
  • 示例源码仓库

在 上一篇文章里,对于生产者,发送时失败之后会由定时任务进行重新发送, 并且我们是根据消息的key进行分区的, 所以不管我们重新发送了多少次,对于同一个key,始终会被送到同一个分区

那么到消费者这里,最重要的问题是如何确保不会重复消费之前因为各种原因被重新发送到某个分区的消息。

如何确保不重复消费消息?

基本思路如下

  1. 我们在数据库中创建了一个已成功消费的消息表,里面只有一列,消息的key。当消费者消费逻辑成功之后,我们会把其key保存到这张表里 。
  2. 当消费者拉取新的一批消息时,我们会去数据库的消息表里查是否已经存在该消息的key,存在的话,就跳过实际的消费业务。
  3. 一批消息里也可能存在相同的key,所以我们处理完一次消费业务,就把该key放到一个set里,消费下一条消息时,则先去set里看一下,存在的话即跳过,不存在则正常执行消费业务。即使前面的消息消费业务失败了,后面相同key的消息也直接跳过,不会再次消费

消费者业务逻辑重试

对于消费者业务逻辑的重试,我们使用failsafe框架进行重试,该框架的使用可参考官方文档,这里不做过多赘述。

消费者提交

这里的方式采用的是Kafka权威指南中消费者一章中提出的方式。 异步+同步。平时使用异步提交,在关闭消费者时,使用同步提交,确保消费者退出之前将当前的offset提交上去。

自定义反序列化类

在生产者端,我们发送自定义的对象时,利用自定义序列化类将其序列化为JSON。在消费者端,我们同样需要自定义反序列类将JSON转为我们之前的对象

public class UserDTODeserializer implements Deserializer<UserDTO> {
    
    @Override
    @SneakyThrows
    public UserDTO deserialize(final String s, final byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(bytes, UserDTO.class);
    }
}

消费者参数配置及其说明

    /**
     * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景需求 自己调整
     * https://kafka.apache.org/26/documentation/#group.instance.id
     *
     * 为什么需要group.instance.id?
     * 假设auto.offset.reset=latest
     * 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会【丢失这部分消息】
     * 假如auto.offset.reset=earliest
     * 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会重复消费【全部消息】
     *
     * 光有group.instance.id还不够,还需要修改heartbeat.interval.ms和session.timeout.ms的值为合理的值
     * 如果程序部署,重启期间,重启时间超过了session.timeout.ms的值,那么kafka会认为此消费者已经挂了会触发rebalance,在一些大型消息场景,rebalance的过程可能会很慢, 更详细的解释请参考
     * https://kafka.apache.org/26/documentation/#static_membership
     * @param groupInstanceId
     * @return
     */
    public static Properties loadConsumerConfig(int groupInstanceId, String valueDeserializer) {
        Properties result = new Properties();
        result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093");
        result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        result.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 代表此消费者是消费者组的static member
        result.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "test-" + ++groupInstanceId);
        // 修改heartbeat.interval.ms和session.timeout.ms的值,和group.instance.id配合使用,避免重启或重启时间过长的时候,触发rebalance
        result.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 60);
        result.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 60 * 5);
        // 关闭自动提交
        result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
        // 默认1MB,增加吞吐量,其设置对应的是每个分区,也就是说一个分区返回10MB的数据
        result.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576 * 10);
        result.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // 返回全部数据的大小
        result.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576 * 100);
        // 默认5分钟
        result.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 60 * 5);
        return result;
    }

重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id

三者的使用方式见上面代码中的注释。

增加消费者的吞吐量

和上一篇文章一样,由于我们的邮件消息每个大概是20KB,使用默认的消费者参数,吞吐量是上不来的。 所以做了一些优化,除了消费者消费逻辑要尽可能简单之外,为了增加消费者的吞吐量,可以根据实际场景修改倒数第4、3、2个参数。

消费者消费的超时时间和poll()方法的关系

由max.poll.interval.ms参数控制,默认5分钟。如果消费者业务逻辑处理特别耗时,在5分钟之内没有再次调用poll()拉取消息,则Kafka认为消费者已死,根据具体配置会立刻触发rebalance还是等一段时间再触发rebalance。

这里特别强调一下,网上有一部分文章说是要确保消费逻辑在poll(timeUnit)时间内处理完,否则就会触发rebalance。这都是很早之前的Kafka版本了,是因为原来消费者的poll()线程和心跳线程使用的是同一个线程。现在的版本早就把这两个分开了。所以你只需要注意,自己的消费逻辑别超过max.poll.interval.ms即可,如果觉得不够用,也可自己调整。

poll()方法中的时间代表的是多长时间去拉取一次消息。假设你设置的是1分钟,你的消费逻辑处理的很快,可能用了10s。那么在你消费完了之后,消费者会在1分钟之后拉取新消息。

在消费者中使用手动提交。

消费者消费逻辑

这里要注意

  1. 如果消费逻辑可能抛出异常,则使用try-catch处理,防止因为抛出异常,导致我们错误的关闭了消费者
  2. 消费者消费逻辑失败时会重试,重试N次之后,我们会将其保存在数据库中,以便和生产者一样,定时处理失败的消息
  3. 消费逻辑没问题的话,则把该消息的key进行入库处理
@Log
public class MessageConsumerRunner implements Runnable {
    
    private final AtomicBoolean closed = new AtomicBoolean(false);
    
    private MessageAckConsumesSuccessService messageAckConsumesSuccessService = new MessageAckConsumesSuccessService();
    
    private MessageFailedService messageFailedService = new MessageFailedService();
    
    private final KafkaConsumer<String, UserDTO> consumer;
    
    private final int consumerPollIntervalSecond;
    
    public MessageConsumerRunner(KafkaConsumer<String, UserDTO> consumer, int consumerPollIntervalSecond) {
        this.consumer = consumer;
        this.consumerPollIntervalSecond = consumerPollIntervalSecond;
    }
    
    /**
     * 1. 使用https://failsafe.dev/进行重试
     * 2. 每次消费消息前,判断消息ID是否存在于数据库中和当前Set集合中,避免重复消费,
     *    我们的消息时根据消息的key进行hash分区的,所以同一个消息即使生产多次,一定会到同一个partition中,partition动态增加引起的特殊情况不在考虑范围之内
     * 4. 在一次消费消息中重试两次,如果两次都失败,那么将失败原因、消息的JSON字符串插入到message_failed表中,以便后续再次生产或排查问题
     * 3. 平时异步提交,关闭消费者时使用同步提交
     */
    @Override
    public void run() {
        AtomicReference<String> errorMessage = new AtomicReference<>(StringUtils.EMPTY);
        RetryPolicy<Boolean> retryPolicy = RetryPolicy.<Boolean>builder()
            .handle(Exception.class)
            // 如果业务逻辑返回false或者抛出异常,则重试
            .handleResultIf(Boolean.FALSE::equals)
            // 不包含首次
            .withMaxRetries(2)
            .withDelay(Duration.ofMillis(200))
            .onRetry(e -> log.warning("consume message failed, start the {}th retry"+ e.getAttemptCount()))
            .onRetriesExceeded(e -> {
                Optional.ofNullable(e.getException()).ifPresent(u -> errorMessage.set(u.getMessage()));
                log.severe("max retries exceeded" + e.getException());
            })
            .build();
        Fallback<Boolean> fallback = Fallback.<Boolean>builder(e -> {
            // do nothing, suppress exceptions
        }).build();
        try {
            consumer.subscribe(Collections.singletonList("email"));
            while (!closed.get()) {
                // get message from kafka
                ConsumerRecords<String, UserDTO> records = consumer.poll(Duration.ofSeconds(consumerPollIntervalSecond));
                if (records.isEmpty()) {
                    return;
                }
                Set<UserDTO> successConsumed = new HashSet<>();
                Set<UserDTO> failedConsumed = new HashSet<>();
                Map<String, String> failedConsumedReason = new HashMap<>();
                // check message if exist in database
                Set<String> checkingMessageIds = new HashSet<>(records.count());
                records.iterator().forEachRemaining(item -> checkingMessageIds.add(item.value().getMessageId()));
                Set<String> hasBeenConsumedMessageIds = messageAckConsumesSuccessService.checkMessageIfExistInDatabase(checkingMessageIds);
                records.forEach(item -> {
                    if (hasBeenConsumedMessageIds.contains(item.value().getMessageId())) {
                        // if exist, continue
                        return;
                    }
                    // 每一批消息中也可能存在同样的消息,所以需要再次判断
                    hasBeenConsumedMessageIds.add(item.value().getMessageId());
                    try {
                        Failsafe.with(fallback, retryPolicy)
                            .onSuccess(e -> successConsumed.add(item.value()))
                            .onFailure(e -> {
                                failedConsumed.add(item.value());
                                failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(errorMessage.get()) ? errorMessage.get() : "no reason, may be check server log");
                                errorMessage.set(StringUtils.EMPTY);
                            })
                            .get(() -> {
                                // 这里是业务逻辑,可以返回true或false,为什么要这样?是因为上面RetryPolicy这里定义的boolean,根据自己实际业务设置相应的类型
                                return true;
                            });
                        // 这里要catch住所有业务异常,防止由业务异常导致消费者线程退出
                    }catch (Exception e) {
                        log.severe("failed to consume email message" + e);
                        failedConsumed.add(item.value());
                        failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(e.getMessage()) ? e.getMessage() : e.getCause().toString());
                    }
                });
                postConsumed(successConsumed, failedConsumed, failedConsumedReason);
                // 平时使用异步提交
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            if (!closed.get()) {
                throw e;
            }
        } finally {
            // 消费者退出时使用同步提交
            try {
                consumer.commitSync();
            } catch (Exception e) {
                log.info("commit sync occur exception: " + e);
            } finally{
                try {
                    consumer.close();
                }catch (Exception e) {
                    log.info("consumer close occur exception: " + e);
                }
                log.info( "shutdown kafka consumer complete");
            }
        }
    }
    
    /**
     * 处理成功、成功后的回调、失败
     * @param successConsumed
     * @param failedConsumed
     * @param failedConsumedReason
     */
    private void postConsumed(Set<UserDTO> successConsumed, Set<UserDTO> failedConsumed, Map<String, String> failedConsumedReason) {
        // 后置处理开启异步线程处理,不阻塞消费者线程
        
        // 克隆传进来的集合,而不使用原集合的引用,因为原集合每次消费都会重置
        Set<UserDTO> cloneSuccessConsumed = new HashSet<>(successConsumed);
        Set<UserDTO> cloneFailedConsumed = new HashSet<>(failedConsumed);
        Map<String, String> cloneFailedConsumedReason = new HashMap<>(failedConsumedReason);
        new Thread( () -> {
            if (!cloneSuccessConsumed.isEmpty()) {
                messageAckConsumesSuccessService.insertMessageIds(cloneSuccessConsumed.stream().map(UserDTO::getMessageId).collect(Collectors.toSet()));
                cloneFailedConsumed.forEach(item -> {
                    if (Objects.nonNull(item.getCallbackMetaData())) {
                        // do callback
                        CallbackProducer callbackProducer = new CallbackProducer();
                        callbackProducer.sendCallbackMessage(item.getCallbackMetaData(), MessageFailedPhrase.PRODUCER);
                    }
                });
            }
            if (!cloneFailedConsumed.isEmpty()) {
                ObjectMapper objectMapper = new ObjectMapper();
                cloneFailedConsumed.forEach(item -> {
                    MessageFailedEntity entity = new MessageFailedEntity();
                    entity.setMessageId(item.getMessageId());
                    entity.setMessageType(MessageType.EMAIL);
                    entity.setMessageFailedPhrase(MessageFailedPhrase.CONSUMER);
                    entity.setFailedReason(cloneFailedConsumedReason.get(item.getMessageId()));
                    try {
                        entity.setMessageContentJsonFormat(objectMapper.writeValueAsString(item));
                    } catch (JsonProcessingException e) {
                        log.info("failed to convert UserDTO message to json string");
                    }
                    messageFailedService.saveOrUpdateMessageFailed(entity);
                });
            }
        }).start();
    }
    
    public void shutdown() {
        log.info( Thread.currentThread().getName() + " shutdown kafka consumer");
        closed.set(true);
        consumer.wakeup();
    }
}

启动消费者

通过实现ServletContextListener接口对于方法使其在Tomcat启动之后,启动消费者

public class StartUpConsumerListener implements ServletContextListener {
    
    
    /**
     * 假设开启10个消费者.
     *
     * 消费者的数量要和partition的数量一致,实际情况下,可以调用AdminClient的方法获取到topic的partition数量,然后根据partition数量来创建消费者.
     * @param sce
     */
    @Override
    public void contextInitialized(final ServletContextEvent sce) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new AbortPolicy());
        for (int i = 0; i < 10; i++) {
            KafkaConsumer<String, UserDTO> consumer = new KafkaConsumer<>(KafkaConfiguration.loadConsumerConfig(i, UserDTO.class.getName()));
            MessageConsumerRunner messageConsumerRunner = new MessageConsumerRunner(consumer, 10);
            // 使用另外一个线程来关闭消费者
            Thread shutdownHooks = new Thread(messageConsumerRunner::shutdown);
            KafkaListener.KAFKA_CONSUMERS.add(shutdownHooks);
            // 启动消费者线程
            threadPoolExecutor.execute(messageConsumerRunner);
        }
    }
}

关闭消费者

public class KafkaListener implements ServletContextListener {
    
    public static final Vector<Thread> KAFKA_CONSUMERS = new Vector<>();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // do noting
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        KAFKA_CONSUMERS.forEach(Thread::run);
    }
}

配置listener

<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                      https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
         version="6.0">
  <display-name>Kafka消息的消费者-消息系统</display-name>

<!--  listener的contextInitialized顺序按照声明顺序执行, contextDestroyed方法按照声明顺序反向执行-->
  <listener>
    <listener-class>com.message.server.listener.KafkaListener</listener-class>
  </listener>

  <listener>
    <listener-class>com.message.server.listener.StartUpConsumerListener</listener-class>
  </listener>
</web-app>

结语

  1. 在处理消费者相关逻辑时,我们重点关心如何确保消息不重复消费以及如何增加消费者的吞吐量
  2. 消费逻辑尽可能保证处理速度快,尽量减少耗时的逻辑

示例源码仓库

  1. Github地址
  2. 项目下message-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述

我们生产者和消费者的正常情况都以处理完了,下一篇文章我们将重点处理生产者失败和消费者失败之后重新生产消息和消费消息的逻辑,以及简单说一下Kafka中的rebalance。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/163217.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

遗传算法GA-算法原理与算法流程图

本站原创文章&#xff0c;转载请说明来自《老饼讲解-BP神经网络》bp.bbbdata.com 目录 一、遗传算法流程图 1.1. 遗传算法流程图 二、遗传算法的思想与机制 2.1 遗传算法的思想 2.2 遗传算法的机制介绍 三、 遗传算法的算法流程 3.1 遗传算法的算法…

PXE高效批量网络装机

目录 一.PXE 1. 系统装机的三种引导方式 2. 系统安装过程 3. 光盘安装相关文件 4. PXE简介 5. 实现过程 6. PXE优点 二.PXE实现过程 1. 实验准备 2. 搭建DHCP服务器 3. 配置TFTP服务器 4. 准备pxelinu.0文件 5. 挂载镜像准备内核、驱动文件 6. 手写配置文件 7. 准…

强烈 推荐 13 个 Web前端在线代码IDE

codesandbox.io&#xff08;国外&#xff0c;提供免费空间&#xff09; 网址&#xff1a;https://codesandbox.io/ CodeSandbox 专注于构建完整的 Web 应用程序&#xff0c;支持多种流行的前端框架和库&#xff0c;例如 React、Vue 和 Angular。它提供了一系列增强的功能&…

springboot项目中获取业务功能的导入数据模板文件

场景: 在实际业务场景中,经常会遇到某些管理功能需要数据导入共功能,但既然是导入数据,肯定会有规则限制,有规则就会有数据模板,但这个模板一般是让客户自己下载固定规则模板,而不是让客户自己随便上传模板。下面介绍直接下载模板 一、下载模板示例 1、在项目的…

信安.网络安全.UDP协议拥塞

第一部分 如何解决UDP丢包问题 一、UDP 报文格式 每个 UDP 报文分为 UDP 报头和 UDP 数据区两部分。报头由 4 个 16 位长&#xff08;2 字节&#xff09;字段组成&#xff0c;分别说明该报文的源端口、目的端口、报文长度和校验值。UDP 报文格式如图所示。 UDP 报文中每个…

前端性能优化之LightHouse

优质博文&#xff1a;IT-BLOG-CN 一、LightHouse环境搭建 LightHouse是一款由Google开发的开源工具&#xff0c;用于评估Web应用程序的性能和质量。可以将其看作是一个Chrome扩展程序运行&#xff0c;或从命令行运行。为LightHouse提供一个需要审查的网址&#xff0c;它将针对…

基于django水果蔬菜生鲜销售系统

基于django水果蔬菜生鲜销售系统 摘要 基于Django的水果蔬菜生鲜销售系统是一种利用Django框架开发的电子商务平台&#xff0c;旨在提供高效、便捷的购物体验&#xff0c;同时支持水果蔬菜生鲜产品的在线销售。该系统整合了用户管理、产品管理、购物车、订单管理等核心功能&…

springboot引入第三方jar包放到项目目录中,添加web.xml

参考博客&#xff1a;https://www.cnblogs.com/mask-xiexie/p/16086612.html https://zhuanlan.zhihu.com/p/587605618 1、在resources目录下新建lib文件夹&#xff0c;将jar包放到lib文件夹中 2、修改pom.xml文件 <dependency><groupId>com.lanren312</grou…

【C++】【Opencv】cv::warpAffine()仿射变换函数详解,实现平移、缩放和旋转等功能

仿射变换是一种二维变换&#xff0c;它可以将一个二维图形映射到另一个二维图形上&#xff0c;保持了图形的“形状”和“大小”不变&#xff0c;但可能会改变图形的方向和位置。仿射变换可以用一个线性变换矩阵来表示&#xff0c;该矩阵包含了六个参数&#xff0c;可以进行平移…

数据库课后习题加真题

文章目录 第二章第三章第四到六章某年真题 第二章 第三章 3.8 对于教学数据库的三个基本表&#xff1a; s( 学号 ‾ \underline{学号} 学号​&#xff0c;姓名&#xff0c;年龄, 性别) sc( 学号 , 课程号 ‾ \underline{学号, 课程号} 学号,课程号​, 成绩) c( 课程号 ‾ \un…

移动端表格分页uni-app

使用uni-app提供的uni-table表格 网址&#xff1a;https://uniapp.dcloud.net.cn/component/uniui/uni-table.html#%E4%BB%8B%E7%BB%8D <uni-table ref"table" :loading"loading" border stripe type"selection" emptyText"暂无更多数据…

Java 省考试院自学考试考籍管理系统

1) 项目简介 考籍管理系统是省考试院自学考试管理系统的一部分&#xff0c;包括考生考籍档案管理、考生免考管理、课程顶替、考籍转入转出管理、毕业管理和日志管理等功能模块。该项目的建设方便和加强了省考试院对自学考试考籍的一系列管理操作&#xff0c;社会效应明显。…

为什么软件公司很少用Python开发Web项目?

实际上&#xff0c;Python在Web开发方面有着广泛的应用&#xff0c;许多软件公司也确实使用Python来开发Web项目。 Python拥有诸如Django、Flask等流行的Web框架&#xff0c;这些框架使得开发者能够迅速、高效地开发出Web应用。 然而&#xff0c;Python在Web开发中的使用可能会…

深度学习——(生成模型)DDPM

前置数学知识 1、先验概率和后验概率 先验概率&#xff1a;根据以往经验和分析得到的概率,它往往作为“由因求果”问题中的“因”出现&#xff0c;如 q ( x t ∣ x t − 1 ) q(x_t|x_{t-1}) q(xt​∣xt−1​) 后验概率&#xff1a;指在得到“结果”的信息后重新修正的概率,是…

LeetCo

题目描述如下&#xff1a; 罗马数字包含以下七种字符: I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 M …

component 动态组件的用法

一&#xff1a;前言 <component></component> 标签是Vue框架自定义的标签&#xff0c;它的用途就是可以动态绑定我们的组件&#xff0c;根据数据的不同需求来更换使用不同的组件。 在最上方的图片中&#xff0c;就是使用的 Element Plus 的 Tags 组件&#xff0c;根…

golang学习笔记——接口

文章目录 Go 语言接口例子空接口空接口的定义空接口的应用空接口作为函数的参数空接口作为map的值 类型断言接口值 类型断言例子001类型断言例子002 Go 语言接口 接口&#xff08;interface&#xff09;定义了一个对象的行为规范&#xff0c;只定义规范不实现&#xff0c;由具…

Codeforces Round #909 (Div. 3)

A. Game with Integers 签到题&#xff0c;但是本蒟蒻11分钟才AC&#xff0c;主要还是英文题面不熟练&#xff0c;题目中加粗了after&#xff0c;只有下一步操作之后能被整除才胜利。 英文题面的加粗单词很重要&#xff0c;注意提高签到题速度。 B. 250 Thousand Tons of TNT…

C语言的由来与发展历程

C语言的起源可以追溯到上世纪70年代&#xff0c;由Dennis Ritchie在贝尔实验室开发出来。C语言的设计目标是提供一种简洁、高效、可移植的编程语言&#xff0c;以便于开发底层的系统软件。在那个时代&#xff0c;计算机技术正在迅速发展&#xff0c;出现了多种高级编程语言&…

05-Spring Boot工程中简化开发的方式Lombok和dev-tools

简化开发的方式Lombok和dev-tools Lombok常用注解 Lombok用标签方式代替构造器、getter/setter、toString()等重复代码, 在程序编译的时候自动生成这些代码 注解名功能NoArgsConstructor生成无参构造方法AllArgsConstructor生产含所有属性的有参构造方法,如果不希望含所有属…
最新文章