kafak消费数据,webSocket实时推送数据到前端

1.导入webSocket依赖

 <!--websocket依赖包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.编写webSocket类

package com.skyable.device.config.webSocket;


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * @author Administrator
 */
@ServerEndpoint("/vehicle/{domainId}")
@Component
@Slf4j
public class WebSocketServer {
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static final Set<Session> SESSIONS = new HashSet<>();


    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        log.info("webSocket link close");
    }

    /**
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        error.printStackTrace();
    }

    /**
     * 接收数据
     *
     * @param data
     */
    public static void sendDataToClients(String data) {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText(data);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        /**
         * 接收domainId
         */
        SESSIONS.add(session);
        sendDataToClients();
    }


    public void sendDataToClients() {
        for (Session session : SESSIONS) {
            try {
                session.getBasicRemote().sendText("webSocket link succeed");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.skyable.device.config.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author Administrator
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3.kafak消费数据后调用webSocket方法

  /**
     * 获取kafka数据
     *
     * @param
     */
    @Override
    public void saveBatch(String jsonValue) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            //位置
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.LOCATION)) {
                RealTimePosition realTimePosition = new RealTimePosition();
                JsonNode locationNode = jsonNode.get("location");
                String vehicleId = locationNode.get("vehicleId").asText();
                double longitude = Double.parseDouble(locationNode.get("longitude").asText());
                double latitude = Double.parseDouble(locationNode.get("latitude").asText());
                long timeStamp = locationNode.get("timestamp").asLong();
                realTimePosition.setTimeStamp(timeStamp);
                realTimePosition.setLatitude(String.valueOf(latitude));
                realTimePosition.setLongitude(String.valueOf(longitude));
                realTimePosition.setVehicleId(vehicleId);
                VehicleLocationVo locationVo = deviceMapMapper.selectLonLat(vehicleId);
                if (!Objects.isNull(locationVo)) {
                    //计算距离
                    RedisUtil.addLocation(vehicleId, Double.parseDouble(locationVo.getLongitude()), Double.parseDouble(locationVo.getLatitude()), "l1");
                    RedisUtil.addLocation(vehicleId, longitude, latitude, "l2");
                    Double result = RedisUtil.calculateDistance(vehicleId, "l1", "l2");
                    Double meters = RedisUtil.convertMilesToKilometers(result);
                    DecimalFormat decimalFormat = new DecimalFormat("#.###");
                    String distance = decimalFormat.format(meters);
                    realTimePosition.setDistance(Double.parseDouble(distance));
                } else {
                    realTimePosition.setDistance(0);
                }
                //获取省份
                Map<String, Object> position = addressUtil.getPosition(longitude, latitude, null, null, null);
                Map data = (Map) position.get("data");
                String provinceName = data.get("shortname").toString().replaceAll("\"", "");
                realTimePosition.setArea(provinceName);
                deviceMapMapper.insertRealTimePosition(realTimePosition);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_LOCATION, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        try {
            //报警
            JsonNode jsonNode = objectMapper.readTree(jsonValue);
            if (jsonNode.has(VehicleConstant.ALERT)) {
                JsonNode alertNode = jsonNode.get("alert");
                String vehicleId = alertNode.get("vehicleId").asText();
                Integer alertType = alertNode.get("alertType").asInt();
                long timeStamp = alertNode.get("timestamp").asLong();
                Alerts alerts = new Alerts();
                alerts.setAlertType(alertType);
                alerts.setTimeStamp(timeStamp);
                alerts.setVehicleId(vehicleId);
                deviceMapMapper.insertAlerts(alerts);
                RedisUtil.addZSetValue(VehicleConstant.VEHICLE_ALERT, String.valueOf(vehicleId), timeStamp);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        //webSocket发送消息
        VehicleAllVo vehicles = vehicles();
        WebSocketServer.sendDataToClients(vehicles.toString());
    }

4.发送消息内容

VehicleAllVo vehicles = vehicles();
该方法就是发送的具体内容

5.kafak消费者

package com.skyable.device.listener.Vehicle;

import com.alibaba.fastjson.JSON;
import com.skyable.common.config.CloudApplicationContext;
import com.skyable.common.constants.kafka.KafkaTopicConstants;
import com.skyable.device.config.webSocket.WebSocketServer;
import com.skyable.device.entity.vehicle.Vehicle;
import com.skyable.device.service.DeviceMapService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Description:
 *
 * @author yangJun
 * @date: 2023-08-18-14:12
 */
@Service
@Component
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class VehicleDataKafkaListener {
    private final DeviceMapService deviceMapService;

    @KafkaListener(topics = KafkaTopicConstants.TOPIC_VEHICLE_RECORD, groupId = "rx_1_thing", containerFactory = "batchFactory")
    public void dealDeviceDataToScript(List<ConsumerRecord<String, String>> recordList) {
        recordList.parallelStream()
                .map(ConsumerRecord::value)
                .forEach(this::saveVehicleData);
    }

    private void saveVehicleData(String jsonValue) {
        log.info("kafka data:" + jsonValue);
        deviceMapService.saveBatch(jsonValue);
    }
}
package com.skyable.device.listener.Vehicle;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName KafkaConsumerConfig
 * @Description Kafka消费者配置
 * @Author gaoy
 * @Date 2021/2/25 15:02
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;


    /**
     * 批量消费工厂bean
     * @return
     */
    @Bean
    KafkaListenerContainerFactory batchFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new
                ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // 开启批量监听
        factory.setBatchListener(true);
        factory.setConcurrency(concurrency);
        // 设置手动提交ackMode
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean
    public Map consumerConfigs() {
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        //设置每次接收Message的数量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //开启幂等性。
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        return props;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/99173.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

爬虫逆向实战(二十二)--某恩数据电影票房

一、数据接口分析 主页地址&#xff1a;某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密&#xff1f; 无请求头是否加密&#xff1f; 无响应是否加密&#xff1f; 通过查看“响应”模块可以发现&#xff0c;响应是…

团队多人共用一个WhatsApp是如何做到的?

WhatsApp是如今许多跨境企业用来跟客户进行沟通的重要聊天工具&#xff0c;但是在使用WhatsApp时有一个问题是比较突出的&#xff0c;企业一般拥有的WhatsApp账户是有限的&#xff0c;当很多客户同时上门咨询的话&#xff0c;客服就很难应对。但是如果能够实现团队多人共用一个…

VSCode下载、安装及配置、调试的一些过程理解

第一步先下载了vscode&#xff0c;官方地址为&#xff1a;https://code.visualstudio.com/Download 第二步安装vscode&#xff0c;安装环境是win10&#xff0c;安装基本上就是一步步默认即可。 第三步汉化vscode&#xff0c;这一步就是去扩展插件里面下载一个中文插件即可&am…

企业内部wiki的应用场景与岗位有哪些?有价值吗?

企业内部Wiki是一种基于Web的协作知识管理平台&#xff0c;它允许员工在一个统一的平台上创建、编辑和共享知识。它可以用于许多不同的场景和岗位&#xff0c;为企业提供了许多价值。下面将介绍企业内部Wiki的应用场景和岗位&#xff0c;并探讨其价值。 应用场景&#xff1a; …

解决uniapp下拉框 内容被覆盖的问题

1. 下拉框 内容被覆盖的问题 场景: 现在是下拉框被表格覆盖了 解决办法: 在表格上添加css 样式来解决这个问题 .add-table{display: static;overflow: visible; } display: static: 将元素会按照默认的布局方式进行显示&#xff0c;不会分为块状或行内元素。 overflow: vi…

设计模式-装饰模式

文章目录 一、简介二、基本概念三、装饰模式的结构和实现类图解析&#xff1a;装饰器的实现方式继承实现&#xff1a;组合实现&#xff1a;继承和组合对比 四、装饰模式的应用场景五、与其他模式的关系六、总结 一、简介 装饰模式是一种结构型设计模式&#xff0c;它允许动态地…

python节假日库holidays——查询国家节假日

节假日—计算某天是否为节假日 参考学习&#xff1a; ​ Python holidays模块 ​ Python实现节假日查询 ​ Python怎么获取节假日信息 pip install holidaysimport holidayscn_holidays holidays.CountryHoliday(CN) print(cn_holidays)from datetime import dateif date(…

【LeetCode】剑指 Offer <二刷>(3)

目录 题目&#xff1a;剑指 Offer 06. 从尾到头打印链表 - 力扣&#xff08;LeetCode&#xff09; 题目的接口&#xff1a; 解题思路&#xff1a; 代码&#xff1a; 过啦&#xff01;&#xff01;&#xff01; 题目&#xff1a;剑指 Offer 07. 重建二叉树 - 力扣&#xf…

2023.8.28日论文阅读

文章目录 NestFuse: An Infrared and Visible Image Fusion Architecture based on Nest Connection and Spatial/Channel Attention Models(2020的论文)本文方法 LRRNet: A Novel Representation Learning Guided Fusion Network for Infrared and Visible Images本文方法学习…

.netcore grpc截止时间和取消详解

一、截止时间概述 截止时间功能让 gRPC 客户端可以指定等待调用完成的时间。 超过截止时间时&#xff0c;将取消调用。 设定一个截止时间非常重要&#xff0c;因为它将提供调用可运行的最长时间。它能阻止异常运行的服务持续运行并耗尽服务器资源。截止时间对于构建可靠应用非…

【分享】PDF如何拆分成2个或多个文件呢?

当我们需要把一个多页的PDF文件拆分成2个或多个独立的PDF文件&#xff0c;可以怎么操作呢&#xff1f;这种情况需要使用相关工具&#xff0c;下面小编就来分享两个常用的工具。 1. PDF编辑器 PDF编辑器不仅可以用来编辑PDF文件&#xff0c;还具备多种功能&#xff0c;拆分PDF文…

Markdown Preview Plus Chrome插件使用

Markdown Preview Plus Chrome插件使用 1.插件说明2.插件下载3.插件配置4.文档样式4.1 网页显示4.2 导出PDF 系统&#xff1a;Win10 Chrome&#xff1a;113.0.5672.127 Markdown Preview Plus&#xff1a;0.7.3 1.插件说明 一般 markdown 工具自带的预览功能比较简单&#xff…

如何使用CRM系统进行精细化管理客户?

客户是企业的生命线&#xff0c;对客户进行精细化管理&#xff0c;是提高企业收益的关键。那么&#xff0c;如何进行客户管理&#xff1f;CRM系统可以实现精细化管理客户&#xff0c;提升客户的价值。下面我们就来详细说一说。 1、获取客户信息 Zoho CRM系统可以通过web表单、…

TS 入门

TS 入门 interface 约束作用数组的声明方式函数的定义联合类型、交叉类型、断言类型类的方面 interface 约束作用 数组的声明方式 函数的定义 联合类型、交叉类型、断言类型 类的方面 这是代码的地址&#xff1a; 代码的地址

Ansible学习笔记7

user模块&#xff1a; user模块用于管理用户账户和用户属性。 如果是windows要换一个win_user模块。 创建用户&#xff1a;present&#xff1a; [rootlocalhost ~]# ansible group1 -m user -a "nameaaa statepresent" 192.168.17.106 | CHANGED > {"ansi…

在VScode中执行npm、yarn命令报错解

在VScode中执行npm、yarn命令报错解 我使用的是vnm安装好npm&#xff0c;在WindowsR 界面是可以运行查看出版本的&#xff1b;但是在VScode中报错。 查了很多资料&#xff0c;我这种情况的原因是在VScode中默认使用的终端是Powershell&#xff0c;然后我切换到系统的cmd则可以…

11.添加侧边栏,并导入数据

修改CommonAside的代码&#xff1a; <template><div><el-menu default-active"1-4-1" class"el-menu-vertical-demo" open"handleOpen" close"handleClose":collapse"isCollapse"><!--<el-menu-it…

时序预测 | MATLAB实现基于PSO-LSTM、LSTM时间序列预测对比

时序预测 | MATLAB实现基于PSO-LSTM、LSTM时间序列预测对比 目录 时序预测 | MATLAB实现基于PSO-LSTM、LSTM时间序列预测对比效果一览基本描述程序设计参考资料 效果一览 基本描述 MATLAB实现基于PSO-LSTM、LSTM时间序列预测。 1.Matlab实现PSO-LSTM和LSTM神经网络时间序列预测…

UDS 29 认证服务

UDS协议定义了一套标准的诊断服务&#xff0c;包括会话控制、诊断请求、诊断响应和ECU编程等功能。通过UDS协议&#xff0c;诊断工具可以向ECU发送特定的请求&#xff0c;获取ECU的状态信息和故障码&#xff0c;诊断和解决故障问题。UDS是ISO 14229标准定义的一种通信协议&…

QT可执行程序打包成安装程序

目录 1.将QT程序先放到一个文件中 2.下载QtInstallerFramework-win-x86.exe 3.将setup.exe单独拷贝出来&#xff0c;进行安装测试 4.测试安装后的程序是否可执行 1.将QT程序先放到一个文件中 &#xff08;1&#xff09;QT切换到release模式&#xff0c;编译后在构建目录生…
最新文章