Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

文章目录

  • 系列文章索引
  • 一、概述
    • 1、版本匹配
    • 2、导包
  • 二、编码实现
    • 1、基本使用
    • 2、更多配置
    • 3、自定义序列化器
    • 4、Flink SQL方式
  • 三、踩坑
    • 1、The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai.
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、概述

1、版本匹配

注意MySQL的版本,本次是使用MySQL8.0进行演示。
同时,Flink支持很多数据库的cdc。
在这里插入图片描述
同时也要对应好版本,我们本次使用Flink是1.18,同时FlinkCDC也是3.0版本
在这里插入图片描述

2、导包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.18.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.18.0</version>
</dependency>


二、编码实现

1、基本使用

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

/**
 * Flink MySql CDC
 * 每次启动之后,会将所有数据采集一遍
 */
public class FlinkCDC01 {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.10")
                .port(3306)
                .databaseList("testdb") // 要监听的数据库,可以填多个,支持正则表达式
                .tableList("testdb.access") // 监听的表,可以填多个,需要db.表,支持正则表达式
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启检查点
        env.enableCheckpointing(3000);

        env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // 1个并行任务
            .setParallelism(1)
            .print()
            .setParallelism(1); // 对接收器使用并行性1来保持消息顺序

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

结果是json数据:

{
“before”: null,
“after”: {
“id”: 1,
“name”: “1”
},
“source”: {
“version”: “1.9.7.Final”,
“connector”: “mysql”,
“name”: “mysql_binlog_source”,
“ts_ms”: 1707353812000,
“snapshot”: “false”,
“db”: “testdb”, // 库名
“sequence”: null,
“table”: “access”, // 表名
“server_id”: 1,
“gtid”: null,
“file”: “binlog.000005”,
“pos”: 374,
“row”: 0,
“thread”: 9,
“query”: null
},
“op”: “c”, // 操作 c是create;u是update;d是delete;r是read
“ts_ms”: 1707353812450,
“transaction”: null
}

2、更多配置

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

3、自定义序列化器

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class DomainDeserializationSchema implements DebeziumDeserializationSchema<String> {


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        String topic = sourceRecord.topic();
        String[] split = topic.split("\\.");
        System.out.println("数据库:" + split[1]);
        System.out.println("表:" + split[2]);

        Struct value = (Struct)sourceRecord.value();
        // 获取before信息
        Struct before = value.getStruct("before");
        System.out.println("before:" + before);
        if (before != null) {
            // 所有字段
            List<Field> fields = before.schema().fields();
            for (Field field : fields) {
                System.out.println("before field:" + field.name() + " value:" + before.get(field));
            }
        }
        // 获取after信息
        Struct after = value.getStruct("after");
        System.out.println("after:" + after);
        if (after != null) {
            // 所有字段
            List<Field> fields = after.schema().fields();
            for (Field field : fields) {
                System.out.println("after field:" + field.name() + " value:" + after.get(field));
            }
        }
        // 操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        System.out.println("操作:" + operation);

        // 收集序列化后的结果
        collector.collect("aaaaaaaaaaaaa");
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO; // 类型
    }
}

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("192.168.56.10")
        .port(3306)
        .databaseList("testdb") // 要监听的数据库,可以填多个
        .tableList("testdb.access") // 监听的表,可以填多个
        .username("root")
        .password("root")
        .deserializer(new DomainDeserializationSchema()) // 序列化器
        .build();

4、Flink SQL方式

CDC用的少,还是StreamAPI用的多。

三、踩坑

1、The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai.

