基于埋点日志数据的网络流量统计 - PV、UV

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

一、 网站总流量数统计 - PV

  1. 需求分析

  2. 代码实现

   方式一

   方式二

   方式三:使用process算子实现

   方式四:使用process算子实现

二、网站独立访客数统计 - UV

  1. 需求分析

  2. 代码实现


一、 网站总流量数统计 - PV

  PV全称 Page View,也就是一个网站的页面浏览量。每当用户进入网站加载或者刷新某个页面时,就会给该网站带来PV量,它往往用来衡量一个网站的流量和用户活跃度。当然了,单个指标并不能全面的反映网站的实际情况,往往需要结合其他的指标进行分析。

1. 需求分析

  埋点采集到的数据格式大概是这个样子(文件已上传资源)

在这里插入图片描述第一个是userId、第二个是itemId、第三个是categoryId、第四个是behavior、第五个是timestamp

所以我们要统计PV的话要先从第四列中筛选出PV,然后再进行累加,求出最终的PV

2. 代码实现

方式一:
  先用 readTextFile()读取文件,然后将读取到的每行数据封装成一个bean对象,再通过 filter过滤出我们需要的PV数据,这时得到的都是封装好的一个个对象没法直接sum,所以通过 map将数据映射为一个个的元组类型(PV,1)然后,使用 keyBy()将他们分到同一个并行度中进行 sum,得出最终的结果。
public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 将数据封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 过滤出行为为PV的数据
                .filter(new FilterFunction<UserBehavior>() {
                    @Override
                    public boolean filter(UserBehavior value) throws Exception {
                        return "pv".equals(value.getBehavior());
                    }
                })
                // 因为直接求和的话没法求,所以做一次映射,映射成 (PV,1) 这样的结构
                .map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior value) throws Exception {
                        return Tuple2.of(value.getBehavior(),1L);
                    }
                })
                // 然后 将他们通过key进行分组,进入同一个并行度里面 进行求和
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 进行求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式二:

  这种方式省去了方式一的封装对象,其他的思路都一样。

public class Flink01_Project_PV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 直接过滤出我们想要的数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        String[] data = value.split(",");
                        return "pv".equals(data[3]);
                    }
                })
                // 然后将结构转换为元组类型 (PV,1)
                .map(new MapFunction<String, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] data = value.split(",");
                        return Tuple2.of(data[3],1L);
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 求和
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式三:使用process算子实现

  首先使用readTextFile读取数据,使用map将读取到的数据封装为对象,然后使用keyBy进行分组,最后使用process算子进行求解

  • 为什么要使用keyBy():目的是让pv数据进入同一个并行度,如果不使用直接process的话,两个并行度里面都有一个sum,结果就不对了
  • 为什么不使用filter过滤呢?因为我们的过滤逻辑是再process里面完成的,所以不用再额外过滤
