Spark对正常日志文件清洗并分析

目录

日志文件准备:

一.日志数据清洗: 

第一步:数据清洗需求分析:

二.代码实现 

2.1 代码和其详解

2.2创建jdbcUtils来连接Mysql数据库

2.3 运行后结果展示:

三、留存用户分析 

3.1需求概览

3.2.代码实现

3.3 运行后结果展示: 

四、活跃用户分析 

4.1需求概览

4.2代码实现


日志文件准备:

链接:https://pan.baidu.com/s/1dWjIGMttVJALhniJyS6R4A?pwd=h4ec 
提取码:h4ec 
--来自百度网盘超级会员V5的分享

一.日志数据清洗: 

第一步:数据清洗需求分析:

1.读入日志文件并转化为Row类型

  • 按照Tab切割数据
  • 过滤掉字段数量少于8个的

2.对数据进行清洗

  • 按照第一列和第二列对数据进行去重
  • 过滤掉状态码非200
  • 过滤掉event_time为空的数据
  • 将url按照”&”以及”=”切割

3.保存数据

  • 将数据写入mysql表中
  • 将其分成多个字段

二.代码实现 

2.1 代码和其详解

 def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("detlDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    //TODO 加载日志文件数据,按照\t分组,过滤出长度小于8的数据,将数据封装到 Row对象中,创建DF

    //创建Row对象
    val rowRdd: RDD[Row] = sc.textFile("in/test.log")
      .map(x => x.split("\t"))
      .filter(x => x.length >= 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))
    
    //创建Schema
    val schema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    //创建DataFrame
    val logDF: DataFrame = spark.createDataFrame(rowRdd, schema)
    logDF.printSchema()
    logDF.show(3)

    //TODO 删除重复数据,过滤掉状态码非200

    val filterLogs: Dataset[Row] = logDF.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))
    //单独处理url,并转为Row对象
    val full_logs_rdd: RDD[Row] = filterLogs.map(
      line => {
        val str: String = line.getAs[String]("url")
        val paramsArray: Array[String] = str.split("\\?")
        var paramsMap: Map[String, String] = null
        if (paramsArray.length == 2) {
          val tuples: Array[(String, String)] = paramsArray(1).split("&")
            .map(x => x.split("="))
            .filter(x => x.length == 2)
            .map(x => (x(0), x(1)))
          paramsMap = tuples.toMap
        }
        (
          line.getAs[String]("event_time"),
          paramsMap.getOrElse[String]("userUID", ""),
          paramsMap.getOrElse[String]("userSID", ""),
          paramsMap.getOrElse[String]("actionBegin", ""),
          paramsMap.getOrElse[String]("actionEnd", ""),
          paramsMap.getOrElse[String]("actionType", ""),
          paramsMap.getOrElse[String]("actionName", ""),
          paramsMap.getOrElse[String]("actionValue", ""),
          paramsMap.getOrElse[String]("actionTest", ""),
          paramsMap.getOrElse[String]("ifEquipment", ""),
          line.getAs[String]("method"),
          line.getAs[String]("status"),
          line.getAs[String]("sip"),
          line.getAs[String]("user_uip"),
          line.getAs[String]("action_prepend"),
          line.getAs[String]("action_client")
        )
      }
    ).toDF().rdd

    //   frame.withColumnRenamed("_1","event_time").printSchema()
    
    //再次创建Schema
    val full_logs_schema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("userUID", StringType),
        StructField("userSID", StringType),
        StructField("actionBegin", StringType),
        StructField("actionEnd", StringType),
        StructField("actionType", StringType),
        StructField("actionName", StringType),
        StructField("actionValue", StringType),
        StructField("actionTest", StringType),
        StructField("ifEquipment", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType),
      )
    )
    //再次创建DataFrame
    val full_logDF: DataFrame = spark.createDataFrame(full_logs_rdd, full_logs_schema)
    full_logDF.printSchema()
    full_logDF.show(2, true)

    //    filterLogs.write
    //    jdbcUtils.dataFrameToMysql(filterLogs, jdbcUtils.table_access_logs, 1)

    jdbcUtils.dataFrameToMysql( full_logDF, jdbcUtils.table_full_access_logs, 1)

    spark.close()
  }

2.2创建jdbcUtils来连接Mysql数据库

