seatunnel同步Mysql至Hive
什么是seatunnel
SeaTunnel是一个非常好用的超高性能分布式数据集成平台,支持海量数据的实时同步。每天可以稳定高效地同步数百亿数据,已应用于近百家企业的生产中。
使用场景
- 海量数据同步
- 海量数据整合
- 具有海量数据的 ETL
- 海量数据聚合
- 多源数据处理
特点
- 简单易用,配置灵活,低代码开发
- 实时流式传输
- 离线多源数据分析
- 高性能、海量数据处理能力
- 模块化和插件机制,易于扩展
- 支持SQL数据处理和聚合
- 支持 Spark 结构化流
- 支持 Spark 2.x
解决海量数据同步存在的问题
- 数据丢失和重复
- 任务累积和延迟
- 低吞吐量
- 长周期应用于生产环境
- 缺乏应用运行状态监控
示例,同步mysql至hive
SEATUNNEL_HOME=/data/apps/apache-seatunnel-incubating-2.1.1
wf_mysql_db=publish_account
wf_mysql_table="(select * from t_common_unique_report where data_timestamp=-999) as A"
hive_db=ods_common
hive_table=t_common_unique_report_all
import_mysql_to_hive(){
UUID=$(cat /proc/sys/kernel/random/uuid)
UUID=${UUID//-/}
table_date=$(date "+%Y%m%d")
# 打印数据传输脚本并赋值
cat>$SEATUNNEL_HOME/jobs/mysql2hive_${table_date}_${UUID}.conf<<!EOF
env {
spark.streaming.batchDuration = 5
spark.app.name = "mysql2hive"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "4g"
spark.queue = "default"
# 因为sink用到hive源,所以必须进行以下配置
spark.sql.catalogImplementation = "hive"
}
source {
jdbc {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://mysq.hk.jdbc.cn:3306/$1?useUnicode=true&characterEncoding=utf8&useSSL=false"
table = "$2"
user = "user_name"
password = "test_password"
result_table_name = "$3"
}
}
transform {
}
sink {
Hive {
sql = "insert overwrite table $4 select * from $3"
}
}
!EOF
echo "同步mysql表数据到hive..."
$SEATUNNEL_HOME/bin/start-seatunnel-spark.sh --master yarn --deploy-mode client --config $SEATUNNEL_HOME/jobs/mysql2hive_${table_date}_${UUID}.conf
}
import_mysql_to_hive "$wf_mysql_db" "$wf_mysql_table" "$hive_db_$hive_table" "$hive_db.$hive_table"
参考链接:
https://seatunnel.apache.org/docs/2.1.3/connector/source/Jdbc
https://seatunnel.apache.org/docs/2.1.3/connector/sink/Hive
版权声明:本文为qq_39211866原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。