public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 封装成 UserBehavior 对象
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String line) throws Exception {
                        String[] data = line.split(",");
                        return new UserBehavior(
                                Long.valueOf(data[0]),
                                Long.valueOf(data[1]),
                                Integer.valueOf(data[2]),
                                data[3],
                                Long.valueOf(data[4]));
                    }
                })
                // 通过key分组
                .keyBy(new KeySelector<UserBehavior, String>() {
                    @Override
                    public String getKey(UserBehavior value) throws Exception {
                        return value.getBehavior();
                    }
                })
                // 使用proces算子实现 PV 的统计
                .process(new ProcessFunction<UserBehavior, String>() {
                    // 定义累加变量
                    long sum =0L ;
                    @Override
                    public void processElement(UserBehavior value, Context ctx, Collector<String> out) throws Exception {
                        // 判断用户行为是否是PV
                        if ("pv".equals(value.getBehavior())){
                            // 条件满足 sum+1
                            sum++;
                            // 将结果收集
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

方式四:使用process算子实现

  方式四相比于方式三省去了对象的封装,其他思路一样。

public class Flink02_Project_PV_process {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 通过key分组
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 直接使用process求 PV
                .process(new ProcessFunction<String, String>() {
                    // 定义累加变量
                    long sum = 0L;
                    @Override
                    public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                    	// 将过来的每行数据切割
                        String[] datas = line.split(",");
                        // 判断是否是我们想要的数据
                        if("pv".equals(datas[3])){
                        	// 符合条件,将累加变量+1
                            sum++;
                            // 收集结果
                            out.collect("pv = "+sum);
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述


二、网站独立访客数统计 - UV

  UV全称 Unique Visitor,也就是独立访客数。在PV中,我们统计的是所有用户对所有页面的浏览行为,也就是同一个用户的浏览行为会被重复统计。实际上我们关注的是在某一特定范围内(一天、一周或者一个月)内访问该网站的用户数,也就是每个访客只计算一次。它能从侧面反映出该网站的受欢迎程度和用户规模的大小。

1. 需求分析

  要统计UV量的话,只需要对全量的PV,使用userId去重,然后就能得到独立访客数了。

2. 代码实现

  先filter过滤出PV数据,然后通过keyBy将PV分到同一组,然后使用process进行处理,处理方法是:用set集合存放userId,如果下一个userId可以加入该集合说明是一个新的独立访客,则收集当前集合的大小,若加入失败,说明集合中已经存在该userId,也就是不是一个新的独立访客,也就不做处理了。

public class Flink03_Project_UV {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .readTextFile("input/UserBehavior.csv")
                // 过滤出 PV 数据
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return "pv".equals(value.split(",")[3]);
                    }
                })
                // 将PV的数据分到同一个组里面
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                })
                // 对同一组里面的数据进行处理
                .process(new ProcessFunction<String, String>() {
                    // 存放 userId 的容器,回自动对数据进行去重,最后直接拿它的大小就知道UV了
                    Set<Long> userIdSet = new HashSet<>();
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                        Long userId = Long.valueOf(value.split(",")[0]);
                        // 向set中添加userId,判断是否添加成功
                        if (userIdSet.add(userId)) {
                            // 添加成功的话,说明是一个新的独立访客,收集到此时容器大小
                            out.collect("UV = "+ userIdSet.size());
                        }
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/56853.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【雕爷学编程】MicroPython动手做(28)——物联网之Yeelight 5

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

安防视频综合管理合平台EasyCVR可支持的视频播放协议有哪些?

EasyDarwin开源流媒体视频EasyCVR安防监控平台可提供视频监控直播、云端录像、云存储、录像检索与回看、智能告警、平台级联、云台控制、语音对讲、智能分析等能力。 视频监控综合管理平台EasyCVR具备视频融合能力&#xff0c;平台基于云边端一体化架构&#xff0c;具有强大的…

2023 7.31~8.6 周报 (多尺度的DL-FWI + 自然图像的风格迁移速度模型)

->目录<- 0 上周回顾1 本周论文背景简述2 模型架构3 风格化速度模型4 训练与实际数据的测试5 存在的一些问题6 总结和下一步工作 0 上周回顾 上周完成了VelocityGAN的重现和学习. 认识到了利用判别器网络对于常规网络进行约束是很一种很高效的设计思路. 1 本周论文背景…

【物理】带电粒子在磁场和电场中移动的 3D 轨迹研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

innovus: 让ndr使用自定义via def

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 让ndr 使用指定via def可以用add_ndr -via命令&#xff0c;如果现有的via list无法满足要求&#xff0c;可以用 add_via_definition -via_rule -row_col去创建。

天工开物 #7 Rust 与 Java 程序的异步接口互操作

许多语言的高性能程序库都是建立在 C/C 的核心实现上的。 例如&#xff0c;著名 Python 科学计算库 Pandas 和 Numpy 的核心是 C 实现的&#xff0c;RocksDB 的 Java 接口是对底层 C 接口的封装。 Rust 语言的基本目标之一就是替代 C 在这些领域的位置&#xff0c;为开发者提供…

小程序如何上传商品图片

了解如何在小程序商城中上传商品图片是非常重要的&#xff0c;因为商品图片的质量和展示效果直接影响到用户对商品的购买决策。下面&#xff0c;我将介绍怎么在小程序上传产品图片的方法和注意事项。 1. 图片准备&#xff1a;在上传商品图片之前&#xff0c;首先要准备好商品图…

CTFSHOW php 特性

web89 数组绕过正则 include("flag.php"); highlight_file(__FILE__);if(isset($_GET[num])){$num $_GET[num]; get numif(preg_match("/[0-9]/", $num)){ 是数字 就输出 nodie("no no no!");}if(intval($num)){ 如果是存在整数 输出 flagecho …

代码调试2:coco数据集生成深度图

代码调试:coco数据集生成深度图 作者:安静到无声 个人主页 问题1:图片存在异常,跳过不处理 在获取深度图的时候,直接执代码,会产生以下错误:RuntimeError和ValueError。 因此我重新修改了代码,如果出现以下两种错误,则执行下一次循环,代码如下: 修改之后代码可以…

二叉树的前,中,后序的非递归实现(c++)

前言 对于二叉树来说&#xff0c;遍历它有多种方式&#xff0c;其中递归遍历是比较简单的&#xff0c;但是非递归的实现就有一定的难度&#xff0c;在这里介绍一种非递归实现二叉树遍历的方式。 1.前序遍历 1.1思路 其实对于二叉树的非递归实现&#xff0c;实际上就是用代码来…

【JAVA】 javaSE中的数组|数组的概念使用

数组的概念 什么是Java中的数组 数组&#xff1a;可以看成是相同类型元素的一个集合。在内存中是一段连续的空间。在java中&#xff0c;包含6个整形类型元素的数组&#xff0c;可以看做是酒店中连续的6个房间. 1. 数组中存放的元素其类型相同 2. 数组的空间是连在一起的 3…

哪些情况下需要使用爬虫IP

不知道小伙伴们有没有遇到过这种场景&#xff1a;上网闲逛&#xff0c;看一些搞笑的视频或者想下载一些酷炫的文件&#xff0c;正点击呢&#xff0c;结果却发现被网站限制了&#xff0c;无法访问或者下载&#xff1f; 别急&#xff0c;今天我来告诉大家&#xff0c;如何借助使…

【JavaEE初阶】博客系统后端

文章目录 一. 创建项目 引入依赖二. 设计数据库三. 编写数据库代码四. 创建实体类五. 封装数据库的增删查改六. 具体功能书写1. 博客列表页2. 博客详情页3. 博客登录页4. 检测登录状态5. 实现显示用户信息的功能6. 退出登录状态7. 发布博客 一. 创建项目 引入依赖 创建blog_sy…

yolov3-spp 训练结果分析:网络结果可解释性、漏检误检分析

1. valid漏检误检分析 ①为了探查第二层反向找出来的目标特征在最后一层detector上的意义&#xff01;——为什么最后依然可以框出来目标&#xff0c;且mAP还不错的&#xff1f; ②如何进一步提升和改进这个数据的效果&#xff1f;可以有哪些优化数据和改进的地方&#xff1f;让…

页面技术基础-html

页面技术基础-html 环境准备&#xff1a;在JDBC中项目上完成代码定义 1. 新建一个 Module:filr->右键 -》Module -》Java-》next->名字(html_day1)->finish 2. 在 Moudle上右键-》第二个选项&#xff1a;add framework .. -> 选择JavaEE下第一个选项 Web Apllicat…

计及需求响应和电能交互的多主体综合能源系统主从博弈优化调度策略(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

通向架构师的道路之apache_tomcat_https应用

一、总结前一天的学习 通过上一章我们知道、了解并掌握了Web Server结合App Server是怎么样的一种架构&#xff0c;并且亲手通过Apache的Http Server与Tomcat6进行了整合的实验。 这样的架构的好处在于&#xff1a; 减轻App Server端的压力&#xff0c;用Web Server来分压…

python——案例8:设定列表:listl=[0,1,2,3,4,5],求列表之和

案例8&#xff1a;设定列表&#xff1a;listl[0,1,2,3,4,5],求列表之和total0 list1[0,1,2,3,4,5] #列表lis1for ele in range(0,len(list1)):totaltotallist1[ele] print("列表中元素之和&#xff1a;",total) #输出结果

13 springboot项目——准备数据和dao类

13.1 静态资源下载 https://download.csdn.net/download/no996yes885/88151513 13.2 静态资源位置 css样式文件放在static的css目录下&#xff1b;static的img下放图片&#xff1b;template目录下放其余的html文件。 13.3 创建两个实体类 导入依赖&#xff1a;lombok <!…

1400*C. Computer Game

Example input 6 15 5 3 2 15 5 4 3 15 5 2 1 15 5 5 1 16 7 5 2 20 5 7 3 output 4 -1 5 2 0 1 解析&#xff1a; k个电&#xff0c; 第一种为 k>a 时&#xff0c;只玩游戏 k-a; 第二种&#xff0c;k>b,一边玩一边充电 k-b 问完成n轮游戏的情况下&#xff0c;优先第…
最新文章