object jdbcUtils {
  val url = "jdbc:mysql://192.168.61.141:3306/jsondemo?createDatabaseIfNotExist=true"
  val driver = "com.mysql.cj.jdbc.Driver"
  val user = "root"
  val password = "root"

  val table_access_logs: String = "access_logs"
  val table_full_access_logs: String = "full_access_logs"
  val table_day_active:String="table_day_active"
  val table_retention:String="retention"

  val table_loading_json="loading_json"
  val table_ad_json="ad_json"
  val table_notification_json="notification_json"
  val table_active_background_json="active_background_json"
  val table_comment_json="comment_json"
  val table_praise_json="praise_json"

  val table_teacher_json="teacher_json"

  val properties = new Properties()
  properties.setProperty("user", jdbcUtils.user)
  properties.setProperty("password", jdbcUtils.password)
  properties.setProperty("driver", jdbcUtils.driver)

  def dataFrameToMysql(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).jdbc(jdbcUtils.url, table, properties)
    } else {
      df.write.mode(SaveMode.Overwrite).jdbc(jdbcUtils.url, table, properties)
    }
  }

  def getDataFtameByTableName(spark:SparkSession,table:String):DataFrame={
    val frame: DataFrame = spark.read.jdbc(jdbcUtils.url, table, jdbcUtils.properties)
    frame
  }

}

2.3 运行后结果展示:

初次清理后的日志数据

 清理完url的数据

三、留存用户分析 

3.1需求概览

1.计算用户的次日留存率

  • 求当天新增用户总数n
  • 求当天新增的用户ID与次日登录的用户ID的交集,得出新增用户次日登录总数m (次日留存数)
  • m/n*100%

2.计算用户的次周留存率

3.2.代码实现