2024-02-08 08:36:33 INFO 5217 — [lt-dispatcher-6] o.a.f.r.executiongraph.ExecutionGraph : Source: MySQL Source -> Sink: Print to Std. Out (1/1) (e2371dabd0c952a5dfa7c053cbde80c3_cbc357ccb763df2852fee8c4fc7d55f2_0_2) switched from CREATED to SCHEDULED.
2024-02-08 08:36:33 INFO 5217 — [lt-dispatcher-8] o.a.f.r.r.s.FineGrainedSlotManager : Received resource requirements from job 369b1c979674a0444f679dd13264ea88: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2024-02-08 08:36:33 INFO 5218 — [lt-dispatcher-6] o.a.flink.runtime.jobmaster.JobMaster : Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for ‘Source: MySQL Source -> Sink: Print to Std. Out’ (operator cbc357ccb763df2852fee8c4fc7d55f2).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder L a z y I n i t i a l i z e d C o o r d i n a t o r C o n t e x t . f a i l J o b ( O p e r a t o r C o o r d i n a t o r H o l d e r . j a v a : 624 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . o p e r a t o r s . c o o r d i n a t i o n . R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:624)atorg.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinatorQuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:248)
at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:395)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:225)
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator D e f e r r a b l e C o o r d i n a t o r . r e s e t A n d S t a r t ( R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r . j a v a : 416 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . o p e r a t o r s . c o o r d i n a t i o n . R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r . l a m b d a DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:416) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:416)atorg.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambdaresetToCheckpoint 7 ( R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r . j a v a : 156 ) a t j a v a . u t i l . c o n c u r r e n t . C o m p l e t a b l e F u t u r e . u n i W h e n C o m p l e t e ( C o m p l e t a b l e F u t u r e . j a v a : 774 ) a t j a v a . u t i l . c o n c u r r e n t . C o m p l e t a b l e F u t u r e . u n i W h e n C o m p l e t e S t a g e ( C o m p l e t a b l e F u t u r e . j a v a : 792 ) a t j a v a . u t i l . c o n c u r r e n t . C o m p l e t a b l e F u t u r e . w h e n C o m p l e t e ( C o m p l e t a b l e F u t u r e . j a v a : 2153 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . o p e r a t o r s . c o o r d i n a t i o n . R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r . r e s e t T o C h e c k p o i n t ( R e c r e a t e O n R e s e t O p e r a t o r C o o r d i n a t o r . j a v a : 143 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . o p e r a t o r s . c o o r d i n a t i o n . O p e r a t o r C o o r d i n a t o r H o l d e r . r e s e t T o C h e c k p o i n t ( O p e r a t o r C o o r d i n a t o r H o l d e r . j a v a : 284 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . c h e c k p o i n t . C h e c k p o i n t C o o r d i n a t o r . r e s t o r e S t a t e T o C o o r d i n a t o r s ( C h e c k p o i n t C o o r d i n a t o r . j a v a : 2044 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . c h e c k p o i n t . C h e c k p o i n t C o o r d i n a t o r . r e s t o r e L a t e s t C h e c k p o i n t e d S t a t e I n t e r n a l ( C h e c k p o i n t C o o r d i n a t o r . j a v a : 1719 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . c h e c k p o i n t . C h e c k p o i n t C o o r d i n a t o r . r e s t o r e L a t e s t C h e c k p o i n t e d S t a t e T o A l l ( C h e c k p o i n t C o o r d i n a t o r . j a v a : 1647 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . s c h e d u l e r . S c h e d u l e r B a s e . r e s t o r e S t a t e ( S c h e d u l e r B a s e . j a v a : 434 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . s c h e d u l e r . D e f a u l t S c h e d u l e r . r e s t a r t T a s k s ( D e f a u l t S c h e d u l e r . j a v a : 419 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . s c h e d u l e r . D e f a u l t S c h e d u l e r . l a m b d a 7(RecreateOnResetOperatorCoordinator.java:156) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153) at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:143) at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:284) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:2044) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1719) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1647) at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:434) at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:419) at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda 7(RecreateOnResetOperatorCoordinator.java:156)atjava.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)atjava.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)atjava.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)atorg.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:143)atorg.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:284)atorg.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:2044)atorg.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1719)atorg.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1647)atorg.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:434)atorg.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:419)atorg.apache.flink.runtime.scheduler.DefaultScheduler.lambdanull 2 ( D e f a u l t S c h e d u l e r . j a v a : 379 ) a t j a v a . u t i l . c o n c u r r e n t . C o m p l e t a b l e F u t u r e . u n i R u n ( C o m p l e t a b l e F u t u r e . j a v a : 719 ) a t j a v a . u t i l . c o n c u r r e n t . C o m p l e t a b l e F u t u r e 2(DefaultScheduler.java:379) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) at java.util.concurrent.CompletableFuture 2(DefaultScheduler.java:379)atjava.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)atjava.util.concurrent.CompletableFutureUniRun.tryFire(CompletableFuture.java:701)
at java.util.concurrent.CompletableFuture C o m p l e t i o n . r u n ( C o m p l e t a b l e F u t u r e . j a v a : 456 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . r p c . p e k k o . P e k k o R p c A c t o r . l a m b d a Completion.run(CompletableFuture.java:456) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda Completion.run(CompletableFuture.java:456)atorg.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambdahandleRunAsync 4 ( P e k k o R p c A c t o r . j a v a : 451 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . c o n c u r r e n t . C l a s s L o a d i n g U t i l s . r u n W i t h C o n t e x t C l a s s L o a d e r ( C l a s s L o a d i n g U t i l s . j a v a : 68 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . r p c . p e k k o . P e k k o R p c A c t o r . h a n d l e R u n A s y n c ( P e k k o R p c A c t o r . j a v a : 451 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . r p c . p e k k o . P e k k o R p c A c t o r . h a n d l e R p c M e s s a g e ( P e k k o R p c A c t o r . j a v a : 218 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . r p c . p e k k o . F e n c e d P e k k o R p c A c t o r . h a n d l e R p c M e s s a g e ( F e n c e d P e k k o R p c A c t o r . j a v a : 85 ) a t o r g . a p a c h e . f l i n k . r u n t i m e . r p c . p e k k o . P e k k o R p c A c t o r . h a n d l e M e s s a g e ( P e k k o R p c A c t o r . j a v a : 168 ) a t o r g . a p a c h e . p e k k o . j a p i . p f . U n i t C a s e S t a t e m e n t . a p p l y ( C a s e S t a t e m e n t s . s c a l a : 33 ) a t o r g . a p a c h e . p e k k o . j a p i . p f . U n i t C a s e S t a t e m e n t . a p p l y ( C a s e S t a t e m e n t s . s c a l a : 29 ) a t s c a l a . P a r t i a l F u n c t i o n . a p p l y O r E l s e ( P a r t i a l F u n c t i o n . s c a l a : 127 ) a t s c a l a . P a r t i a l F u n c t i o n . a p p l y O r E l s e 4(PekkoRpcActor.java:451) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse 4(PekkoRpcActor.java:451)atorg.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)atorg.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)atorg.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)atorg.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)atorg.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)atorg.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)atorg.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)atscala.PartialFunction.applyOrElse(PartialFunction.scala:127)atscala.PartialFunction.applyOrElse(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction O r E l s e . a p p l y O r E l s e ( P a r t i a l F u n c t i o n . s c a l a : 175 ) a t s c a l a . P a r t i a l F u n c t i o n OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction OrElse.applyOrElse(PartialFunction.scala:175)atscala.PartialFunctionOrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction O r E l s e . a p p l y O r E l s e ( P a r t i a l F u n c t i o n . s c a l a : 176 ) a t o r g . a p a c h e . p e k k o . a c t o r . A c t o r . a r o u n d R e c e i v e ( A c t o r . s c a l a : 547 ) a t o r g . a p a c h e . p e k k o . a c t o r . A c t o r . a r o u n d R e c e i v e OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive OrElse.applyOrElse(PartialFunction.scala:176)atorg.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)atorg.apache.pekko.actor.Actor.aroundReceive(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
at com.ververica.cdc.connectors.mysql.MySqlValidator.checkTimeZone(MySqlValidator.java:184)
at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:72)
at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:197)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:221)
… 42 common frames omitted

