Flink 应用案例——求网页访问量Top N 实时计算(附可执行代码)

在学习了Flink之后,笔者通过以下案例对Flink API 进行简单复习

目录

案例要求

前置准备

编写主程序(点此跳转至代码)

运行截图


案例要求

以下数据 为某网站的访问日志 现要求通过以下数据 统计出最近10s内最热门的N个页面(即url链接),并且每5s更新一次;即求出页面访问量中的TOP N

{user='Alice', url='./home', time=1679043205254}

{user='Alice', url='./prod?id=1', time=1679043206254}

{user='Mary', url='./prod?id=2', time=1679043207256}

{user='Cary', url='./fav', time=1679043208257}

{user='Mary', url='./home', time=1679043209258}

{user='Cary', url='./prod?id=2', time=1679043210259}

......

前置准备

为方便编写程序 我们把以上数据封装为Event类

Event.class

package com.flink.wc.myflink.bean;

public class Event {
    public String user;

    public String url;
    public long time;

    public Event() {
    }

    public Event(String user, String url, long time) {
        this.user = user;
        this.url = url;
        this.time = time;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", time=" + time +
                '}';
    }
}

同时我们需要自定义源算子 模拟网站实时访问日志

ClickSource.class

package com.flink.wc.myflink.source_sink;

import com.flink.wc.myflink.bean.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource  implements SourceFunction<Event> {

    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        String[] users = {"Mary","Alice","Bob","Cary"};
        String[] urls = {"./home", "./cart","./fav", "./prod?id=1","./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                users[random.nextInt(users.length)],      // user 和 url 随机组合
                urls[random.nextInt(urls.length)],
                Calendar.getInstance().getTimeInMillis()  //getTimeInMillis 方法返回当前时间
            ));

            // 在这个循环中 每隔一秒 就collect(发送)一个数据
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

为方便程序输出,我们把输出结果封装为UrlViewCount类

UrlViewCount.class

package com.flink.wc.myflink.bean;

public class UrlViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
        this.url = url;
        this.count = count;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + windowStart +
                ", windowEnd=" + windowEnd +
                '}';
    }
}

做好了以上前置工作 下面我们开始编写主程序 求y页面pv的TOP N

编写主程序

大致分为以下三个步骤:

读取源数据 设置水位线

首先通过url进行聚合统计

再根据聚合统计的结果进行排序 求出Top N

详细步骤如下:

  • 读取数据 设置水位线
  • 根据url进行keyby分组
  • 设置窗口大小为10s 每5s 执行一次 设置窗口函数 统计各url被访问量
    • 通过 AggregateFunction 进行聚合统计
    • 通过 ProcessWindowFunction 输出聚合统计的结果 和 窗口信息 以便后续计算
  • 根据窗口时间进行keyby分组
  • 在窗口范围内对各数据进行排序
    • 来一个数据就把数据存储在状态变量中 (要利用到之前的数据进行排序 因此要用到状态变量)
    • 来一个数据就根据windowsEnd 注册定时器 (根据定时器的特性 同一个key的同一个时间戳不会被重复注册 因此这里相同的windowsEndtime只会只执行一次定时器,即一个窗口只会执行一次定时器)
      • 触发定时器
      • 触发定时器则说明在该水位线之前的数据已经全部到达 全部存储在了状态变量中
      • 把状态变量列表中的值全部赋值给java中List列表 进行排序
      • 将排序后的结果输出 out.collect()

 代码如下:

TopnTest.class
package com.flink.wc.myflink.process;

// 这三个是自定义的java类
import com.flink.wc.myflink.bean.Event;
import com.flink.wc.myflink.bean.UrlViewCount;
import com.flink.wc.myflink.source_sink.ClickSource;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;

public class TopnTest {

