Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录

一.引言

二.源码浅析

1.RDD.saveAsTextFile

2.TextOutputFormat 

3.FileOutputFormat

三.源码修改

1.修改文件生成逻辑 - getRecordWriter

2.允许目录存在 - checkoutputSpecs

3.全部代码 - TextOutputFormatV2

四.追加存储代码实战

五.总结


一.引言

Output directory file XXX already exists 目标目录已经存在,这个报错写 Spark 的同学都不会陌生,它不允许我们在同一个目录持续增加文件存储。在使用 Flink 文件流场景中,我们有向 HDFS 目录追加文件的需求,所以下面我们尝试继承 FileOutputFormat 实现自定义文件追加。

二.源码浅析

自定义实现之前,我们需要明确一下追加文件的两个主要问题:

- 允许目录存在

即避免 Output directory file XXX already exists 的报错

- 避免 File 重复

由于是追加文件,这里我们要避免文件名相同导致追加失败

1.RDD.saveAsTextFile

这个是我们日常使用的文本落地 API:

这里将原始的 RDD[String] 转化为 (NullWritable.get(), text) 的 pairRDD 并调用后续的 saveAsHadoopFile 并传入 TextOutputFormat:

- NullWritable

在Hadoop1.+中是 "Comparable",因此编译器无法找到隐式为其排序,并将使用默认的 "null"。然而,它是一个 `Comparable[NullWritable]` 在 Hadoop2.+ 中,编译器将调用隐式  "Ordering.ordered” 方法来创建为 "NullWritable" 排序。这就是为什么编译器会生成不同的匿名Hadoop1.+和Hadoop2.+中“saveAsTextFile”的类。因此,在这里我们提供了一个显式排序“null”,以确保编译器生成 "saveAsTextFile" 的字节码相同。

这里翻译自官方 API,简言之,我们后续自定义 OutputFormat 时需要将 RDD[String] 转换为 PairRDD 并将 key 置为 NullWritable,否则这里无法调用 saveAsHadoopFile:

2.TextOutputFormat 

前者继承了后者,我们先看下前者复写了哪些函数:

非常简洁,大部分方法都使用父类的实现,这里 getRecordWriter 的具体实现对应我们上面提到的第二个问题即避免文件重复,因为其负责根据 name 轮训生成落地的 Path 地址,我们修改这个函数即可避免追加时文件重复。

3.FileOutputFormat

TextOutputFormat Extends FileOutputFormat,上面 TextOutputFormat 只实现了 Path 相关的工作,所以需要继续到父类 FileOutputFormat 寻找抛出异常的语句:

在文件中搜索 already exists 即可定位到当前函数,是不是很熟悉,因此针对第一个问题避免文件的报错就要修改这里了,最简单的我们把这三行注释掉即可。

三.源码修改

经过上面的源码三部曲,我们如何修改 TextOutputFormat 思路也很清晰了,修改文件生成逻辑、取消抛出异常即可,下面看一下代码实现:

1.修改文件生成逻辑 - getRecordWriter

这里我们在源码中增加了 updateFileName 函数,该函数由用户自己定义输出文件名,常规的我们可以按照 part-00000、part-00001 的顺序继续存储下去,当然如果为了区分追加文件的添加时间与类型,我们也可以给其打上时间戳和自定义标记,都不想用的话也可以直接用 UUID 代替:

import java.util.UUID

UUID.randomUUID().toString

下面看一下 updateFileName 的实现:

这里初始化变量 fileName 为 "",更新时对其加锁并基于当前 name 进行判断,如果当前 fileName 为 "",则代表是第一次保存,因此默认使用 name 的 part-00000,后续再多次存储是,我们就可以获取 fileName 的后缀进行累加输出了,这里使用 DecimalFormat 实现了自动补 0 的操作。

  static String fileName = "";

  DecimalFormat decimalFormat = new DecimalFormat("00000");

  public void updateFileName(String name) {

    synchronized (fileName) {
      if (fileName.equals("")) {
        fileName = name;
      } else {
        fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);
      }
    }
  }

