SparkStreaming_window_sparksql_reids

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/**
  * 统计,截止到目前为止出现的每一个key的次数
  * window窗口操作,每个多长M时间,通过过往N长时间内产生的数据
  * M就是滑动长度sliding interval
  * N就是窗口长度window length
  */
object Demo05_WCWithWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCountUpdateStateByKey")
      .setMaster("local[*]")
    val batchInterval = 2
    val duration = Seconds(batchInterval)
    val ssc = new StreamingContext(conf, duration)
    val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)
    val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​
    val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,
      windowDuration = Seconds(batchInterval * 3),
      slideDuration = Seconds(batchInterval * 2))
​
    ret.print()
​
    ssc.start()
    ssc.awaitTermination()
  }
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/**
 * SparkStreaming整合SparkSQL的案例之,热门品类top3排行
 * 输入数据格式:
 * id brand category
 * 1 huwei watch
 * 2 huawei phone
 *
 */
object Demo06_SQLWithStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingIntegerationSQL")
      .setMaster("local[*]")
    val batchInterval = 2
    val duration = Seconds(batchInterval)
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, duration)
    ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")
    val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)
    //001 mi moblie
    val pairs:DStream[(String, Int)] = lines.map(line => {
      val fields = line.split("\\s+")
      if(fields == null || fields.length != 3) {
        ("", -1)
      } else {
        val brand = fields(1)
        val category = fields(2)
        (s"${category}_${brand}", 1)
      }
    }).filter(t => t._2 != -1)
​
    val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​
    usb.foreachRDD((rdd, bTime) => {
      if(!rdd.isEmpty()) {//category_brand count
        import spark.implicits._
        val df = rdd.map{case (cb, count) => {
          val category = cb.substring(0, cb.indexOf("_"))
          val brand = cb.substring(cb.indexOf("_") + 1)
          (category, brand, count)
        }}.toDF("category", "brand", "sales")
​
        df.createOrReplaceTempView("tmp_category_brand_sales")
        val sql =
          """
            |select
            |  t.category,
            |  t.brand,
            |  t.sales,
            |  t.rank
            |from (
            |  select
            |    category,
            |    brand,
            |    sales,
            |    row_number() over(partition by category order by sales desc) rank
            |  from tmp_category_brand_sales
            |) t
            |where t.rank < 4
            |;
                    """.stripMargin
        spark.sql(sql).show()
      }
    })
​
    ssc.start()
    ssc.awaitTermination()
  }