    public static void main(String[] args) throws Exception {
        
        // step1 读取数据 并设置水位线
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<Event> streamSource = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> stream01 = streamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (x, l) -> x.time)
        );


        // step2 聚合统计 每个url的访问量
        SingleOutputStreamOperator<UrlViewCount> stream02 = stream01
                .keyBy(x -> x.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        stream02.print("stream02");


        // step3 合并排序各个url的访问量 计算出Top N
        stream02.keyBy(x -> x.windowEnd)
                .process(new TopN(2))
                .print();

        env.execute();

    }


    // 累加器 AggregateFunction<IN, ACC, OUT> extends Function, Serializable
    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long aLong) {
            return aLong + 1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;       // 返回UrlViewCountAgg 的结果 这里是url的计数count
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return null;
        }
    }

    // 增量聚合函数的 getResult作为 全窗口函数的输入
    // just 为了输出窗口信息 ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction
    private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {

        @Override
        // 这里的迭代器对象 Iterable<Long> 就是增量聚合函数UrlViewCountAgg 中累加器聚合的结果,即每个url的count
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {

            Long start = context.window().getStart();
            Long end = context.window().getEnd();
                                            // 这里的迭代器只有一个元素 就是聚合函数中增量聚合的结果
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }

    private static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {

        private Integer n;
        private ListState<UrlViewCount> urlViewCountListState;

        public TopN(Integer n){
            this.n = n;
        }

        @Override
        public void open(Configuration parmeters) throws Exception{
            // 从环境中获取列表状态句柄
            urlViewCountListState =
                    getRuntimeContext()
                            .getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list", Types.POJO(UrlViewCount.class)));
        }

        @Override
        public void processElement(UrlViewCount value, KeyedProcessFunction<Long, UrlViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // 来一个数据就加入状态列表中 UrlViewCount{url='./prod?id=2', count=3, windowStart=1678676045000, windowEnd=1678676055000}
//            System.out.println("value: " + value);
            urlViewCountListState.add(value);

            // 这个key是windowEnd 同一个窗口end 说明在同一个窗口 就在这个窗口排序 同一个定时器 过了时间windowEnd+1就执行
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
//            System.out.println("ctx.getCurrentKey():" + ctx.getCurrentKey());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();

            for (UrlViewCount urlViewCount : urlViewCountListState.get()){
                urlViewCountArrayList.add(urlViewCount);
            }

            urlViewCountListState.clear(); // 一个窗口内的数据已经全部赋值给了列表 把这个状态列表清空 让他去装其他窗口的数据(状态)

            // 接下来就对该窗口的列表进行排序咯
            // 也可以写为lambda表达式
            urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    return o2.count.intValue() - o1.count.intValue();
                }
            });

//            System.out.println("urlViewCountArrayList:" + urlViewCountArrayList);
            StringBuilder result = new StringBuilder();

            result.append("=================================================\n");
            result.append("窗口结束时间:").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < this.n; i=i+1){         // 这个循环 关于i 有点问题!!!
                UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);
                String info ="No." + (i +1) + " "
                        + "url:" + UrlViewCount.url + " "
                        +"浏览量:" + UrlViewCount.count + "\n";
                result.append(info);
            }
            result.append("=================================================\n");
            out.collect(result.toString());
        }
    }
}

运行截图

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/1024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【3.17】MySQL索引整理、回溯(分割、子集问题)

3.1 索引常见面试题 索引的分类 什么是索引&#xff1f; 索引是一种数据结构&#xff0c;可以帮助MySQL快速定位到表中的数据。使用索引&#xff0c;可以大大提高查询的性能。 按「数据结构」分类&#xff1a;Btree索引、Hash索引、Full-text索引。 InnoDB 存储引擎创建的聚簇…

漫画:什么是快速排序算法?

这篇文章&#xff0c;以对话的方式&#xff0c;详细着讲解了快速排序以及排序排序的一些优化。 一禅&#xff1a;归并排序是一种基于分治思想的排序&#xff0c;处理的时候可以采取递归的方式来处理子问题。我弄个例子吧&#xff0c;好理解点。例如对于这个数组arr[] { 4&…

优思学院|六西格玛DMAIC,傻傻搞不清?

DMAIC还是搞不清&#xff1f; DMAIC是一个用于过程改进和六西格玛的问题解决方法论。它是以下五个步骤的缩写&#xff1a; 定义&#xff08;Define&#xff09;&#xff1a;明确问题&#xff0c;设定项目的目标和目的。绘制流程图&#xff0c;并收集数据&#xff0c;以建立未来…

基于bearpi的智能小车--Qt上位机设计

基于bearpi的智能小车--Qt上位机设计 前言一、界面原型1.主界面2.网络配置子窗口模块二、设计步骤1.界面原型设计2.控件添加信号槽3.源码解析3.1.网络链接核心代码3.2.网络设置子界面3.3.小车控制核心代码总结前言 最近入手了两块小熊派开发板,借智能小车案例,进行鸿蒙设备学…

01背包问题c++

问题 问题介绍 有 N 种物品和一个容量是 V 的背包&#xff0c;每种物品都有无限件可用。 第 i 种物品的体积是 vi&#xff0c;价值是 wi。 求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。 输出最大价值。 输入格式 第…

基于Transformer的交通预测模型部分汇总【附源代码】

交通预测一直是一个重要的问题&#xff0c;它涉及到交通运输系统的可靠性和效率。随着人工智能的发展&#xff0c;越来越多的研究者开始使用深度学习模型来解决这个问题。其中&#xff0c;基于Transformer的交通预测模型在近年来备受关注&#xff0c;因为它们具有优秀的建模能力…

设计模式之桥接模式(C++)

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 一、桥接模式是什么&#xff1f; 桥接模式是一种结构型的软件设计模式&#xff0c;将抽象部分与实现部分分离&#xff0c;使他们可…

像ChatGPT玩转Excel数据

1.引言 最近ChatGPT的出现&#xff0c;把人工智能又带起了一波浪潮。机器人能否替代人类又成了最近热门的话题。 今天我们推荐的一个玩法和ChatGPT有点不一样。我们的课题是“让用户可以使用自然语言从Excel查询到自己想要的数据”。 要让自然语言可以从Excel中查数据&#…

