使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

!pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

from kafka import KafkaConsumer
from pymongo import MongoClient

try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    db = client.adtech
    print("Connected successfully")
except:
    print("Could not connect")

consumer = KafkaConsumer(
    "ad_events",
    bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换<username>,<password>和。<host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

MAX_ITERATIONS = 100

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        record = message.value.decode("utf-8")
        user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))

        events_record = {
            "user_id": int(user_id),
            "event_name": event_name,
            "advertiser": advertiser,
            "campaign": int(campaign.split()[0]),
            "gender": gender,
            "income": income,
            "page_url": page_url,
            "region": region,
            "country": country
        }

        db.events.insert_one(events_record)
    except Exception as e:
        print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如genderincome

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置:
    - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
            "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们将用我们之前从 MongoDB Atlas 保存的值替换<username>和。<password>我们还需要将<primary><secondary>和的值替换<secondary>为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

SHOW TABLES;

这应该显示一张名为events

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我们将检查表的结构:

DESCRIBE events;

输出应如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下来,我们将检查是否有pipelines

SHOW PIPELINES;

这将显示events当前调用的一个管道Stopped

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

现在我们将启动events管道:

START ALL PIPELINES;

并且状态应更改为Running

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我们现在运行以下命令:

SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我们将检查表中的一行events,如下所示:

SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

SELECT COUNT(*) FROM events;

2. 各地区活动

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Top 5 广告商活动

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中  MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/293477.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JAVA基础学习笔记-day13-数据结构与集合源1

JAVA基础学习笔记-day13-数据结构与集合源1 1. 数据结构剖析1.1 研究对象一&#xff1a;数据间逻辑关系1.2 研究对象二&#xff1a;数据的存储结构&#xff08;或物理结构&#xff09;1.3 研究对象三&#xff1a;运算结构1.4 小结 2. 一维数组2.1 数组的特点 3. 链表3.1 链表的…

Linux之IP地址、主机名、域名解析

一、IP地址 可以通过ifconfig命令查看本机的ip地址&#xff0c;如果无法使用ifconfig命令&#xff0c;可以安装 安装&#xff1a;yum -y install net-tools ens33&#xff1a;主网卡&#xff0c;里面的inet就是ip地址 lo&#xff1a;本地回环网卡&#xff0c;127.0.0.1&…

Pytorch从零开始实战15

Pytorch从零开始实战——ResNeXt-50算法实战 本系列来源于365天深度学习训练营 原作者K同学 文章目录 Pytorch从零开始实战——ResNeXt-50算法实战环境准备数据集模型选择开始训练可视化总结 环境准备 本文基于Jupyter notebook&#xff0c;使用Python3.8&#xff0c;Pytor…

【计算机毕业设计】SSM医药信息管理系统

项目介绍 该系统共七个功能模块&#xff1a;查询模块、录入模块、删除模块、修改模块、浏览模块、打印模块和用户管理模块。 系统只有一个超级管理员&#xff0c;可以创建系统用户并进行权限管理&#xff0c;其他用户没有用户管理权限&#xff0c;只有其他权限。 不同的用户…

Jvm垃圾收集器系列之Parallel Scavenge收集器(个人见解仅供参考)

问&#xff1a;什么是Parallel Scavenge&#xff1f; 答&#xff1a;Parallel Scavenge是Java HotSpot虚拟机中的一种垃圾收集器&#xff0c;它主要用于提高应用程序的吞吐量。 问&#xff1a;Parallel Scavenge的主要目标是什么&#xff1f; 答&#xff1a;Parallel Scavenge的…

Debian12使用Xshell连接失败解决办法详细

1、Debian开启ssh服务 sudo apt update -y sudo apt install ssh2、编辑配置文件 # 安装vim sudo apt install vimvim /etc/ssh/sshd_config3、将#PermitRootLogin prohibit-password的注释去掉&#xff0c;设置为yes 4、将#PasswordAuthentication no的注释去掉&#xff0c;…

什么是DigiCert证书?

DigiCert作为全球知名的证书颁发机构&#xff0c;以其卓越的品质和全面的服务&#xff0c;为用户的数据安全保驾护航。 一、为何选择DigiCert证书&#xff1f; 权威认证&#xff1a;DigiCert与全球众多知名企业和政府机构合作&#xff0c;拥有广泛的认可度。高安全性&#xff…

太阳能杀虫灯的优点是什么

太阳能杀虫灯的优点主要包括以下几点&#xff1a; 环保节能&#xff1a;太阳能杀虫灯利用太阳能进行供电&#xff0c;无需接通市电&#xff0c;既节约能源又避免了排放污染物。适用范围广&#xff1a;只要有阳光照射的地区都可以使用太阳能杀虫灯&#xff0c;特别适合在电力资…

