3. Flink DataStream Transforms (DataStream Transformation Operators)

Overview

This section walks through the common DataStream transformation operators. For each operator it gives the type signature (input stream → output stream), a short description, and a small code example.

Map

DataStream → DataStream

Takes one element and produces one element.

dataStream.map { x => x * 2 } 
// 1. map: convert each String into its length
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String s) throws Exception {
        return s.length();
    }
});

FlatMap

DataStream → DataStream

Takes one element and produces zero, one, or more elements.

dataStream.flatMap { str => str.split(" ") }
// 2. flatMap: split each line into fields on commas
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        String[] fields = s.split(",");
        for (String item : fields) {
            collector.collect(item);
        }
    }
});

Filter

DataStream → DataStream

Evaluates a predicate for each incoming element and keeps only the elements for which it returns true.

dataStream.filter { _ != 0 }
// 3. filter: keep only records whose id starts with "sensor_1"
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        return s.startsWith("sensor_1");
    }
});

KeyBy

DataStream → KeyedStream

Logically splits a stream into disjoint partitions; each partition contains the elements with the same key. Internally this is implemented with hash partitioning.

dataStream.keyBy(_.someKey)
dataStream.keyBy(_._1)
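
For the Java API, a minimal sketch might look like the following. It assumes the same DataStream<String> of raw sensor lines (e.g. "sensor_1,35") used in the earlier examples, and keys the stream by the sensor id extracted from each line; the line format is an assumption for illustration.

// keyBy with an explicit KeySelector: all lines with the same sensor id
// end up in the same logical partition.
KeyedStream<String, String> keyedStream = dataStream.keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String line) throws Exception {
        return line.split(",")[0];
    }
});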

Reduce

KeyedStream → DataStream

A rolling aggregation on a keyed stream: it combines the current element with the last reduced value and produces a new value. The resulting stream contains the result of every aggregation step, not just the final result of the last aggregation.

keyedStream.reduce { _ + _ }
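
A Java sketch of the same rolling aggregation, assuming a hypothetical KeyedStream<Tuple2<String, Integer>, String> named keyedTupleStream whose elements are (sensorId, count) pairs:

// Rolling reduce: each incoming element is combined with the last reduced value
// for its key, and every intermediate result is emitted downstream.
DataStream<Tuple2<String, Integer>> rollingSum = keyedTupleStream.reduce(
        new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                                  Tuple2<String, Integer> value2) throws Exception {
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });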

Window

KeyedStream → WindowedStream

Windows are defined on an already partitioned KeyedStream: within the specified (time) window, the data is grouped by the given key and processed separately per key and per window.

dataStream
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll

DataStream → AllWindowedStream

windowAll can be applied to a regular (non-keyed) DataStream: all data that falls into the specified (time) window is processed together. Note that this is in many cases a non-parallel operation, since all records are gathered in a single task.

dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

Window Apply

WindowedStream → DataStream

AllWindowedStream → DataStream

Applies a general function to the window as a whole. Because the function receives the complete window contents, all elements of the current window have to be buffered, which can be quite resource-intensive.

// Example: sum all of the elements in a window
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

WindowReduce

WindowedStream → DataStream

Applies a reduce function to the window and returns the final reduced value for each window.

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Union

DataStream→ DataStream

Unions two or more DataStreams of the same type, producing a new DataStream that contains all elements from all of the input streams.

dataStream.union(otherStream1, otherStream2, ...);
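
A minimal sketch, assuming three DataStream<String> inputs created from a StreamExecutionEnvironment named env (the variable names and elements are illustrative):

DataStream<String> stream1 = env.fromElements("a", "b");
DataStream<String> stream2 = env.fromElements("c");
DataStream<String> stream3 = env.fromElements("d", "e");

// The result contains every element of every input; all inputs must have the same type.
DataStream<String> unioned = stream1.union(stream2, stream3);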

Window Join

DataStream,DataStream → DataStream

Joins two DataStreams on a given key within a common window.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});
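
A more concrete sketch of the skeleton above, assuming two hypothetical streams firstStream and secondStream that both carry Tuple2<String, Integer> elements and are joined on their first field:

DataStream<String> joined = firstStream.join(secondStream)
    .where(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
            // Called once per pair of elements that share a key within the same window.
            return first.f0 + ": " + first.f1 + "," + second.f1;
        }
    });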

Interval Join

KeyedStream,KeyedStream → DataStream

Joins elements of two KeyedStreams with the same key (say e1 and e2) over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
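
A more concrete sketch, assuming both keyed streams carry Tuple2<String, Integer> elements; a ProcessJoinFunction is invoked for every matching pair within the interval:

DataStream<String> intervalJoined = keyedStream
    .intervalJoin(otherKeyedStream)
    .between(Time.seconds(-2), Time.seconds(2))
    .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public void processElement(Tuple2<String, Integer> left,
                                   Tuple2<String, Integer> right,
                                   Context ctx,
                                   Collector<String> out) throws Exception {
            // Emitted for every (left, right) pair whose timestamps satisfy the bounds.
            out.collect(left.f0 + ": " + left.f1 + "," + right.f1);
        }
    });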

Window CoGroup

DataStream,DataStream → DataStream

Cogroups two DataStreams on a given key within a common window into one DataStream.

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
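
A more concrete sketch, again assuming two hypothetical streams of Tuple2<String, Integer> cogrouped on their first field. Unlike a join, the CoGroupFunction is also called for keys that appear on only one side (the other Iterable is then simply empty):

DataStream<String> coGrouped = firstStream.coGroup(secondStream)
    .where(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<String> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> t : first) {
                sum += t.f1;
            }
            for (Tuple2<String, Integer> t : second) {
                sum += t.f1;
            }
            // One result per key and window, even if one side had no elements.
            out.collect("sum for this key and window: " + sum);
        }
    });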

Connect

DataStream,DataStream → ConnectedStream

Connects two data streams while preserving their types. After being connected, the two streams are merely placed into one ConnectedStreams; internally each stream keeps its own data and type unchanged, and the two streams remain independent of each other.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap,CoFlatMap

ConnectedStream → DataStream

Operate on a ConnectedStreams and behave like map and flatMap: each of the two connected streams is processed separately with its own map / flatMap function.

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

Iterate

DataStream → IterativeStream → ConnectedStream

Creates a feedback loop in the stream so that data can be processed repeatedly: elements that satisfy a condition are fed back to an earlier operator, while the remaining elements continue downstream (in the example below, values > 0 are fed back into the iteration and values <= 0 are emitted as output).

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

 
