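1. Random partitioning
Call dataStream.shuffle(). Each record is sent to a randomly chosen downstream channel.
Source code:
public DataStream<T> shuffle() {
    return this.setConnectionType(new ShufflePartitioner());
}
The selectChannel() method in the ShufflePartitioner class:
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // Pick a uniformly random channel index in [0, numberOfChannels)
    return this.random.nextInt(this.numberOfChannels);
}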
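To get a feel for what random channel selection does to the load, here is a minimal stand-alone sketch. It does not use Flink at all: the class name ShuffleSketch and the helper countPerChannel are made up for illustration, and only the random.nextInt(numberOfChannels) rule is taken from the source above.

```java
import java.util.Random;

// Hypothetical stand-alone simulation, not part of Flink.
public class ShuffleSketch {
    private final Random random = new Random();
    private final int numberOfChannels;

    public ShuffleSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Same rule as the selectChannel() shown above:
    // a uniformly random index in [0, numberOfChannels).
    public int selectChannel() {
        return random.nextInt(numberOfChannels);
    }

    // Counts how many simulated records land on each channel.
    public static int[] countPerChannel(int numberOfChannels, int records) {
        ShuffleSketch partitioner = new ShuffleSketch(numberOfChannels);
        int[] counts = new int[numberOfChannels];
        for (int i = 0; i < records; i++) {
            counts[partitioner.selectChannel()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countPerChannel(4, 10_000);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c] + " records");
        }
    }
}
```

The counts come out roughly equal over many records, but unlike round-robin they are not guaranteed to be exactly equal.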
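2. Rebalancing (eliminating data skew)
Call dataStream.rebalance(). Records are distributed to the downstream channels in round-robin order, so each channel receives about the same number of records.
Source code:
public DataStream<T> rebalance() {
    return this.setConnectionType(new RebalancePartitioner());
}
The selectChannel() method in the RebalancePartitioner class:
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // Advance to the next channel, wrapping around to 0 after the last one
    this.nextChannelToSendTo = (this.nextChannelToSendTo + 1) % this.numberOfChannels;
    return this.nextChannelToSendTo;
}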
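The round-robin rule can be tried out without Flink. The sketch below is a hypothetical stand-alone class (RoundRobinSketch is not a Flink class) that reuses the (next + 1) % numberOfChannels update from the source above. The starting value of -1 is an assumption made so that the first record goes to channel 0; the real partitioner may start elsewhere.

```java
import java.util.Arrays;

// Hypothetical stand-alone simulation, not part of Flink.
public class RoundRobinSketch {
    // Assumption: start at -1 so the first selected channel is 0.
    private int nextChannelToSendTo = -1;
    private final int numberOfChannels;

    public RoundRobinSketch(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // Same update as the selectChannel() shown above.
    public int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    // Returns the channel chosen for each of `records` consecutive records.
    public static int[] sequence(int numberOfChannels, int records) {
        RoundRobinSketch partitioner = new RoundRobinSketch(numberOfChannels);
        int[] chosen = new int[records];
        for (int i = 0; i < records; i++) {
            chosen[i] = partitioner.selectChannel();
        }
        return chosen;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 3, 0, 1, 2, 3]
        System.out.println(Arrays.toString(sequence(4, 8)));
    }
}
```

Because the channels are visited strictly in turn, the record counts per channel differ by at most one, which is why this strategy removes skew.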
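3. Rescaling
rescale() is similar to rebalance(), but each upstream subtask distributes its records round-robin to only a subset of the downstream subtasks. Use it when you want the data to be transferred locally instead of over the network.
Example:
If the upstream operation has a parallelism of 2 and the downstream operation has a parallelism of 4, one upstream subtask sends its output to two of the downstream subtasks and the other upstream subtask sends its output to the remaining two.
Conversely, if the downstream operation has a parallelism of 2 and the upstream operation has a parallelism of 4, two of the upstream subtasks send their output to one downstream subtask and the other two send their output to the other downstream subtask.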
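The mapping in this example can be written down as a small function. This is only an illustrative sketch of the 2-to-4 and 4-to-2 cases described above, not Flink's internal wiring code. The class RescaleSketch and the method downstreamTargets are hypothetical, and the sketch assumes that one parallelism is an exact multiple of the other.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the rescale example, not part of Flink.
public class RescaleSketch {

    // Returns the downstream subtask indices that one upstream subtask sends to.
    // Assumption: the larger parallelism is an exact multiple of the smaller one.
    public static List<Integer> downstreamTargets(
            int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan out: each upstream subtask owns a contiguous block of downstream subtasks.
            int factor = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < factor; i++) {
                targets.add(upstreamIndex * factor + i);
            }
        } else {
            // Fan in: several upstream subtasks share a single downstream subtask.
            int factor = upstreamParallelism / downstreamParallelism;
            targets.add(upstreamIndex / factor);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Upstream parallelism 2, downstream parallelism 4
        System.out.println("upstream 0 -> " + downstreamTargets(0, 2, 4)); // upstream 0 -> [0, 1]
        System.out.println("upstream 1 -> " + downstreamTargets(1, 2, 4)); // upstream 1 -> [2, 3]
        // Upstream parallelism 4, downstream parallelism 2
        for (int u = 0; u < 4; u++) {
            System.out.println("upstream " + u + " -> " + downstreamTargets(u, 4, 2));
        }
    }
}
```

Within its own list of targets, each upstream subtask then cycles round-robin, so numberOfChannels as seen by one subtask is the size of that list rather than the full downstream parallelism.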
Call dataStream.rescale().
Source code:
public DataStream<T> rescale() {
    return this.setConnectionType(new RescalePartitioner());
}
The selectChannel() method in the RescalePartitioner class:
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    // Round-robin over this subtask's channels, wrapping around to 0
    if (++this.nextChannelToSendTo >= this.numberOfChannels) {
        this.nextChannelToSendTo = 0;
    }
    return this.nextChannelToSendTo;
}
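4. Custom partitioning
Implement the Partitioner interface:
import org.apache.flink.api.common.functions.Partitioner;

public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long num, int numPartitions) {
        System.out.println("Number of partitions: " + numPartitions);
        // Even keys go to partition 0, odd keys go to partition 1
        if (num % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}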
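The partitioner above always returns 0 or 1, so it only fits a downstream parallelism of at least 2 and never uses more than two partitions. A possible generalization, shown here as a plain static method so that it runs without Flink, is to take the key modulo the number of partitions. The class ModuloPartitionSketch is hypothetical; in a real job the same expression would go into the body of partition(Long num, int numPartitions).

```java
// Hypothetical stand-alone sketch of a modulo-based partitioning rule.
public class ModuloPartitionSketch {

    // Maps a key to a partition index in [0, numPartitions).
    // Math.floorMod keeps the result non-negative even for negative keys.
    public static int partition(long key, int numPartitions) {
        return (int) Math.floorMod(key, (long) numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partition(4L, 2));  // 0
        System.out.println(partition(7L, 2));  // 1
        System.out.println(partition(10L, 3)); // 1
        System.out.println(partition(-3L, 2)); // 1
    }
}
```

With two partitions this gives the same even/odd split as MyPartition, and it spreads keys over all partitions when the parallelism is higher.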
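Using the partitioner in a job:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingWithMyPartition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // MyParallelism is a user-defined source that is not shown in this section
        DataStreamSource<Long> text = env.addSource(new MyParallelism());

        // Wrap each value in a Tuple1 so that it can be partitioned by field position
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) {
                return new Tuple1<Long>(value);
            }
        });

        // Partition by the first field using the custom partitioner
        DataStream<Tuple1<Long>> dataStream = tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = dataStream.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("Current thread ID: " + Thread.currentThread().getId() + " value: " + value);
                return value.getField(0);
            }
        });

        // Print the result with a single printing subtask
        result.print().setParallelism(1);
        env.execute("StreamingWithMyPartition");
    }
}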