​
  def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {
    Option(seq.sum + option.getOrElse(0))
  }
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{
  val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可
  jedis.auth("root")
  jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/278643.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Redis交响乐】Redis中的通用命令

文章目录 1. 基本命令 get set2. 全局命令(1)keys(2)exists(3)del(4)expire && ttl面试题: redis中key的过期策略是怎么实现的?定时器的实现原理(1)基于优先级队列/堆(2)基于时间轮实现的定时器 (5) type 我们知道,redis是按照键值对的方式存储数据的. Redis中基本的命…

pytest pytest-emoji通过表情包展示执行状态

pytest-emoji 是一个用于在 Pytest 测试运行期间显示 emoji 表情的插件。它可以为测试结果添加一些有趣的表情符号&#xff0c;以增加测试报告的可读性和趣味性。 使用 pytest-emoji 插件非常简单&#xff0c;只需按照以下步骤进行操作&#xff1a; 首先&#xff0c;确保已经安…

python实现图像的二维傅里叶变换——冈萨雷斯数字图像处理

原理 二维傅里叶变换是一种在图像处理中常用的数学工具&#xff0c;它将图像从空间域&#xff08;我们通常看到的像素排列&#xff09;转换到频率域。这种变换揭示了图像的频率成分&#xff0c;有助于进行各种图像分析和处理&#xff0c;如滤波、图像增强、边缘检测等。 在数学…

Glary Utilities Pro - 电脑系统优化全面指南:详尽使用教程

软件简介&#xff1a; Glary Utilities Pro 是一款全面的电脑优化工具&#xff0c;它旨在帮助用户提升计算机的性能和稳定性。这款软件提供了多种功能&#xff0c;包括系统清理、优化、修复以及保护。通过一键扫描&#xff0c;它可以识别并清除无用文件、临时数据、注册表错误等…

云短信平台优惠活动 - 华为OD统一考试

OD统一考试 题解: Java / Python / C++ 题目描述 某云短信厂商,为庆祝国庆,推出充值优惠活动。 现在给出客户预算,和优惠售价序列,求最多可获得的短信总条数。 输入描述 第一行客户预算M,其中 0<=M<=100 第二行给出售价表,P1,P2,… Pn, 其中 1<=n<=100…

【网络技术】【Kali Linux】Wireshark嗅探(三)用户数据报(UDP)协议

一、实验目的 本次实验使用wireshark流量分析工具进行网络嗅探&#xff0c;旨在了解UDP协议的报文格式。 二、网络环境设置 本次实验使用Kali Linux虚拟机完成&#xff0c;主机操作系统为Windows 11&#xff0c;虚拟化平台选择Oracle VM VirtualBox&#xff0c;组网模式选择…

C# 使用ZXing.Net生成二维码和条码

写在前面 条码生成是一个经常需要处理的功能&#xff0c;本文介绍一个条码处理类库&#xff0c;ZXing用Java实现的多种格式的一维二维条码图像处理库&#xff0c;而ZXing.Net是其.Net版本的实现。 在WinForm下使用该类库需要从NuGet安装两个组件&#xff1a; ZXing.Net ZXing…

什么是计算机视觉

计算机视觉&#xff08;Computer Vision&#xff09;是一门研究如何让计算机能够理解和分析数字图像或视频的学科。简单来说&#xff0c;计算机视觉的目标是让计算机能够像人类一样对视觉信息进行处理和理解。为实现这个目标&#xff0c;计算机视觉结合了图像处理、机器学习、模…

软件测试/测试开发丨Selenium环境安装配置

一、selenium 环境配置 1、下载浏览器 目前比较常用的浏览器是 Google Chrome 浏览器&#xff0c;所以本教程以 chrome 为主&#xff0c;后面简介一下其他浏览器的环境配置。 chrome 下载: www.google.cn/chrome/ 2、chromedriver 环境配置 chromedriver 是chromedriver提…

鸿蒙HarmonyOS-图表应用

简介 随着移动应用的不断发展&#xff0c;数据可视化成为提高用户体验和数据交流的重要手段之一。在HarmonyOS应用开发中&#xff0c;一个强大而灵活的图表库是实现这一目标的关键。而MPChart就是这样一款图表库&#xff0c;它为开发者提供了丰富的功能和灵活性&#xff0c;使得…

大创项目推荐 深度学习乳腺癌分类

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度&#xff0c;召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

蓝牙物联网健康管理系统设计方案

随着我国医疗体制改革的快速发展&#xff0c;以及信息科技的更新换代&#xff0c;远程健康管理逐步成为医疗卫生健康服务的发展趋势。物联网技术推动着医疗健康服务体系发生重大改变&#xff0c;传统的定期至社区医院问诊的保健模式&#xff0c;被远程健康服务模式所取代。开发…

2023-12-29 服务器开发-centos部署ftp

摘要: 2023-12-29 服务器开发-centos-部署ftp 部署ftp vsftpd&#xff08;very secure FTP daemon&#xff09;是Linux下的一款小巧轻快、安全易用的FTP服务器软件。本教程介绍如何在Linux实例上安装并配置vsftpd。 前提条件 已创建ECS实例并为实例分配了公网IP地址。 背景…

Evidential Deep Learning to Quantify Classification Uncertainty

本片文章发表于NeurIPS 2018。 文章链接&#xff1a;https://arxiv.org/abs/1806.01768 一、概述 近年来&#xff0c;神经网络在不同领域取得了革命性的进步&#xff0c;尤其是在dropout、normalization以及skip connection等方法被提出之后&#xff0c;撼动了整个机器学习领…

鸿蒙4.0实战教学—基础ArkTS(简易视频播放器)

构建主界面 主界面由视频轮播模块和多个视频列表模块组成&#xff0c;效果图如图&#xff1a; VideoData.ets中定义的视频轮播图数组SWIPER_VIDEOS和视频列表图片数组HORIZONTAL_VIDEOS。 // VideoData.ets import { HorizontalVideoItem } from ./HorizontalVideoItem; impo…

楼宇对讲门铃选型分析

目前很多的高层住宅都使用了对讲门铃了&#xff0c;在频繁使用中&#xff0c;门铃会出现的越来越多种类&#xff0c;下面我就简单的介绍会有用到的几款芯片. 语音通话芯片&#xff1a;D34018,D34118,D5020,D31101; D34018 单片电话机通话电路&#xff0c;合并了必 需的放大器…

力扣刷题记录(21)LeetCode:121、123、188、309

目录 121. 买卖股票的最佳时机 123. 买卖股票的最佳时机 III 188. 买卖股票的最佳时机 IV 309. 买卖股票的最佳时机含冷冻期 如果某一天出售股票可以得到最大利润&#xff0c;那么股票买入的价格一定是这天之前股票的最低价格。 所以我们可以在遍历股票价格的时候不断更新股…

Jetson Orin安装riva以及llamaspeak,使用 Riva ASR/TTS 与 Llama 进行实时交谈,大语言模型成功运行笔记

NVIDIA 的综合语音 AI 工具包 RIVA 可以处理这种情况。此外&#xff0c;RIVA 可以构建应用程序&#xff0c;在本地设备&#xff08;如 NVIDIA Jetson&#xff09;上处理所有这些内容。 RIVA 是一个综合性库&#xff0c;包括&#xff1a; 自动语音识别 &#xff08;ASR&#x…

CSS 缩减中心动画

<template><!-- mouseenter"startAnimation" 表示在鼠标进入元素时触发 startAnimation 方法。mouseleave"stopAnimation" 表示在鼠标离开元素时触发 stopAnimation 方法。 --><!-- 容器元素 --><div class"container" mou…

002文章解读与程序——中国电机工程学报EI\CSCD\北大核心《计及源荷不确定性的综合能源生产单元运行调度与容量配置两阶段随机优化》已提供下载资源

&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;&#x1f446;下载资源链接&#x1f4…
最新文章