返回

从 MySQL 到 ClickHouse 实时数据同步的干货分享

后端

概述

MySQL 是世界流行的开源关系型数据库,而 ClickHouse 是一个快速、开源的列式数据库,适用于大数据分析。许多企业需要将 MySQL 数据实时同步到 ClickHouse 中,以进行分析和报告。

方法介绍

从 MySQL 到 ClickHouse 的实时数据同步,主要可以分为以下三个步骤:

  1. 捕获 MySQL binlog:可以使用 Canal 工具来捕获 MySQL binlog。Canal 是一个开源的 MySQL binlog 解析工具,可以将 binlog 中的事件解析成结构化的数据。
  2. 将数据写入 Kafka:可以使用 Flink 将解析后的 binlog 数据写入 Kafka。Flink 是一个开源的分布式流处理框架,可以将数据从一个数据源流式写入另一个数据源。
  3. ClickHouse 消费 Kafka 数据:可以使用 ClickHouse 消费 Kafka 数据,并将数据写入 ClickHouse 表中。ClickHouse 是一个快速、开源的列式数据库,适用于大数据分析。

操作步骤

1. 安装 Canal

git clone https://github.com/alibaba/canal.git
cd canal
mvn package

2. 启动 Canal

nohup java -jar canal.jar --config canal.properties &

3. 安装 Flink

wget https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
tar -xzvf flink-1.15.2-bin-scala_2.12.tgz

4. 启动 Flink

cd flink-1.15.2-bin-scala_2.12
./bin/start-cluster.sh

5. 创建 Flink 作业

vim FlinkJob.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import java.util.Properties;

public class FlinkJob {

    public static void main(String[] args) throws Exception {
        // 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建 Kafka producer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-topic", new KafkaSerializationSchema<String>() {
            @Override
            public byte[] serialize(String element, KafkaSerializationSchema.Context context) {
                return element.getBytes();
            }
        }, kafkaProps);

        // 创建数据流
        DataStream<String> inputStream = env.addSource(new CanalSource());

        // 将数据写入 Kafka
        inputStream.addSink(kafkaProducer);

        // 执行作业
        env.execute();
    }
}

6. 启动 Flink 作业

./bin/flink run FlinkJob.jar

7. 安装 ClickHouse

wget https://clickhouse.yandex/downloads/releases/stable/clickhouse-server-stable.tar.gz
tar -xzvf clickhouse-server-stable.tar.gz

8. 启动 ClickHouse

./clickhouse-server/bin/clickhouse-server --config-file=/etc/clickhouse-server/config.xml &

9. 创建 ClickHouse 表

CREATE TABLE flink_table (
    id Int64,
    name String,
    age Int32
) ENGINE = MergeTree()
ORDER BY id

10. 启动 ClickHouse 消费 Kafka 数据

CREATE TABLE flink_table_kafka (
    id Int64,
    name String,
    age Int32
) ENGINE = Kafka()
SETTINGS kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'flink-topic',
    kafka_group_id = 'flink_group'

总结

以上就是从 MySQL 到 ClickHouse 实时数据同步的详细步骤。这种方法可以保证数据同步的实时性和准确性。