62.状态机实践(活动管理系统:二)

文章目录 一、简介二、状态机实践&#xff08;活动元信息管理&#xff09;1、dal/db.go2、dal/activity.go3、constdef/activity.go4、service/activity.go5、routes/routes.go6、main.go 代码地址&#xff1a;https://gitee.com/lymgoforIT/golang-trick/tree/master/37-load-…

详细解读QLC SSD无效编程问题-4

对于这些全部页面被无效化的WL&#xff0c;执行第二次编程实际上是不必要的&#xff0c;但当前的策略并未注意到这一问题。而对于那些既有有效页面又有无效页面&#xff08;图11中显示为1到3个&#xff09;的WL&#xff0c;应当被编程&#xff0c;但可以利用这些无效信息来改进…

C++设计模式 #8 抽象工厂(Abstract Factory)

抽象工厂这个名字比较难以帮助理解&#xff0c;可以把抽象工厂理解为“品牌工厂”或者“家族工厂”。 动机 在软件系统中&#xff0c;经常面临着“一系列相互依赖的对象”的创建工作&#xff1b;同时&#xff0c;由于需求的变化&#xff0c;往往存在更多系列对象的创建工作。如…

【Python可视化实战】钻石数据可视化

一、项目引言 1.背景和目标 钻石作为一种珍贵的宝石&#xff0c;其价格受到多种因素的影响。为了深入了解钻石价格的决定因素&#xff0c;我们收集了大量关于钻石的数据&#xff0c;并希望通过数据可视化来揭示钻石特征与价格之间的关系。 2.内容 收集钻石的各项特征数据&a…

【python高级用法】进程

一个简单的进程 # -*- coding: utf-8 -*-import multiprocessingdef foo(i):print (called function in process: %s %i)returnif __name__ __main__:Process_jobs []for i in range(5):p multiprocessing.Process(targetfoo, args(i,))Process_jobs.append(p)p.start()p.j…

Vue中的过滤器详解(应用场景和原理分析)

文章目录 一、是什么二、如何用定义filter小结&#xff1a; 三、应用场景四、原理分析小结&#xff1a; 参考文献 一、是什么 过滤器&#xff08;filter&#xff09;是输送介质管道上不可缺少的一种装置 大白话&#xff0c;就是把一些不必要的东西过滤掉 过滤器实质不改变原…

K-最近邻算法(KNN)是什么算法?

K-最近邻算法&#xff08;K-Nearest Neighbor&#xff0c;KNN&#xff09;是一种经典的有监督学习方法&#xff0c;也可以被归为懒惰学习&#xff08;Lazy Learning&#xff09;方法。它基于“物以类聚”的原理&#xff0c;假设样本之间的类别距离越近则它们越有可能是同一类别…

关于目标检测任务中,XML(voc格式)标注文件的可视化

1. 前言 最近在弄关于目标检测的任务&#xff0c;因为检测的图片和标签是分开的&#xff0c;可视化效果不明显&#xff0c;也不知道随便下载的数据集&#xff0c;标注信息对不对。网上看了好多代码&#xff0c;代码风格和本人平时不同&#xff0c;看起来麻烦&#xff0c;也不知…

项目使用PowerJob

新一代的定时任务框架——PowerJob 简介 PowerJob是基于java开发的企业级的分布式任务调度平台&#xff0c;与xxl-job一样&#xff0c;基于web页面实现任务调度配置与记录&#xff0c;使用简单&#xff0c;上手快速&#xff0c;其主要功能特性如下&#xff1a; 使用简单&…

ClickHouse基础介绍

目录 前言 1、什么是clickhouse 2、OLAP场景的关键特征 3、列式存储更适合于OLAP场景的原因 4、clickhouse的独特功能 5、clickhouse的缺点 6、性能 6.1、单个大查询的吞吐量 6.2、处理短查询的延迟时间 6.3、处理大量短查询的吞吐量 6.4、数据的写入性能 前言 11月…

RTSP/Onvif安防平台EasyNVR接入EasyNVS显示服务不存在的原因及解决办法

EasyNVS云管理平台具备汇聚与管理EasyGBS、EasyNVR等平台的能力&#xff0c;可以将接入的视频资源实现统一的视频能力输出&#xff0c;支持远程可视化运维等管理功能&#xff0c;还能解决设备现场没有固定公网IP却需要在公网直播的需求。 有用户在现场部署EasyNVR&#xff0c;…

how2heap-2.23-04-unsorted_bin_leak

#include<stdio.h> #include<malloc.h>int main() {char* a malloc(0x88);char* b malloc(0x8);free(a);long* c malloc(0x88);printf("%lx , %lx\n",c[0],c[1]);return 0; }unsorted bin leak原理&#xff1a;将chunk从unsorted bin申请回来时&#…
最新文章