查看mysql:
show variables like ‘%time_zone%’;
在这里插入图片描述
解决方案:

SET time_zone = 'Asia/Shanghai';
SET @@global.time_zone = 'Asia/Shanghai';
#再次查看
SELECT @@global.time_zone;
show variables like '%time_zone%';

在这里插入图片描述

参考资料

源码:https://github.com/ververica/flink-cdc-connectors
文档:https://ververica.github.io/flink-cdc-connectors/master/content/overview/cdc-connectors.html
官网:https://ververica.github.io/flink-cdc-connectors/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/380039.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

kmeans聚类选择最优K值python实现

Kmeans算法中K值的确定是很重要的。 下面利用python中sklearn模块进行数据聚类的K值选择 数据集自制数据集&#xff0c;格式如下&#xff1a; 维度为3。 ①手肘法 手肘法的核心指标是SSE(sum of the squared errors&#xff0c;误差平方和)&#xff0c; 其中&#xff0c;Ci是第…

微调LLM或使用RAG,开发RAG管道的12个痛点

论文地址&#xff1a;archive.is/bNbZo Pain Point 1: Missing Content 内容缺失 Pain Point 2: Missed the Top Ranked Documents 错过排名靠前的文档 Pain Point 3: Not in Context — Consolidation Strategy Limitations 不在上下文中 — 整合战略的局限性 Pain Point …