通过百度文心一言大模型作画尝鲜,感受国产ChatGPT的“狂飙”

3月16日下午&#xff0c;百度于北京总部召开新闻发布会&#xff0c;主题围绕新一代大语言模型、生成式AI产品文心一言。百度创始人、董事长兼首席执行官李彦宏&#xff0c;百度首席技术官王海峰出席&#xff0c;并展示了文心一言在文学创作、商业文案创作、数理推算、中文理解、…

用Qt画一个温度计

示例1 以下是用Qt绘制一个简单的温度计的示例代码&#xff1a; #include <QPainter> #include <QWidget> #include <QApplication> class Thermometer : public QWidget { public:Thermometer(QWidget *parent 0); protected:void paintEvent(QPaintEvent …

【Hive】配置

目录 Hive参数配置方式 参数的配置方式 1. 文件配置 2. 命令行参数配置 3. 参数声明配置 配置源数据库 配置元数据到MySQL 查看MySQL中的元数据 Hive服务部署 hiveserver2服务 介绍 部署 启动 远程连接 1. 使用命令行客户端beeline进行远程访问 metastore服务 …

LC-146.LRU 缓存

题解&#xff1a;https://leetcode.cn/problems/lru-cache/solution/lru-ce-lue-xiang-jie-he-shi-xian-by-labuladong/ 文章目录[146. LRU 缓存](https://leetcode.cn/problems/lru-cache/)思路从0开始实现使用LinkedHashMap实现拓展&#xff1a;[460. LFU 缓存](https://leet…

【2024考研】计算机考研,4轮复习时间安排

文章目录&#x1f3a8;第1轮复习&#xff08;暑假前&系统课&#xff09;英语1/2数学1/2专业课408&#x1f3a8;第2轮复习&#xff08;开学前&真题&#xff09;英语1/2试卷数学1/2试卷专业课408试卷&#x1f3a8;第3轮复习&#xff08;报名前&政治&#xff09;政治试…

什么是数据治理,如何保障数据质量?_光点科技

随着信息化和数据化的发展&#xff0c;数据已经成为企业最为重要的资产之一。数据治理作为一种管理和保障数据质量的方法&#xff0c;越来越受到企业的重视。什么是数据治理&#xff1f;数据治理是一种管理和保障数据质量的方法。数据治理的主要目的是确保数据的可靠性、准确性…

Android APP隐私合规检测工具Camille使用

目录一、简介二、环境准备常用使用方法一、简介 现如今APP隐私合规十分重要&#xff0c;各监管部门不断开展APP专项治理工作及核查通报&#xff0c;不合规的APP通知整改或直接下架。camille可以hook住Android敏感接口&#xff0c;检测是否第三方SDK调用。根据隐私合规的场景&a…

二、数据结构-线性表

目录 &#x1f33b;&#x1f33b;一、线性表概述1.1 线性表的基本概念1.2 线性表的顺序存储1.2.1 线性表的基本运算在顺序表上的实现1.2.2 顺序表实现算法的分析1.2.3 单链表类型的定义1.2.4 线性表的基本运算在单链表上的实现1.3 其他运算在单链表上的实现1.3.1 建表1.3.2 删除…

Adam优化器算法详解及代码实现

文章目录学习率调整与梯度估计修正RMSprop 算法动量法Adam学习率调整与梯度估计修正 在介绍Adam算法之前&#xff0c;先谈谈Adam中两个关键的算法&#xff1a;学习率调整&#xff08;RMSprop 算法&#xff09;与梯度估计修正。 RMSprop 算法 学习率是神经网络优化时的重要超…

计算机组成原理(3)-哈工大

概述存储器分类按存储介质分类第一个是易失的&#xff0c;后面三个是非易失的按存取方式分类按在计算机中的作用分类RAM可读可写 ROM只读存储器的层次结构存储器的三个主要特性的关系缓存-主存层次和主存-辅存层次时间局部性就是cpu访问了一个数据&#xff0c;在不久的将来可能…

python学习——【第六弹】

前言 上一篇文章 python学习——【第五弹】中我们了解了python中的不可变序列元组&#xff0c;这篇文章接着介绍可变序列 字典。 字典 字典的实现原理&#xff1a; 字典&#xff0c;顾名思义其实现原理和字典类似&#xff0c;字典中的元素都是key—value&#xff0c;以键值对…

操作系统学习笔记 ---- 网络系统

1 DMA技术 直接内存访问&#xff08;Direct Memory Access&#xff09; 技术。 在进行 I/O 设备和内存的数据传输的时候&#xff0c;数据搬运的工作全部交给 DMA 控制器&#xff0c;而 CPU 不再参与任何与数据搬运相关的事情&#xff0c;这样 CPU 就可以去处理别的事务。 DM…