WaterDrop: Writing Data from Kafka into ClickHouse
2020/03 Author: ihunter
https://github.com/InterestingLab/waterdrop/releases
https://interestinglab.github.io/waterdrop/#/zh-cn/v2/
A simple, easy-to-use, high-performance data processing product built to handle massive data volumes
Easy to use, flexible configuration, no coding required
Real-time stream processing
High performance
Capable of processing massive data volumes
Modular and plugin-based, easy to extend
Supports data processing and aggregation with SQL (a sketch follows after this list)
Supports Spark 2.x
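As a sketch of the SQL support: WaterDrop v1.x ships a sql filter plugin that runs Spark SQL over the in-flight dataset. The option names and the registered table name below are assumptions based on my reading of the WaterDrop docs, so verify them against the release you deploy:
filter {
  sql {
    # name under which the upstream dataset is registered for querying (assumed option)
    table_name = "events"
    # aggregate events per action type with plain Spark SQL (illustrative query)
    sql = "select act, count(*) as cnt from events group by act"
  }
}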
# Download
wget https://github.com/InterestingLab/waterdrop/releases/download/v2.0.0-pre/waterdrop-dist-2.0.0-pre-2.11.8-release.zip
unzip waterdrop-dist-2.0.0-pre-2.11.8-release.zip
rm -f waterdrop-dist-2.0.0-pre-2.11.8-release.zip
# Edit the configuration file waterdrop-env.sh
vi config/waterdrop-env.sh
SPARK_HOME=/data/work/spark-2.4   # set this to your Spark installation path
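Before going further, it is worth confirming that this path points at a working Spark 2.x installation, since WaterDrop submits the job through spark-submit. A quick check:
$SPARK_HOME/bin/spark-submit --version   # should print a 2.x version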
# Add the configuration file small.conf
spark {
  spark.streaming.batchDuration = 5
  spark.app.name = "small_spark_streaming"
  spark.ui.port = 14020
  spark.executor.instances = 3
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}
input {
  kafkaStream {
    topics = "small"
    consumer.bootstrap.servers = "hadoop008.eqxiu.com:9092,hadoop006.eqxiu.com:9092,hadoop007.eqxiu.com:9092"
    consumer.zookeeper.connect = "hadoop004:2181,hadoop003:2181,hadoop002:2181,hadoop001:2181,hadoop005:2181"
    consumer.group.id = "clickhouse_small"
    consumer.failOnDataLoss = false
    consumer.auto.offset.reset = latest
    consumer.rebalance.max.retries = 100
  }
}
filter {
  json {
    source_field = "raw_message"
  }
}
output {
  clickhouse {
    host = "10.10.8.1:8123"
    database = "bw"
    table = "small"
    fields = ["act","b_t","b_v","bro","c_i","c_p","s_t","c_t","cit","cou","url","ref","u_i"]
    username = ""
    password = ""
    retry_codes = [209, 210, 1002]
    retry = 10
    bulk_size = 1000
  }
}
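For reference, the json filter parses each raw Kafka payload (exposed as raw_message) into top-level fields, and those fields must cover every column listed under fields in the clickhouse output. A hypothetical message on the small topic could look like this (all values invented for illustration):
{"act":"view","b_t":"page","b_v":"1.0","bro":"Chrome","c_i":"c_001","c_p":"web","s_t":"1583020800000","c_t":"pc","cit":"Beijing","cou":"CN","url":"https://example.com/a","ref":"https://example.com","u_i":"u_1001"}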
# Create the ClickHouse table
create table bw.small (
    act String,
    b_t String,
    b_v String,
    bro String,
    c_i String,
    c_p String,
    s_t String,
    c_t String,
    cit String,
    cou String,
    url String,
    ref String,
    u_i String
) ENGINE = MergeTree()
partition by toYYYYMMDD(toDateTime(toUInt64(s_t)/1000))
order by (s_t);
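The partition expression assumes s_t holds an epoch timestamp in milliseconds stored as a string: toUInt64(s_t)/1000 converts it to seconds, toDateTime turns that into a timestamp, and toYYYYMMDD buckets rows into daily partitions. Once the job is running, a query along these lines (illustrative) confirms data is landing:
-- rows per daily partition, newest first (illustrative check)
select toYYYYMMDD(toDateTime(toUInt64(s_t)/1000)) as day, count() as cnt
from bw.small
group by day
order by day desc
limit 7;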
# Start the ingestion job
cd /data/work/waterdrop-1.4.1   # adjust to the directory where you unpacked WaterDrop
sh bin/start-waterdrop.sh --master yarn --deploy-mode client --config small.conf
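In client mode the driver log streams to the launching terminal, and with spark.ui.port = 14020 set above, the job's Spark UI should be reachable on that port of the submitting host. The job can also be checked from YARN with standard commands (the application id below is a placeholder):
# list running applications and look for small_spark_streaming
yarn application -list -appStates RUNNING
# fetch logs once the application id is known (placeholder id)
yarn logs -applicationId application_XXXXXXXXXX_XXXX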