Flinksql流式转换

使用flinksql完成流式处理任务,具体需求如下。
需求描述:   
1)自定义kafka主题,使用flink从kafka中读取原油实时售价数据,注册成表;
数据如下
油品种类       价格        时间
LightCrudeOil    100    2020/11/25 09:00:00
MediumCrudeOil    90    2020/11/25 09:00:00
HeavyCrudeOil    78    2020/11/25 09:00:00
MediumCrudeOil    88    2020/11/25 10:45:00
MediumCrudeOil    89    2020/11/25 11:15:00
HeavyCrudeOil    76    2020/11/25 11:49:00
2)自定义kafka主题,从kafka中读取购买原油订单数据,注册成表;
油品种类     购买数量     时间
MediumCrudeOil    2    2020/11/25 10:15:00
LightCrudeOil    1    2020/11/25 10:30:00
HeavyCrudeOil    5    2020/11/25 10:32:00
MediumCrudeOil    3    2020/11/25 10:52:00
LightCrudeOil    6    2020/11/25 11:04:00
3)将kafka偏移量保存在checkpoint中,使用精确一次消费模式;
4)使用将原油实时售价数据注册成 Temporal Table 的方式进行计算;
5)现在要根据订单下单的时间和实时的单价表使用flinksql实时计算出订单的金额,字段包含油品种类、购买数量、订单金额、购买时间;
(例如10点52分的MediumCrudeOil原油价格要遵从10点45分时的价格而不是9点的价格)
6)使用flinksql实时计算MediumCrudeOil下单次数及其总订单金额;
7)将第5问的结果存入Hbase中,表名oilSales,列族f1,字段名自拟;
8)将第6问的结果输出到Kafka的aggSales主题,并查看输出结果。

 

代码:

import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.types.Row
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
//import org.apache.hadoop.hbase.client.Table

/** Real-time crude oil quote: oil type, unit price, quote event time (epoch millis). */
case class OilPrice(item:String, price:Double, pTime:Long)

/** Purchase order: oil type, quantity bought, order event time (epoch millis). */
case class OilCount(item:String, num:Int, cTime:Long)

/** Priced order (join result): oil type, quantity, unit price at order time, amount = num * price, order time. */
case class OilPC(item:String, num:Int,price:Double,amount:Double,cTime:Long)

/**
 * Streaming job for the crude-oil pricing task:
 *  - reads a real-time price stream and an order stream (sockets here; the
 *    Kafka sources required by the task are commented out below),
 *  - registers the price stream as a temporal table function keyed by oil type,
 *  - joins each order against the price version valid at its event time,
 *  - writes per-order results to HBase (sink currently commented out) and the
 *    MediumCrudeOil aggregate to the Kafka topic "aggSales".
 */
object SqlSteam {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // State backend: in-memory for local runs; the HDFS-backed FsStateBackend
    // below would persist checkpoints (and hence Kafka offsets) durably.
    env.setStateBackend(new MemoryStateBackend())
//    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/flink/ck"))