免费生成ios证书的方法(无需mac电脑)

使用hbuilderx的uniapp框架开发移动端程序很方便&#xff0c;可以很方便地开发出移动端的小程序和app。但是打包ios版本的app的时候却很麻烦&#xff0c;官方提供的教程需要使用mac电脑来生成证书&#xff0c;但是mac电脑却不便宜&#xff0c;一般的型号都差不多上万。 因此&a…

【Java面试】数据类型常见面试题

什么是包装类型 将基本类型包装进了对象中得到的类型 基本类型和包装类型有什么区别 用途不同&#xff1a;基本类型一般用于局部变量&#xff0c;包装类型用于其他地方存储方式不同&#xff1a;用于局部变量的基本类型存在虚拟机栈中的局部变量表中&#xff0c;用于成员变量…

火星符号运算 - 华为OD统一考试

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 已知火星人使用的运算符号为 #和$ 其与地球人的等价公式如下 x#y2*x3*y4 x$y3*xy2x y是无符号整数。地球人公式按照c语言规则进行计算。火星人公式中&#xff0…

基于鲲鹏服务器的LNMP配置

基于鲲鹏服务器的LNMP配置 系统 Centos8 # cat /etc/redhat-release CentOS Linux release 8.0.1905 (Core) 卸载已经存在的旧版本的安装包 # rpm -qa | grep php #查看已经安装的PHP旧版本# rpm -qa | grep php | xargs rpm -e #卸载已经安装的旧版&#xff0c;如果提示有…

113.路径总和 II

给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…

RocketMQ(二):领域模型(生产者、消费者)

1 生产者&#xff08;Producer&#xff09; 本节介绍Apache RocketMQ 中生产者的定义、模型关系、内部属性、版本兼容和使用建议。 1.1 定义 生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。 生产者通常被集成在业务系统中&#xff0c;将业务消息按照要…

513. 找树左下角的值 - 力扣(LeetCode)

题目描述 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 题目示例 输入: root [2,1,3] 输出: 1 解题思路 深度优先搜索 使用 depth 记录遍历到的节点的深度&#xff0c;result 记录深度在 depth 的最…

C++ 动态规划 记忆化搜索 滑雪

给定一个 R 行 C 列的矩阵&#xff0c;表示一个矩形网格滑雪场。 矩阵中第 i 行第 j 列的点表示滑雪场的第 i 行第 j 列区域的高度。 一个人从滑雪场中的某个区域内出发&#xff0c;每次可以向上下左右任意一个方向滑动一个单位距离。 当然&#xff0c;一个人能够滑动到某相…

C++:二叉搜索树模拟实现(KV模型)

