Parquet文件推送数据到OSS

1. 任务背景

  • 任务说明:公司 saas 数据分析类产品,客户需要把行为数据回传到客户指定文件系统中(oss)
  • 周期:T+1
  • 数据格式:parquet
  • 数据范围:部分表全量,部分表增量
  • 其他要求:

需要历史数据,部分应用部分周期需要全量一个文件,部分历史需要每天一个文件,新的数据 T+1 。

每个文件上传成功后需要一个状态空文件 _SUCCESS 文件

2. 任务分析

a. 分析

  • 本数据平台 impala+kudu+hive 架构,impala-shll可导出 csv文件。

注:如果系统简单,要求简单,也可以选择数据存成hive表形式,直接文件可parquet文件。这里选择用 impala- shell 导出方式是客户有复杂的要求等等。

  • csv 转 parquet ,并且 parquet文件需要携带 schema 信息。
  • 脚本需要支持按某一时间段的每天处理。

b. 方案

(1)python 做脚本

(2)impala-shell 命令方式导出符合要求的csv文件

(3)pandas 和 pyarraow.parquet 结合操作csv

(4)发送标准文件到 OSS

欢迎关注,一起学习

3. 开发

(1)导出csv文件

impala-shell 
-i {imp_host} 
-d {imp_database}  
-q " {sql} " 
-B --output_delimiter="\\t" 
--print_header 
-o /data/xx.csv

(2) 环境执行

os.system(cmd)

(3) csv 转 parquet

import pyarrow as pa
import pyarrow.parquet as pq

csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)

user_schema = pa.schema([
    ('user_id', pa.int64()),
    ('name', pa.string())
])

table = pa.Table.from_pandas(csv_file,schema=user_schema)

pq.write_table(table, "/data/xxxx.parquet")

(4) 发送到OSS

import oss2

auth_endpoint = 'auth_endpoint'

access_key_id = 'access_key_id'

access_key_secret = 'auth_endpoint'

bucket_name = 'bucket_name'

auth = oss2.Auth(access_key_id, access_key_secret)

bucket = oss2.Bucket(auth,auth_endpoint, bucket_name)


with open(output_file, 'rb') as file_obj:
    # 上传具体数据
    put_object = bucket.put_object(oss_path, file_obj)
    if put_object and put_object.status == 200:
        # 上传_success文件夹
        put_success=bucket.put_object('{0}/_SUCCESS'.format(oss_success_path), '')
        if put_success and put_success.status == 200:
            print("oss data success file upload success")
        else:
            print("oss success data upload fail")
    else:
        print("oss success data upload fail")

(5)日期循环

import datetime

startTime = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y%m%d")
endTime = (datetime.datetime.now()).strftime("%Y%m%d")

if (is_today == 0):
    startTime = datetime.datetime.strptime('2025-05-09 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')
    endTime = datetime.datetime.strptime('2025-08-17 00:00:00', '%Y-%m-%d %H:%M:%S').strftime('%Y%m%d')

while startTime < endTime:
    
    print('逻辑处理')
    
	startTime = (datetime.datetime.strptime(startTime, '%Y%m%d') + datetime.timedelta(days=1)).strftime(
    '%Y%m%d')

4. 遇到的问题

(1) out of memory

Error tokenizing data. C error: out of memory

因为读取 CSV文件过大,内存放不下,设置 low_memory=False,机器内存小于文件大小,那是倒不出来的,除非分块读取

file_index = 0

for file in pd.read_csv(csvFile, 
                        delimiter='\t',
                        low_memory=False,
                        chunksize=20000):
    
    table = pa.Table.from_pandas(file, schema=schema)
    
	pq.write_table(table, '/data/xxxx_{1}.parquet'.format(file_index))

	file_index+=1

(2) 数据类型转换问题

pyarrow.lib.ArrowTypeError: 
('Expected a string or bytes dtype, got int64', 
 'Conversion failed for column xxxx with type int64')

当数据列出现 Null 的时候,转换 int64 转化不过去,就会报错,需要转为 str 同理一样,其他数据类型也会出现这种情况,当schema定义某列为int64,32,16 .... 的时候,如果出现Null,会自定识别为 float 类型。

