Flink SQL Streaming Transformation
Use Flink SQL to complete a streaming processing task; the detailed requirements follow.
Requirements:
1) Define a custom Kafka topic, have Flink read the real-time crude oil price data from it, and register the stream as a table.
Sample data:
Oil Type    Price    Time
LightCrudeOil 100 2020/11/25 09:00:00
MediumCrudeOil 90 2020/11/25 09:00:00
HeavyCrudeOil 78 2020/11/25 09:00:00
MediumCrudeOil 88 2020/11/25 10:45:00
MediumCrudeOil 89 2020/11/25 11:15:00
HeavyCrudeOil 76 2020/11/25 11:49:00
2) Define another custom Kafka topic, read the crude oil purchase order data from it, and register the stream as a table.
Oil Type    Quantity    Time
MediumCrudeOil 2 2020/11/25 10:15:00
LightCrudeOil 1 2020/11/25 10:30:00
HeavyCrudeOil 5 2020/11/25 10:32:00
MediumCrudeOil 3 2020/11/25 10:52:00
LightCrudeOil 6 2020/11/25 11:04:00
3) Save the Kafka offsets in checkpoints and consume in exactly-once mode;
4) Register the real-time crude oil price data as a Temporal Table and compute against it;
5) Using Flink SQL, compute each order's amount in real time from its order time and the live price table; output fields: oil type, quantity, order amount, purchase time
(for example, a MediumCrudeOil order placed at 10:52 must use the 10:45 price, not the 09:00 price; the expected results are worked out after this list);
6) Using Flink SQL, compute in real time the number of MediumCrudeOil orders and their total order amount;
7) Store the results of step 5 in HBase, table name oilSales, column family f1, field names of your own choosing;
8) Write the results of step 6 to the Kafka topic aggSales and inspect the output.
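For reference, here are the results the pipeline should produce from the sample data above, worked out by hand under temporal-table join semantics (each order joins the latest price at or before its event time):
Oil Type    Quantity    Price    Amount    Time
MediumCrudeOil 2 90 180 2020/11/25 10:15:00
LightCrudeOil 1 100 100 2020/11/25 10:30:00
HeavyCrudeOil 5 78 390 2020/11/25 10:32:00
MediumCrudeOil 3 88 264 2020/11/25 10:52:00
LightCrudeOil 6 100 600 2020/11/25 11:04:00
For requirement 6, MediumCrudeOil is ordered twice, for a total amount of 180 + 264 = 444.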
Code:
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
// org.apache.hadoop.hbase.client.Table is referenced fully qualified below to avoid a name clash with Flink's Table:
//import org.apache.hadoop.hbase.client.Table
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.client.Connection
// price quote: oil type, unit price, event time (epoch millis)
case class OilPrice(item: String, price: Double, pTime: Long)
// purchase order: oil type, quantity, event time (epoch millis)
case class OilCount(item: String, num: Int, cTime: Long)
// priced order: oil type, quantity, unit price, order amount, purchase time
case class OilPC(item: String, num: Int, price: Double, amount: Double, cTime: Long)
object SqlStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // configure the state backend (in-memory here; the FsStateBackend variant for a cluster)
    env.setStateBackend(new MemoryStateBackend())
    // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/flink/ck"))
    // enable checkpointing every 5 s in exactly-once mode (requirement 3)
    env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // socket sources stand in for the two Kafka topics during local testing
    val input = env.socketTextStream("hadoop102", 1111)
    val input2 = env.socketTextStream("hadoop102", 2222)
    /* the Kafka sources called for by requirements 1 and 2:
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "test")
    val input = env.addSource(new FlinkKafkaConsumer011[String]("crudeoil1", new SimpleStringSchema(), properties))
    val input2 = env.addSource(new FlinkKafkaConsumer011[String]("crudeoil2", new SimpleStringSchema(), properties)) */
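    // With checkpointing enabled, FlinkKafkaConsumer011 snapshots its partition offsets into
    // Flink's checkpoints, which is what requirement 3 asks for; committing offsets back to
    // Kafka is optional and only matters for external monitoring. A sketch, reusing the names
    // from the commented-out block above:
    // val consumer = new FlinkKafkaConsumer011[String]("crudeoil1", new SimpleStringSchema(), properties)
    // consumer.setStartFromGroupOffsets()          // on a fresh start, begin from the group offsets
    // consumer.setCommitOffsetsOnCheckpoints(true) // commit to Kafka when a checkpoint completes
    // val input = env.addSource(consumer)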
    // parse tab-separated price records: item \t price \t "yyyy/MM/dd HH:mm:ss"
    val priceDataStream: DataStream[OilPrice] = input.filter(_.nonEmpty).map(data => {
      val dataArr: Array[String] = data.split("\t")
      val pTime: Long = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(dataArr(2).trim).getTime
      OilPrice(dataArr(0).trim, dataArr(1).trim.toDouble, pTime)
    }).assignAscendingTimestamps(_.pTime)
    // parse tab-separated order records: item \t quantity \t "yyyy/MM/dd HH:mm:ss"
    val countDataStream = input2.filter(_.nonEmpty).map(data => {
      val dataArr = data.split("\t")
      val cTime: Long = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(dataArr(2).trim).getTime
      OilCount(dataArr(0).trim, dataArr(1).trim.toInt, cTime)
    }).assignAscendingTimestamps(_.cTime)
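    // Note: assignAscendingTimestamps assumes event times arrive in strictly increasing order
    // and derives watermarks from that; if the topics can deliver out-of-order records, a
    // BoundedOutOfOrdernessTimestampExtractor would be needed instead.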
    val tenv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // register the price stream as a table with an event-time attribute (requirement 1)
    val priceTable: Table = tenv.fromDataStream(priceDataStream, 'item, 'price, 'pTime, 'ts.rowtime)
    tenv.createTemporaryView("priceTable", priceTable)
    // turn the price table into a temporal table function keyed by oil type (requirement 4)
    val priceFun: TemporalTableFunction = priceTable.createTemporalTableFunction('ts, 'item)
    tenv.registerFunction("pricehistory", priceFun)
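    // pricehistory(t) returns, per oil type, the price row valid at time t, i.e. the latest
    // row whose rowtime 'ts is <= t. That is exactly why the 10:52 MediumCrudeOil order below
    // joins against the 10:45 price rather than the 09:00 price.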
    // register the order stream as a table (requirement 2)
    val countTable: Table = tenv.fromDataStream(countDataStream, 'item, 'num, 'cTime, 'ts.rowtime)
    tenv.createTemporaryView("countTable", countTable)
    // requirement 5: join each order with the price valid at its order time and compute the amount
    val saleTable: Table = tenv.sqlQuery(
      """
        |select c.item, c.num, p.price, c.num * p.price as amount, c.cTime
        |from countTable as c,
        |LATERAL TABLE(pricehistory(c.ts)) as p
        |where c.item = p.item
        |""".stripMargin
    )
    // the temporal join only ever emits inserts, so an append stream suffices here
    val saleStream = saleTable.toAppendStream[Row]
    // saleStream.print("sale")
    val outStream = saleStream.map(t => {
      OilPC(t.getField(0).toString, t.getField(1).toString.toInt, t.getField(2).toString.toDouble,
        t.getField(3).toString.toDouble, t.getField(4).toString.toLong)
    })
    // requirement 7: write the priced orders into HBase (table oilSales, column family f1)
    outStream.addSink(new InHbase())
    // requirement 6: number of MediumCrudeOil orders and their total order amount
    val sql2 = tenv.sqlQuery(
      """
        |select count(c.num) as cnt, sum(c.num * p.price) as sumamount
        |from countTable as c,
        |LATERAL TABLE(pricehistory(c.ts)) as p
        |where c.item = 'MediumCrudeOil' and c.item = p.item
        |""".stripMargin
    )
    // the aggregation updates over time, so a retract stream is needed; each element is a
    // (Boolean, Row) tuple where the flag marks insert (true) vs. retraction (false)
    val aggStream = sql2.toRetractStream[Row]
    aggStream.print("sum")
    // requirement 8: publish the running aggregate to the aggSales topic
    aggStream.map(_.toString).addSink(new FlinkKafkaProducer011[String](
      "hadoop102:9092",
      "aggSales", // target topic
      new SimpleStringSchema))
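    // Note: this producer constructor provides at-least-once delivery. For end-to-end
    // exactly-once, FlinkKafkaProducer011 offers a constructor taking a Semantic, sketched
    // here assuming a `properties` object like the one in the commented-out consumer block:
    // new FlinkKafkaProducer011[String]("aggSales",
    //   new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
    //   properties, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)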
    env.execute()
  }
}
// RichSinkFunction that writes each priced order into HBase table oilSales, column family f1
class InHbase() extends RichSinkFunction[OilPC] {
  var conn: Connection = _
  var table: org.apache.hadoop.hbase.client.Table = _
  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop102:2181")
    conn = ConnectionFactory.createConnection(conf)
    table = conn.getTable(TableName.valueOf("oilSales"))
  }
  override def invoke(value: OilPC, context: SinkFunction.Context[_]): Unit = {
    // row key combines oil type and purchase time so that successive orders for the
    // same oil type do not overwrite each other
    val put = new Put(s"${value.item}_${value.cTime}".getBytes())
    put.addColumn("f1".getBytes(), "num".getBytes(), value.num.toString.getBytes())
    put.addColumn("f1".getBytes(), "price".getBytes(), value.price.toString.getBytes())
    put.addColumn("f1".getBytes(), "amount".getBytes(), value.amount.toString.getBytes())
    put.addColumn("f1".getBytes(), "cTime".getBytes(), value.cTime.toString.getBytes())
    table.put(put)
  }
  override def close(): Unit = {
    table.close()
    conn.close()
  }
}
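The oilSales table must exist before the job runs; it can be created from the HBase shell with create 'oilSales','f1'. To view requirement 8's output, a console consumer on the aggSales topic (e.g. kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic aggSales) shows the retract-stream tuples as they are produced.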