object Retention {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("retentionDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val logs: DataFrame = jdbcUtils.getDataFtameByTableName(spark, jdbcUtils.table_full_access_logs)
    logs.printSchema()
    logs.show(3, false)

    //    logs.createOrReplaceTempView("logs"):
    //    过滤出所有的事件为Registered的日志,并且修改事件时间(event_time)为注册时间(registered——time)
    //    找出注册用户id和注册时间
    val registered: DataFrame = logs.filter('actionName === "Registered")
      .withColumnRenamed("event_time", "register_time")
      .select("userUID", "register_time")
    registered.printSchema()
    registered.show(3, false)

    //    找出ActionName为Signin的日志数据
    val signin: DataFrame = logs.filter('actionName === "Signin")
      .withColumnRenamed("event_time", "signin_time")
      .select("userUID", "signin_time")
    signin.printSchema()
    signin.show(3, false)

    //    两个DF关联(几种写法和可能出现的问题)
    val joined: DataFrame = registered.join(signin, Seq("userUID"), "left")
    //    registered.join(signin,$"userUID","left")   显示是模棱两可的得先使两表userUID相等
    //    val joined2: DataFrame = registered.as("r1").join(signin.as("s1"), $"r1.userUID" === $"s1.userUID", "left")
    //    joined2.printSchema()
    //    joined2.show(3,false)  //会显示相同的id,在后续的操作中会有两个userUID,再次使用很难使用
    //    joined.printSchema()
    //    joined.show(3, false)

    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val mydefDataformat: UserDefinedFunction = spark.udf.register("mydefDataformat", (event_time: String) => {
      if (StringUtils.isEmpty(event_time))
        0
      else
        dateFormat.parse(event_time).getTime
    })

    val joinedFrame: DataFrame = joined.withColumn("register_date", mydefDataformat($"register_time"))
      .withColumn("signin_date", mydefDataformat($"signin_time"))
    //      .drop("")
    joinedFrame.printSchema()
    joinedFrame.show(3, false)

    //    求出前一天注册,当天登录的用户数量,过滤注册时间加上86400000查询第二天登录的用户,filter操作==需要变成===
    val signinNumDF: DataFrame = joinedFrame.filter('register_date + 86400000 === 'signin_date)
      .groupBy($"register_date")
      .agg(countDistinct('userUID).as("signNum"))
    signinNumDF.printSchema()
    signinNumDF.show(3, false)

    //    求出当前注册用户的数量
    val registerNumDF: DataFrame = joinedFrame.groupBy('register_date)
      .agg(countDistinct("userUID").as("registerNum"))
    registerNumDF.printSchema()
    registerNumDF.show(3, false)

    //    求出留存率
    val joinRegisAndSigninDF: DataFrame = signinNumDF.join(registerNumDF, Seq("register_date"))

    joinRegisAndSigninDF.printSchema()
    joinRegisAndSigninDF.show(3, false)

    val resultRetention: DataFrame = joinRegisAndSigninDF.select('register_date, ('signNum / 'registerNum).as("percent"))
    resultRetention.show()

    jdbcUtils.dataFrameToMysql(resultRetention,jdbcUtils.table_retention,1)

    spark.close()
  }

}

3.3 运行后结果展示: 

 

四、活跃用户分析 

4.1需求概览

  1. 读取数据库,统计每天的活跃用户数
  2. 统计规则:有看课和买课行为的用户才属于活跃用户
  3. 对UID进行去重

4.2代码实现

object Active {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("activeDemo").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

//  读取清洗后的日志数据并过滤出活跃用户
    val logs: DataFrame = jdbcUtils.getDataFtameByTableName(spark, jdbcUtils.table_full_access_logs)

    val ds: Dataset[Row] = logs.filter($"actionName" === "BuyCourse" || $"actionName" === "StartLearn")
    ds.printSchema()
    ds.show(3,false)
//  修改DataSet=>二元组
    val ds2: Dataset[(String, String)] = ds.map(x =>
      (
        x.getAs[String]("userSID"),
        x.getAs[String]("event_time").substring(0, 10)
      )
    )
    ds2.show()

//    按天进行聚合,求出活跃用户数并去重
    val frame: DataFrame = ds2.withColumnRenamed("_2", "date")
      .withColumnRenamed("_1", "userid")
      .groupBy($"date")
      .agg(countDistinct("userid").as("activeNum"))

    frame.printSchema()
    frame.show(3,false)
//   JdbcUtils中新增活跃用户变量
    jdbcUtils.dataFrameToMysql(frame,jdbcUtils.table_day_active,1)
    println("操作结束")
    spark.close()
  }

}

4.3 运行后结果展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/10194.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

T 级数据量迁移!知名云巨头如何从 Jira 切换至 ONES?

2021 年,Atlassian 旗下 Jira&Confluence 等系列产品 Server 版(本地私有化部署版)全面停售,并将在 2024 年停止维护,Server 版客户必须迁移至 Cloud(云)或 Data Center(数据中心…

全网最详细,Jmeter性能测试-性能基础详解,控制器不同选择(四)

目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 逻辑控制器 提前说…

一篇文章让你搞懂TypeScript中的??和?:和?.和!.是什么意思

TypeScript中的??和?:和?.和!.是什么意思?知识回调(不懂就看这儿!)场景复现核心干货???:?.!.知识回调(不懂就看这儿!) 知识专栏专栏链接TypeScript知识专栏https://blog.csdn.net/xsl_…

Scrapy爬虫框架(概念)

Scrapy 入门教程 | 菜鸟教程 (runoob.com) Scrapy是一个快速功能强大的网络爬虫框架 Scrapy的安装 通过 pip 安装 Scrapy 框架: pip install Scrapy 安装后小测: 执行 scrapy ‐h Scrapy不是一个函数功能库,而是一个爬虫框架。 Scrapy架构图(绿线是…

CentOS7 虚拟机 双网卡绑定

一、网卡绑定模式 模式类型特点mode0round-robin(平衡轮询策略)基于per packet方式,轮询往每条链路发送报文。提供负载均衡和容错的能力,当有链路出问题,会把流量切换到正常的链路上。交换机端需要配置聚合口。mode1a…

【论文笔记】CRN: Camera Radar Net for Accurate, Robust, Efficient 3D Perception

原文链接:https://arxiv.org/abs/2304.00670 1. 引言 本文提出两阶段融合方法CRN,能使用相机和雷达生成语义丰富且位置精确的BEV特征。具体来说,首先将图像透视特征转换到BEV下,该步骤依赖雷达,称为雷达辅助的视图变换…

C#基础复习

语句 目录 语句 switch: 跳转语句 标签语句 标签: 标签语句的作用域 goto语句 using 语句 资源的包装使用 using 语句示例: 多个资源和嵌套 语句是描述某个类型或让程序执行某个动作的源代码指令 块在语法上算作一个单条嵌入语句。任何语…

电脑无法正常关机?点了关机又会自动重启

“真木马”相信不少朋友遇到过电脑关机自动重启现象,一点关机,但随后电脑有会进入重启状态,就是一直不会停,属实是很难崩。 目录 一、问题症状 二、问题原因 三、解决方案 方法一: 1.关闭系统发生错误时电脑自动…

企业数字化转型全是坑?这几篇数字化转型成功案例,减少70%损失

这篇给大家整理了200企业数字化转型案例合集,涵盖了制造、建筑、教育、零售、互联网等10行业的大中小型企业数字化转型思路,希望对大家有所帮助。 案例全部整合在这篇文章中,点击即可查看>>数字化干货资料合集! 01 首先&…

C++编程法则365条一天一条(359)认识各种初始化术语

文章目录Default initialization默认初始化Copy initialization拷贝初始化Aggregate initialization聚合初始化Direct initialization直接初始化list_initialization列表初始化value_initialization值初始化参考: https://en.cppreference.com/w/cpp/language/copy_…

项目打包发布流程

---》》》项目打包发布 1.编译并构建项目 2.部署 npm i npm run build scp2:需要写代码 ---》》》 后续有空更新:赋几个链接: Jenkins官网 nullhttps://www.jenkins.io/zh/一文详解Jenkins的安装与配置Jenkins是一个基于Java开发的开源…

ERROR:org.apache.hadoop.hbase.PleaseHoldException: Master is initializing错误

一、问题 重新安装hbase后,在hbase shell中查看所有命名空间时,出现了ERROR:org.apache.hadoop.hbase.PleaseHoldException: Master is initializing错误。 二、方法 1、root用户下,关闭hbase stop-hbase.sh 2、执行以下命令删除HDFS下的hb…

深度学习环境配置超详细教程【Anaconda+PyTorch(GPU版)+CUDA+cuDNN】

深度学习环境配置 入门深度学习,首先要做的事情就是要搭建深度学习的环境。不管你是Windows用户,Mac用户还是Ubuntu用户,只要电脑配置允许,都可以做深度学习,毕竟Windows、Mac和Ubuntu系统都可以进行深度学习环境的搭…

惊呆了,2小时我就学会了Charles抓包的详细教程

目录 一、什么是Charles 二、下载Charles 三、设置Charles代理 四、配置设备代理 五、抓包操作 六、常见问题及解决方法 抓包不到某些应用程序 Charles抓包后网站出现异常 七、总结 一、什么是Charles Charles是一个跨平台的HTTP代理服务工具,可以用来查看…

软件测试工作主要做什么

随着信息技术的发展和普及,人们对软件的使用越来越普及。但是在软件的使用过程中,软件的效果却不尽如人意。为了确保软件的质量,整个软件业界已经逐渐意识到测试的重要性,也有越来越多的小伙伴加入了软件测试这个行业中来。软件测…

从FPGA说起的深度学习(六)-任务并行性

这是新的系列教程,在本教程中,我们将介绍使用 FPGA 实现深度学习的技术,深度学习是近年来人工智能领域的热门话题。在本教程中,旨在加深对深度学习和 FPGA 的理解。用 C/C 编写深度学习推理代码高级综合 (HLS) 将 C/C 代码转换为硬…

ServletAPI详解(四)-HttpServletResponse

我们来看第三个方法,HttpServletResponse 在servlet运行原理中提到,servlet代码中的doXXX方法的目的就是根据请求计算响应,然后将响应数据设置到HttpServletResponse对象中,然后 Tomcat 就会把这个 HttpServletResponse 对象按照 HTTP 协议的格式, 转成一个字符串, 并通过 Soc…

Linux Shell 实现一键部署二进制Rabbitmq

rabbitmq 前言 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代…

MPC的560x系列的运行模式的介绍

一、模式简介 1、运行模式 一共11种模式,分别为RESET、DRUN、SAFE、TEST、RUN0、RUN1、RUN2、RUN3、HALT、STOP、STANDBY。其中RESET、DRUN、SAFE、TEST是系统工作模式,用户不用个特别关系,而后面几种是用于经常使用到的工作模式。 RESET&a…

Linux搭建docker

1. 查看系统的内核版本 [rootwide ~]# uname -r 3.10.0-1160.el7.x86_642. 将yum更新到最新版本 [rootwide ~]# yum upate -y Complete!3. 安装Docker所需的依赖包 [rootwide ~]# sudo yum install -y yum-utils device-mapper-persistent-data lvm2 Loaded plugins: fastes…
最新文章