C&#xff1a;二叉搜索树模拟实现&#xff08;KV模型&#xff09; 前言模拟实现KV模型1. 节点封装2、前置工作&#xff08;默认构造、拷贝构造、赋值重载、析构函数等&#xff09;2. 数据插入&#xff08;递归和非递归版本&#xff09;3、数据删除&#xff08;递归和非递归版本…

【芯片设计- RTL 数字逻辑设计入门 15 -- 函数实现数据大小端转换】

文章目录 函数实现数据大小端转换函数语法函数使用的规则Verilog and Testbench综合图VCS 仿真波形 函数实现数据大小端转换 在数字芯片设计中&#xff0c;经常把实现特定功能的模块编写成函数&#xff0c;在需要的时候再在主模块中调用&#xff0c;以提高代码的复用性和提高设…

《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)

文章目录 6.1 设置和管理复制6.1.1 基础知识6.1.2 重点案例&#xff1a;使用 Python 设置 MySQL 主从复制6.1.3 拓展案例 1&#xff1a;自动故障转移6.1.4 拓展案例 2&#xff1a;设置双主复制 6.2 复制的类型和策略6.2.1 基础知识6.2.2 重点案例&#xff1a;使用 Python 设置半…

目标检测 | 卷积神经网络(CNN)详细介绍及其原理详解

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。卷积神经网络&#xff08;Convolutional Neural Network&#xff0c;CNN&#xff09;是一种深度学习模型&#xff0c;主要用于图像识别和计算机视觉任务。它的设计灵感来自于生物学中视觉皮层的工作原理。CNN的核心思想是通…

极智一周 | 国产CPU系列汇总、鲲鹏、飞腾、平头哥 And so on

欢迎关注我的公众号 [极智视界]&#xff0c;获取我的更多技术分享 大家好&#xff0c;我是极智视界&#xff0c;带来本周的 [极智一周]&#xff0c;关键词&#xff1a;国产CPU系列汇总、鲲鹏、飞腾、平头哥 And so on。 邀您加入我的知识星球「极智视界」&#xff0c;星球目前…

一分钟了解电脑关机快捷键是什么!

在日常使用电脑的过程中&#xff0c;了解一些基本的快捷键是提高效率的关键之一。其中&#xff0c;电脑关机快捷键是一个方便且迅速的操作&#xff0c;使您可以在不用通过烦琐的菜单操作的情况下&#xff0c;快速关机电脑。在本文中&#xff0c;我们将探讨电脑关机快捷键是什么…

Linux——进程池(管道)

经过了管道的介绍之后&#xff0c;我们可以实现了进程间通信&#xff0c;现在我就来简单介 绍一下管道的应用场景——进程池。1. 引入 在我们的编码过程中&#xff0c;不乏会听到&#xff0c;内存池&#xff0c;进程池&#xff0c;空间配置器等等名词&#xff0c;这些是用来干…

NLP_神经概率语言模型(NPLM)

文章目录 NPLM的起源NPLM的实现1.构建实验语料库2.生成NPLM训练数据3.定义NPLM4.实例化NPLM5.训练NPLM6.用NPLM预测新词 NPLM小结 NPLM的起源 在NPLM之前&#xff0c;传统的语言模型主要依赖于最基本的N-Gram技术&#xff0c;通过统计词汇的共现频率来计算词汇组合的概率。然而…

【Linux】SystemV IPC

进程间通信 一、SystemV 共享内存1. 共享内存原理2. 系统调用接口&#xff08;1&#xff09;创建共享内存&#xff08;2&#xff09;形成 key&#xff08;3&#xff09;测试接口&#xff08;4&#xff09;关联进程&#xff08;5&#xff09;取消关联&#xff08;6&#xff09;释…

5周年狂欢,WeTrade众汇积分商城又送车啦!

各位投资者&#xff1a;新年好啊&#xff01; WeTrade众汇承诺积分商城所有礼品&#xff0c;不论价值大小&#xff0c;送出均为真实有效&#xff0c;不做虚假宣传。 WeTrade众汇继2018年9月28日送出特斯拉Model X后&#xff0c;又一次迎来了第二位在积分商城兑换豪车的客户! …
最新文章