spark: 从pulsar中读取数据

一、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pulsar-demo2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <pulsar.version>2.8.0</pulsar.version>
        <jackson.version>2.10.5</jackson.version>
        <!--<jackson.version>2.6.7</jackson.version>-->

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-kafka</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-spark</artifactId>
            <version>${pulsar.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-streaming_2.10</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

<!--

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>-->


        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        
    </dependencies>

</project>

二、demo程序

package cn.edu.tju;

import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.spark.SparkStreamingPulsarReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.HashSet;
import java.util.Set;

public class SparkTest {
    public static void main(String[] args) throws Exception{
        String serviceUrl = "pulsar://xx.xx.xx.xx:6650/";
        String topic = "persistent://public/default/my-topic11";
        String subs = "test_sub";

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(4));

        ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();

        Set<String> set = new HashSet<>();
        set.add(topic);
        pulsarConf.setTopicNames(set);
        pulsarConf.setSubscriptionName(subs);

        SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
                serviceUrl,
                pulsarConf,
                new AuthenticationDisabled());

        JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

        JavaDStream<String> resultStream = lineDStream.map(new Function<byte[], String>() {
            @Override
            public String call(byte[] bytes) throws Exception {
                return new String(bytes);
            }
        });

        resultStream.print();

        jsc.start();
        jsc.awaitTermination();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/497652.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++的非类型模板参数与模板分离编译(模板显式实例化)

非类型模板参数与模板分离编译&#xff08;模板显式实例化&#xff09; 文章目录 非类型模板参数与模板分离编译&#xff08;模板显式实例化&#xff09;前言一、非类型模板参数二、模版分离编译1. 分离编译概念2. 模版的分离编译问题案例解决方法 总结 前言 ​ 本篇博客文章介…

【python分析实战】成本:揭示电商平台月度开支与成本结构占比 - 过于详细 【收藏】

重点关注本文思路&#xff0c;用python分析&#xff0c;方便大家实验复现&#xff0c;代码每次都用全量的&#xff0c;其他工具自行选择。 全文3000字&#xff0c;阅读10min&#xff0c;操作1小时 企业案例实战欢迎关注专栏 每日更新&#xff1a;https://blog.csdn.net/cciehl/…

深度学习语义分割篇——DeepLabV2原理详解篇

&#x1f34a;作者简介&#xff1a;秃头小苏&#xff0c;致力于用最通俗的语言描述问题 &#x1f34a;专栏推荐&#xff1a;深度学习网络原理与实战 &#x1f34a;近期目标&#xff1a;写好专栏的每一篇文章 &#x1f34a;支持小苏&#xff1a;点赞&#x1f44d;&#x1f3fc;、…

Java八股文(数据结构)

Java八股文の数据结构 数据结构 数据结构 请解释以下数据结构的概念&#xff1a;链表、栈、队列和树。 链表是一种线性数据结构&#xff0c;由节点组成&#xff0c;每个节点包含了指向下一个节点的指针&#xff1b; 栈是一种后进先出&#xff08;LIFO&#xff09;的数据结构&a…

linux中查看内存占用空间

文章目录 linux中查看内存占用空间 linux中查看内存占用空间 使用 df -h 查看磁盘空间 使用 du -sh * 查看每个目录的大小 注意这里是当前目录下的文件大小&#xff0c;查看系统的可以回到根目录 经过查看没有发现任何大的文件夹。 继续下面的步骤 如果您的Linux磁盘已满&a…

安全上网,防止上网被记录(v2ray实现加密通信)

近期听一位亲威说&#xff0c;她在公司休闲的时候上了哪个网站&#xff0c;浏览了过的网站IT部门的人都会知道&#xff0c;这是因为现在大多数网络设备&#xff0c;像路由与交换机都有记录访问网站地址记录功能&#xff0c;涉及还可以设置成记录到交互的内容。要想保密&#xf…

第4章.精通标准提示,引领ChatGPT精准输出

标准提示 标准提示&#xff0c;是引导ChatGPT输出的一个简单方法&#xff0c;它提供了一个具体的任务让模型完成。 如果你要生成一篇新闻摘要。你只要发送指示词&#xff1a;汇总这篇新闻 : …… 提示公式&#xff1a;生成[任务] 生成新闻文章的摘要&#xff1a; 任务&#x…

5.6 物联网RK3399项目开发实录-Android开发之(wulianjishu666)

物联网入门到项目实干案例下载&#xff1a; https://pan.baidu.com/s/1fHRxXBqRKTPvXKFOQsP80Q?pwdh5ug --------------------------------------------------------------------------------------------------------------------------------- U-Boot 使用 前言 RK U-B…

机器学习-生存分析:基于QHScrnomo模型的乳腺癌患者风险评估与个性化预测

一、引言 乳腺癌作为女性常见的恶性肿瘤之一&#xff0c;对女性健康构成威胁。随着医疗技术的不断进步&#xff0c;个性化医疗逐渐成为乳腺癌治疗的重要方向。通过深入研究乳腺癌患者的风险评估和个性化预测&#xff0c;可以帮助医生更准确地制定治疗方案&#xff0c;提高治疗效…

【R语言从0到精通】-1-下载R语言与R最基础内容

在本科&#xff0c;没有人教的情况下&#xff0c;艰难的自学了R语言&#xff0c;因此我想能出一个R语言系列教程&#xff0c;在帮助大家的同时&#xff0c;温故而知新&#xff0c;特别如果你是生物或者医学从业者&#xff0c;那本教程正好合适&#xff0c;因为我也是生物人&…

【计算机网络篇】数据链路层(4.1)可靠传输的相关概念

文章目录 &#x1f354;可靠传输的相关概念⭐分组丢失⭐分组失序⭐分组重复 &#x1f95a;注意 &#x1f354;可靠传输的相关概念 使用差错检测技术&#xff08;例如循环冗余校验CRC&#xff09;&#xff0c;接收方的数据链路层就可以检测出帧在传输过程中是否产生了误码&…

Yarn简介及Windows安装与使用指南

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

“预防儿童烧烫伤”科普安全课堂走进嘉鱼县第一小学

为提高嘉鱼县儿童烧烫伤安全意识、隐患识别能力以及突发应急处置能力&#xff0c;3月26日下午&#xff0c;在中国社会福利基金会烧烫伤关爱公益基金、嘉鱼县妇女联合会、嘉鱼县教育局的支持下&#xff0c;嘉鱼县蒲公英社会工作服务中心走进嘉鱼县第一小学开展预防儿童烧烫伤科普…

Unity2018发布安卓报错 Exception: Gradle install not valid

Unity2018发布安卓报错 Exception: Gradle install not valid Exception: Gradle install not valid UnityEditor.Android.GradleWrapper.Run (System.String workingdir, System.String task, System.Action1[T] progress) (at <c67d1645d7ce4b76823a39080b82c1d1>:0) …

探索智慧农业精准除草,基于高精度YOLOv5全系列参数【n/s/m/l/x】模型开发构建农田作物场景下杂草作物分割检测识别分析系统

智慧农业是未来的一个新兴赛道&#xff0c;随着科技的普及与落地应用&#xff0c;会有更加广阔的发展空间&#xff0c;关于农田作物场景下的项目开发实践&#xff0c;在我们前面的博文中也有很堵相关的实践&#xff0c;单大都是偏向于目标检测方向的&#xff0c;感兴趣可以自行…

QT布局管理和空间提升为和空间间隔

QHBoxLayout&#xff1a;按照水平方向从左到右布局&#xff1b; QVBoxLayout&#xff1a;按照竖直方向从上到下布局&#xff1b; QGridLayout&#xff1a;在一个网格中进行布局&#xff0c;类似于HTML的table&#xff1b; 基本布局管理类包括&#xff1a;QBoxLayout、QGridL…

ubuntu编译OpenCV and seetaFace2

opencv opencv-4.5.2 opencv_contrib-4.5.2 SeetaFace2 SeetaFace2-master https://github.com/seetafaceengine 指定安装目录&#xff0c;和OpenCV放一个目录下了 安装前 安装 安装后 Qt安装 Windows下 Linux下 报错1 原因&#xff1a; 报错…

20221124 kafka实时数据写入Redis

一、上线结论 实现了将用户线上实时浏览的沉浸式视频信息&#xff0c;保存在Redis中这样一个功能。为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景&#xff0c;后续也能扩展到其他所有场景。用于两个场景&#xff1a;&#xff08;1&#xf…

【SQL】1661. 每台机器的进程平均运行时间 (四种写法;自连接;case when;窗口函数lead();)

前述 Sql窗口分析函数【lead、lag详解】 Hive 分析函数lead、lag实例应用 lag &#xff1a;用于统计窗口内往上第n行值lead &#xff1a;用于统计窗口内往下第n行值 lead(列名,1,0) over (partition by 分组列 order by 排序列 rows between 开始位置 preceding and 结束位置…

ChatGPT与传统搜索引擎的区别:智能对话与关键词匹配的差异

引言 随着互联网的快速发展&#xff0c;信息的获取变得比以往任何时候都更加便捷。在数字化时代&#xff0c;人们对于获取准确、及时信息的需求愈发迫切。传统搜索引擎通过关键词匹配的方式为用户提供了大量的信息&#xff0c;然而&#xff0c;这种机械式的检索方式有时候并不…
最新文章