    // Checkpoint every 5s with exactly-once mode; with the Kafka sources enabled
    // this is what stores consumer offsets in checkpoints (requirement 3).
    env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE)

    // Event time is required for the temporal (versioned) join below.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Test sources: prices on port 1111, orders on port 2222.
    val input = env.socketTextStream("hadoop102", 1111)
    val input2 = env.socketTextStream("hadoop102", 2222)

   /* val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "test")
    val input = env.addSource(new FlinkKafkaConsumer011[String]("crudeoil1", new SimpleStringSchema(), properties))
    val input2 = env.addSource(new FlinkKafkaConsumer011[String]("crudeoil2", new SimpleStringSchema(), properties))*/

    // Parse tab-separated price records ("item\tprice\ttime") and assign the
    // parsed event time as an ascending timestamp (assumes in-order input).
    // NOTE(review): a SimpleDateFormat is built per record; hoisting it would
    // be cheaper, but it is not thread-safe to share across parallel subtasks.
    val priceDataStream: DataStream[OilPrice] = input.filter(_.nonEmpty).map(data => {
      val dataArr: Array[String] = data.split("\t")
      val pTime: Long = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(dataArr(2).trim).getTime
      OilPrice(dataArr(0).trim, dataArr(1).trim.toDouble, pTime)
    }).assignAscendingTimestamps(_.pTime)

    // Parse tab-separated order records ("item\tnum\ttime") the same way.
    val countDataStream = input2.filter(_.nonEmpty).map(data => {
      val dataArr = data.split("\t")
      val cTime: Long = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(dataArr(2).trim).getTime
      OilCount(dataArr(0).trim, dataArr(1).trim.toInt, cTime)
    }).assignAscendingTimestamps(_.cTime)

    val tenv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Price table with 'ts as the rowtime attribute (requirement 1).
    val priceTable: Table = tenv.fromDataStream(priceDataStream, 'item,'price,'pTime,'ts.rowtime)
    tenv.createTemporaryView("priceTable",priceTable)

    // Temporal table function: versions rows by 'ts, primary key 'item, so a
    // lookup at time t returns the latest price at or before t (requirement 4).
    val priceFun: TemporalTableFunction = priceTable.createTemporalTableFunction('ts,'item)

    tenv.registerFunction("pricehistory",priceFun)

    // Order stream registered as a table (requirement 2).
    val countTable: Table = tenv.fromDataStream(countDataStream, 'item,'num,'cTime,'ts.rowtime)
    tenv.createTemporaryView("countTable",countTable)

    // Requirement 5: price each order using the price version valid at the
    // order's rowtime (e.g. a 10:52 MediumCrudeOil order uses the 10:45 price).
    val saleTable: Table = tenv.sqlQuery(
      """
        |select c.item,c.num,p.price,c.num * p.price as amount, c.cTime
        |from countTable as c,
        |LATERAL TABLE(pricehistory(c.ts)) as p
        |where c.item=p.item
        |""".stripMargin
    )
    // Temporal-function joins only emit inserts, so an append stream suffices.
    val sql1 = saleTable.toAppendStream[Row]

    //sql1.print("sale")

    // Map generic Rows back into the typed OilPC expected by the HBase sink.
    val outStream = sql1.map(t => {
      OilPC(t.getField(0).toString, t.getField(1).toString.toInt, t.getField(2).toString.toDouble, t.getField(3).toString.toDouble, t.getField(4).toString.toLong)
    })

    // Requirement 7: uncomment to persist priced orders to HBase table "oilSales".
    //outStream.addSink(new InHbase())

    // Requirement 6: running order count and total amount for MediumCrudeOil.
    val sql2 = tenv.sqlQuery(
      """
        |select count(c.num),sum(c.num * p.price) as sumamount
        |from countTable as c,
        |LATERAL TABLE(pricehistory(c.ts)) as p
        |where c.item='MediumCrudeOil' and c.item=p.item
        |""".stripMargin
    )

    // A continuously-updating aggregate needs a retract stream: each element is
    // (isAccumulate: Boolean, row).
    val value = sql2.toRetractStream[Row]

    value.print("sum")

    // Requirement 8: publish to Kafka topic "aggSales".
    // NOTE(review): this serializes the raw (Boolean, Row) tuple via toString,
    // so retraction records are emitted too — downstream must cope with that.
    value.map(_.toString).addSink(new FlinkKafkaProducer011[String](
      "hadoop102:9092",
      "aggSales",                  // target topic
      new SimpleStringSchema))

    env.execute()
  }
}

/**
 * Sink that persists each priced order ([[OilPC]]) into the HBase table
 * "oilSales" under column family "f1" (requirement 7).
 *
 * The HBase connection and table handle are opened once per parallel task
 * instance in open() and released in close().
 */
class InHbase() extends RichSinkFunction[OilPC]{

  var conn:Connection = _
  var table:org.apache.hadoop.hbase.client.Table = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM,"hadoop102:2181")
    conn = ConnectionFactory.createConnection(conf)
    table = conn.getTable(TableName.valueOf("oilSales"))
  }

  override def invoke(value: OilPC, context: SinkFunction.Context[_]): Unit = {
    // BUG FIX: the original row key was the oil type alone, so every new order
    // for the same oil silently overwrote the previous row. Appending the order
    // timestamp gives each order its own row. Also use an explicit UTF-8
    // charset instead of the platform default.
    val put = new Put(s"${value.item}_${value.cTime}".getBytes(StandardCharsets.UTF_8))
    val family = "f1".getBytes(StandardCharsets.UTF_8)
    // Store the oil type explicitly too, since the row key is now composite.
    put.addColumn(family, "item".getBytes(StandardCharsets.UTF_8), value.item.getBytes(StandardCharsets.UTF_8))
    put.addColumn(family, "num".getBytes(StandardCharsets.UTF_8), value.num.toString.getBytes(StandardCharsets.UTF_8))
    put.addColumn(family, "price".getBytes(StandardCharsets.UTF_8), value.price.toString.getBytes(StandardCharsets.UTF_8))
    put.addColumn(family, "amount".getBytes(StandardCharsets.UTF_8), value.amount.toString.getBytes(StandardCharsets.UTF_8))
    put.addColumn(family, "cTime".getBytes(StandardCharsets.UTF_8), value.cTime.toString.getBytes(StandardCharsets.UTF_8))
    table.put(put)
  }

  override def close(): Unit = {
    // Guard against a failed open() leaving either handle unassigned.
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}


版权声明:本文为weixin_54215904原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>