2.允许目录存在 - checkoutputSpecs

把 throw Exception 的异常去掉就好了,这里保留了 Println 提示目录已经存在并开始追加。

3.全部代码 - TextOutputFormatV2

换个 TextOutputFormatV2 实现我们追加文件存储的目的。

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.*;

/**
 * An {@link OutputFormat} that writes plain text files.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormatV2<K, V> extends FileOutputFormat<K, V> {

  static String fileName = "";

  DecimalFormat decimalFormat = new DecimalFormat("00000");

  public void updateFileName(String name) {

    synchronized (fileName) {

      if (fileName.equals("")) {
        fileName = name;
      } else {
        fileName = "part-" + decimalFormat.format(Integer.parseInt(fileName.split("-")[1]) + 1);
      }

    }

  }

  protected static class LineRecordWriter<K, V>
          implements RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
            throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized void close(Reporter reporter) throws IOException {
      out.close();
    }
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
          throws FileAlreadyExistsException,
          InvalidJobConfException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);

      // get delegation token for the outDir's file system
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
              new Path[] {outDir}, job);

      // check its existence
      if (fs.exists(outDir)) {
        System.out.println("Output directory " + outDir + " already exists, Start Append!");
      }
    }
  }

  public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                            JobConf job,
                                            String name,
                                            Progressable progress)
          throws IOException {
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
            "\t");

    updateFileName(name);

    if (!isCompressed) {
      Path file = FileOutputFormat.getTaskOutputPath(job, fileName);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      Class<? extends CompressionCodec> codecClass =
              getOutputCompressorClass(job, GzipCodec.class);
      // create the named codec
      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
      // build the filename including the extension
      Path file =
              FileOutputFormat.getTaskOutputPath(job,
                      fileName + codec.getDefaultExtension());
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new com.CommonTool.TextOutputFormatV2.LineRecordWriter<K, V>(new DataOutputStream
              (codec.createOutputStream(fileOut)),
              keyValueSeparator);
    }
  }
}

四.追加存储代码实战

  def main(args: Array[String]): Unit = {

    val (argsList, argsMap) = ArgsParseUtil.parseArgs(args)

    val conf = (new SparkConf).setAppName("AppendFileToHdfs").setMaster("local[*]")

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val sc = spark.sparkContext

    val output = argsMap.getOrElse("output", "./append_output")

    val data = sc.parallelize(0 to 1000).mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }

    (0 until 10).foreach(epoch => {
      // 存储一次无法继续存储
      // data.saveAsTextFile(output)
       data.saveAsHadoopFile(output, classOf[NullWritable], classOf[Text], classOf[TextOutputFormatV2[NullWritable, String]])
    })

  }

首先转化为 (NullWritable.get(), text) 的 pairRDD,随后调用 saveAsHadoopFile 方法并传入我们自定义的 TextOutputFormatV2 即可,由于我们 for 循环了 10 次,所以打印了 10 个相关日志。

下面再看下生成的文件:

每个文件存 250 个数字,每次存储 4 个 part,10 次 40 个且保持递增顺序。

五.总结

想要结合源码进行修改时,结合自己的需求,带着问题去找对应的函数再复写就 OK 了,由于这里逻辑比较简单所以我们也没踩太多坑。上面采用的是单次 Job 连续存储,所以 Format 里 FileName 能够达到累加的情况,如果是多个 Job 重复启动,则每次获取的都是 part-00000,这时如果还想要保持文件名递增的话可以使用 FileSystem.listStatus 遍历文件夹获取 modifyTime 最新的文件并取其 name 即可拿到最新的文件名 part,此时将参数传入 TextFormat 并修改 update 逻辑即可实现多 Job 重复启动且文件名递增的需求了。当然了,使用 UUID + Date 是最省事滴。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/2248.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

关于STM32用DMA传输UART空闲中断中接收的数据时无法接收数据问题以及解决办法

一、stm32 cube ide 配置 1、DMA串口接收数据的ide配置如下图所示 串口1相关的设置及printf函数的使用&#xff0c;这里没放&#xff0c;建议先实现串口打印功能 2、相关的知识点 普通模式和循环模式的区别在于&#xff0c;普通模式下&#xff0c;DMA只会接收一次数据&#x…

微前端(无界)

前言&#xff1a;微前端已经是一个非常成熟的领域了&#xff0c;但开发者不管采用哪个现有方案&#xff0c;在适配成本、样式隔离、运行性能、页面白屏、子应用通信、子应用保活、多应用激活、vite 框架支持、应用共享等用户核心诉求都或存在问题&#xff0c;或无法提供支持。本…

DS18B20温度传感器简介和1-Wire驱动程序

目录DS18B20简介DS18B20的两种供电方式64位ROM温度传感器1-Wire Bus简介DS18B20通信时序初始化ROM相关命令(后续包含任何数据交换的操作)功能相关命令(后续包含任何数据交换的操作)单个DS18B20读取温度值驱动多个DS18B20读取温度值驱动DS18B20简介 DS18B20数字温度计提供9位到…

学习系统编程No.7【进程替换】

引言&#xff1a; 北京时间&#xff1a;2023/3/21/7:17&#xff0c;这篇博客本来昨天晚上就能开始写的&#xff0c;但是由于笔试强训的原因&#xff0c;导致时间用在了做题上&#xff0c;通过快2个小时的垂死挣扎&#xff0c;我充分意识到了自己做题能力的缺陷和运用新知识的缺…

致远OA敏感信息泄露漏洞合集(含批量检测POC)

文章目录前言敏感信息泄露A6 status.jsp 信息泄露漏洞漏洞描述漏洞影响网络测绘漏洞复现POC 批量检测getSessionList.jsp Session泄漏漏洞漏洞描述网络测绘批量检测POC致远OA 帆软组件 ReportServer 目录遍历漏洞漏洞描述漏洞影响网络测绘POC(批量检测)A6 createMysql.jsp 数据…

Java stream性能比较

环境 Ubuntu 22.04IntelliJ IDEA 2022.1.3JDK 17CPU&#xff1a;8核 ➜ ~ cat /proc/cpuinfo | egrep -ie physical id|cpu cores physical id : 0 cpu cores : 1 physical id : 2 cpu cores : 1 physical id : 4 cpu cores : 1 physical id : 6 cpu cores : 1 physical id …

浏览器工作原理

一、JavaScript 的历史 JavaScript&#xff08;简称JS&#xff09;Web前端开发的脚本语言。 它诞生1995年&#xff0c;由网景公司的 Brendan Eich 开发。最初&#xff0c;JavaScript 被设计用于在网页上嵌入动态内容和交互式功能。 1996年&#xff0c;JavaScript 1.1 成为国…

C++虚函数与多态

C虚函数与多态虚函数抽象类纯虚函数虚析构函数多态虚函数的几个问题纯虚函数和ADT虚函数 virtual修饰的成员函数就是虚函数&#xff0c; 1.虚函数对类的内存影响&#xff1a;增加一个指针类型大小&#xff08;32位和64位&#xff09; 2.无论有多少个虚函数&#xff0c;只增加一…

【ansible】模块介绍超详解(下)

目录 六&#xff0c;软件包管理 1&#xff0c;yum_repository模块 &#xff08;1&#xff09;yum_repository模块常用选项 &#xff08;2&#xff09;yum_repository模块案例 2&#xff0c;mount模块 &#xff08;1&#xff09;mount模块选项 &#xff08;2&#xff09;mount模…

大数据简介

大数据概论和职业规划Linux服务器系统Hadoop概论HDFS分布式文件系统Hive数据仓库SparSQL指令Zepplin框架Sqoop框架Superset数据可视化大数据数仓实战-didi出行大数据概念大数据特点大数据应用场景大数据分析业务步骤大数据职业规划大数据学习路线。大数据概念数据&#xff1a;世…

基于YOLOv5的舰船检测与识别系统(Python+清新界面+数据集)

摘要&#xff1a;基于YOLOv5的舰船检测与识别系统用于识别包括渔船、游轮等多种海上船只类型&#xff0c;检测船舰目标并进行识别计数&#xff0c;以提供海洋船只的自动化监测和管理。本文详细介绍船舰类型识别系统&#xff0c;在介绍算法原理的同时&#xff0c;给出Python的实…

【系统开发】WebSocket + SpringBoot + Vue 搭建简易网页聊天室

文章目录一、数据库搭建二、后端搭建2.1 引入关键依赖2.2 WebSocket配置类2.3 配置跨域2.4 发送消息的控制类三、前端搭建3.1 自定义文件websocket.js3.2 main.js中全局引入websocket3.3 App.vue中声明websocket对象3.4 聊天室界面.vue3.5 最终效果一、数据库搭建 很简单的一个…

数据结构与算法——二叉树+带你实现表达式树(附源码)

&#x1f4d6;作者介绍&#xff1a;22级树莓人&#xff08;计算机专业&#xff09;&#xff0c;热爱编程&#xff1c;目前在c&#xff0b;&#xff0b;阶段&#xff0c;因为最近参加新星计划算法赛道(白佬)&#xff0c;所以加快了脚步&#xff0c;果然急迫感会增加动力>——…

ThreadLocal详解

一、什么是ThreadLocal 1、什么是ThreadLocal&为什么用ThreadLocal ThreadLocal&#xff0c;即线程本地变量&#xff0c;在类定义中的注释如此写This class provides thread-local variables。如果创建了一个ThreadLocal变量&#xff0c;那么访问这个变量的每个线程都会有…

C++基础算法④——排序算法(插入、桶附完整代码)

排序算法 1.插入排序 2.桶排序 1.插入排序 基本思想&#xff1a;将初始数据分为有序部分和无序部分&#xff1b;每一步将无序部分的第一个值插入到前面已经排好序的有序部分中&#xff0c;直到插完所有元素为止。步骤如下&#xff1a; 每次从无序部分中取出第一个值&#x…

图像分类卷积神经网络模型综述

图像分类卷积神经网络模型综述遇到问题 图像分类&#xff1a;核心任务是从给定的分类集合中给图像分配一个标签任务。 输入&#xff1a;图片 输出&#xff1a;类别。 数据集MNIST数据集 MNIST数据集是用来识别手写数字&#xff0c;由0~9共10类别组成。 从MNIST数据集的SD-1和…

在Clion开发工具上使用NDK编译可以在安卓上执行的程序

1. 前言 因为工作需要&#xff0c;我要将一份C语言代码编译成可执行文件传送到某安卓系统里执行。 众所周知&#xff0c;使用ndk编译代码有三种使用方式&#xff0c;分别是基于 Make 的 ndk-build、CMake以及独立工具链。以前进行ndk编程都是使用ndk-build进行的&#xff0c;新…

RocketMQ的基本概念、系统架构、单机安装与启动

RocketMQ的基本概念、系统架构、单机安装与启动 文章目录RocketMQ的基本概念、系统架构、单机安装与启动一、基本概念1、消息&#xff08;Message&#xff09;2、主题&#xff08;Topic&#xff09;3、标签&#xff08;Tag&#xff09;4、队列&#xff08;Queue&#xff09;5、…

C# 教你如何终止Task线程

我们在多线程中通常使用一个bool IsExit类似的代码来控制是否线程的运行与终止&#xff0c;其实使用CancellationTokenSource来进行控制更为好用&#xff0c;下面我们将介绍CancellationTokenSource相关用法。C# 使用 CancellationTokenSource 终止线程使用CancellationTokenSo…

【Leetcode】-有效的括号

作者&#xff1a;小树苗渴望变成参天大树 作者宣言&#xff1a;认真写好每一篇博客 作者gitee:gitee 如 果 你 喜 欢 作 者 的 文 章 &#xff0c;就 给 作 者 点 点 关 注 吧&#xff01; 文章目录前言前言 今天我们再来讲一期关于题目的博客&#xff0c;我挑选的是一道leet…
最新文章