flink-8 重分区算子
重分区算子种类
重分区算子用来对数据进行重新分区,可以用来解决数据倾斜问题
- Random Partitioning
- 根据均匀分布随机分配元素,(类似于random.nextInt(3),0 - 3 在概率上是均匀的)
- dataStream.shuffle()
- Rebalancing
- 分区元素循环,每个分区创建相等的负载。数据发生倾斜的时候可以用于性能优化
- 对数据集进行再平衡,重分组,消除数据倾斜
- dataStream.rebalance()
- Rescaling
- rescale与rebalance很像,也是将数据均匀分布到各下游各实例上,但它的传输开销更小,因为rescale并不是将每个数据轮询地发送给下游每个实例,而是就近发送给下游实例
- dataSteam.rescale()
- Custom Partitioning
- 自定义分区需要时间Paritition接口
- dataStream.partitionCustom(partitioner, “someKey”)
- 或者dataStream.partitionCustom(partitioner,0)
- 自定义分区需要时间Paritition接口
- Brodcasting
- 即广播变量。将数据分发到每一个JVM进程,供当前进程的所有线程共享数据。
Random Partitioning
上游数据会随机的选择下游的一个分区下发数据
Rebalancing
第一次随机选择一个分区,后续按照顺序轮序
注意,以下问题没考证
- 据说老版本的Rebalancing第一次选择的分区是固定的,因此当数据少的时候,第一个固定的分区反而会数据倾斜,不过现在没这个问题了
Rescaling
根据资源使用情况动态调节同一作业的数据分布,根据物理实例部署时的资源共享情况动态调节数据分布,目的是让数据尽可能的在同一 solt 内流转,以减少网络开销。
stream.setParallelism(4).rescale.print().setParallelism(2)
// 前面会4个分区,后面算子只有2个分区。
// 那么前面4个分区会映射到后面的2个分区中
// 例如4中的0,1 映射到后面的 0 分区,2,3映射到后面1分区
// 若
stream.setParallelism(2).rescale.print().setParallelism(4)
// 前面有2个分区,后面算子有4个分区
// 那么前面的0号分区会选择后面的0,1分区发数据。 1号分区会选择后面的2,3号分区发数据
如上图所示,当上游有两个实例时,上游第一个实例将数据发送给下游第一个和第二个实例,上游第二个实例将数据发送给下游第三个和第四个实例,相比rebalance将数据发送给下游每个实例,rescale的传输开销更小。下图则展示了当上游有四个实例,上游前两个实例将数据发送给下游第一个实例,上游后两个实例将数据发送给下游第二个实例。
示例:对filter后的数据进行重分区
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object filterRe {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val sourceDS: DataStream[Int] = env.fromCollection(1 to 100)
val filterDS: DataStream[Int] = sourceDS.filter(x=>x%2 ==0).shuffle
//.shuffle
//.rescale
.rebalance
val resultDS: DataStream[(Int, Int)] = filterDS.map(new RichMapFunction[Int, (Int, Int)] {
override def map(value: Int): (Int, Int) = {
//获取任务id value
(getRuntimeContext.getIndexOfThisSubtask, value)
}
})
resultDS.print()
env.execute()
}
}
输出
定义分区
- 继承
Partitioner
实现自定义分区类 dataSteam.partitionCustom
示例代码
- 将包含hadoop字符串的数据发送到一个分区,不包含的发送到另外的分区
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object customerRe {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val sourceDS: DataStream[String] = env.fromElements("hadoop hbase","flink spark","hello world","hello hadoop")
val rePartiton: DataStream[String] = sourceDS.partitionCustom(new MyPartitioner,x=>x + "")
rePartiton.map(x=>{
println("数据的key为" + x + "线程为" + Thread.currentThread().getId)
x
})
rePartiton.print()
env.execute()
}
}
/*
* 将包含hadoop字符串的数据发送到一个分区,不包含的发送到另外的分区*/
class MyPartitioner extends Partitioner[String]{
override def partition(key: String, numPartitions: Int): Int = {
println("分区个数为" + numPartitions)
if (key.contains("hadoop")){
0
}else{
1
}
}
}
输出
广播变量
广播变量后面单独写一篇记录
版权声明:本文为a3125504x原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。