csv_file = pd.read_csv(csvFile, delimiter='\t',low_memory=False)

csv_file['xxxx'] = csv_file['xxxx'].astype(str)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/355335.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32-LwESP 移植

LwESP 是一个专门解析 Espressif 公司旗下 ESP 系列芯片 AT 指令的开源库&#xff0c;具有以下特性&#xff1a; 支持 Espressif 公司 ESP32, ESP32-C2, ESP32-C3, ESP32-C6 和 ESP8266 芯片。独立平台&#xff0c;采用 C99 标准编写&#xff0c;易于移植。允许不同的配置来优…

【Linux】第三十九站:可重入函数、volatile、SIGCHLD信号

文章目录 一、可重入函数二、volatile三、SIGCHLD信号 一、可重入函数 如下图所示&#xff0c;当我们进行链表的头插的时候&#xff0c;我们刚刚执行完第一条语句的时候&#xff0c;突然收到一个信号&#xff0c;然后我们这个信号的自定义捕捉方法中&#xff0c;正好还有一个头…

Python模拟艾里光束:光可以不沿直线传播

文章目录 Airy光束有限能量Airy光束 Airy光束 在光学领域&#xff0c;傍轴近似下光束传输遵循方程 i ∂ ϕ ∂ z 1 z a ∂ 2 ϕ ∂ x 2 0 i\frac{\partial\phi}{\partial z}\frac{1}{z}\frac{a\partial^2\phi}{\partial x^2}0 i∂z∂ϕ​z1​∂x2a∂2ϕ​0 其中 k 2 π n …

【发展】不确定时代下的从容 —— 终局思维、长期主义与复利

文章目录 一、终局思维1、电影 《蝴蝶效应》2、未来是什么样的 二、长期主义1、这是一个不确定的时代2、做难但正确的事情 三、复利1、复利思维2、马太效应 一、终局思维 终局思维 在面对很多选择时&#xff0c;从终点出发考虑问题&#xff0c;来决定当下的选择。 1、电影 《蝴…

容器和虚拟机的对比

容器和虚拟机的对比 容器和虚拟机在与硬件和底层操作系统交互的方式上有所不同 虚拟化 使多个操作系统能够同时在一个硬件平台上运行。 使用虚拟机监控程序将硬件分为多个虚拟硬件系统&#xff0c;从而允许多个操作系统并行运行。 需要一个完整的操作系统环境来支持该应用。…

从零开始:CentOS系统下搭建DNS服务器的详细教程

前言 如果你希望在CentOS系统上建立自己的DNS服务器,那么这篇文章绝对是你不容错过的宝藏指南。我们提供了详尽的步骤和实用技巧,让你能够轻松完成搭建过程。从安装必要的软件到配置区域文件,我们都将一一为你呈现。无论你的身份是运维人员,还是程序员,抑或是对网络基础设…

GitLab16.8配置webhooks、Jenkins2.4配置GitLab插件实现持续集成、配置宝塔面板实现持续部署

看本篇文章的前提是已经部署完GItlab和Jenkins服务器&#xff0c;已经可以手动构建成功&#xff0c;并且经过了很多次实践&#xff0c;对这两款软件基本熟悉。 建议大家按以下顺序看 前端自动化&#xff08;其一&#xff09;部署gitlab https://blog.csdn.net/weixin_45062076…

DolphinScheduler + Amazon EMR Serverless 的集成实践

01 背景 Apache DolphinScheduler 是一个分布式的可视化 DAG 工作流任务调度开源系统&#xff0c;具有简单易用、高可靠、高扩展性、⽀持丰富的使用场景、提供多租户模式等特性。适用于企业级场景&#xff0c;提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方…

2024.1.24 C++QT 作业

思维导图 练习题 1.提示并输入一个字符串&#xff0c;统计该字符中大写、小写字母个数、数字个数、空格个数以及其他字符个数 #include <iostream> #include <string.h> #include <array> using namespace std;int main() {string str;cout << "…

《微信小程序开发从入门到实战》学习九十六

7.2 基础内容组件 7.2.4 progress组件 progress组件的示例代码如下&#xff1a; <progress percent"20" show-info /> 7.3 表单组件 表单组件是用于收集信息的组件。第三章介绍了许多表单组件&#xff0c;包括form、input、textarea、picker、switch、butt…

在WebSocket中使用Redis出现空指针异常解决方案

文章目录 在WebSocket中使用Redis1.问题描述2.原因3.解决步骤1.新建一个SpringUtil.java类&#xff0c;通过getBean的方法主动获取实例2.在WebSocketSingleServer.java中导入 在WebSocket中使用Redis 1.问题描述 在controller 和 service中都可以正常使用Redis&#xff0c;在…

【Javaweb程序设计】【C00161】基于SSM电子产品交易管理系统(论文+PPT)

基于SSM电子产品交易管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的电子产品交易系统 本系统分为前台用户和后台管理员2个功能模块. 前台用户模块&#xff1a;当游客打开系统的网址后&#xff0c;首先看到的就…

qemu + vscode图形化调试linux kernel

一、背景 使用命令行连接gdb 在调试时&#xff0c;虽然可以通过tui enable 显示源码&#xff0c;但还是存在设置断点麻烦&#xff08;需要对着源码设置&#xff09;&#xff0c;terminal显示代码不方便&#xff0c;不利于我们学习&#xff1b;另外在gdb 下p命令显示结构体内容…

猫用空气净化器哪款牌子好?好用能吸毛的宠物空气净化器推荐

作为一个养猫多年的铲屎官&#xff0c;我真的无法抗拒猫星人的可爱魅力&#xff01;以前&#xff0c;每当我路过宠物店&#xff0c;我总会忍不住停下来&#xff0c;在玻璃窗前停留半个小时以上。但是后来&#xff0c;我终于有了自己的猫咪。每天都能享受到给它摸小肚子的乐趣&a…

腾讯云幻兽帕鲁服务器创建教程,附4核16G服务器价格表

腾讯云0基础搭建帕鲁服务器4C16G14M服务器稳定无卡顿&#xff0c;先下载SteamCMD&#xff0c;并运行&#xff1b;然后下载Palserver&#xff0c;修改服务ini配置&#xff0c;启动PalServer&#xff0c;进入游戏服务器。腾讯云百科txybk.com分享腾讯云创建幻兽帕鲁服务器教程&am…

gdb 调试 - 在vscode图形化展示在远程的gdb debug过程

前言 本地机器的操作系统是windows&#xff0c;远程机器的操作系统是linux&#xff0c;开发在远程机器完成&#xff0c;本地只能通过ssh登录到远程。现在目的是要在本地进行图形化展示在远程的gdb debug过程。&#xff08;注意这并不是gdb remote &#xff01;&#xff01;&am…

产业需求大数据平台

产业需求大数据平台&#xff0c;依托大数据、NLP等技术&#xff0c;全面采集区域产业人才需求数据&#xff0c;从宏观、中观、微观三个层面对产业需求进行分析&#xff0c;并匹配学校自身的办学定位、专业布局、人才培养目标、培养规格和课程设置&#xff0c;进行专业设置匹配度…

【JavaScript基础入门】03 JavaScript 基础语法(一)

JavaScript 基础语法&#xff08;一&#xff09; 目录 JavaScript 基础语法&#xff08;一&#xff09;1. JS 初体验2. JavaScript注释2.1 单行注释2.2 多行注释 3. JavaScript结束符4. JavaScript输入输出语句 1. JS 初体验 JS 有3种书写位置&#xff0c;分别为内联、内部和外…

2024.1.28 GNSS 学习笔记

1.基于 地球自转改正卫地距 以及 伪距码偏差 重构定位方程&#xff1a; 先验残差计算公式如下所示&#xff1a; 2.观测值如何定权&#xff1f;权重如何确定&#xff1f; 每个卫星的轨钟精度以及电离层模型修正后的误差都有差异&#xff0c;所以我们不能简单的将各个观测值等权…

Hotspot源码解析-第25章-类的初始化

第25章-类的初始化 这一章主要是讲类的初始化操作&#xff0c;后续类加载章节中也会用到这一章的知识&#xff0c;只不过&#xff0c;这里就讲&#xff0c;是因为虚拟在初始化过程中&#xff0c;需要对基础类&#xff0c;比如System/Thread等类进行初始化操作&#xff0c;所以…
最新文章