Overview
Map
DataStream → DataStream
Takes one element and produces exactly one element.
dataStream.map { x => x * 2 }
import org.apache.flink.api.common.functions.MapFunction;
// 1. map: emit the length of each String
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String s) throws Exception {
        return s.length();
    }
});
FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements.
dataStream.flatMap { str => str.split(" ") }
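import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
// 2. flatMap: split each line into fields on commas and emit every field
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String s, Collector<String> collector) throws Exception {
        String[] fields = s.split(",");
        for (String item : fields) {
            collector.collect(item);
        }
    }
});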
Filter
DataStream → DataStream
Evaluates a boolean predicate for each element and keeps only the elements for which it returns true.
dataStream.filter { _ != 0 }
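import org.apache.flink.api.common.functions.FilterFunction;
// 3. filter: keep only records from sensor_1
// (note: startsWith also matches ids such as sensor_10)
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        return s.startsWith("sensor_1");
    }
});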
KeyBy
DataStream → KeyedStream
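Logically splits a stream into disjoint partitions, where each partition contains the elements that share the same key. Internally this is implemented with hash partitioning.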
dataStream.keyBy(_.someKey)
dataStream.keyBy(_._1)
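To make "hash partitioning" concrete, here is a simplified plain-Java model (no Flink dependency) that routes each key to one of `parallelism` partitions using `hashCode()`. This is an assumption-level sketch: Flink's real assignment first maps the key's hash to a key group and then maps key groups to parallel instances, so actual partition numbers differ. What it does show is the property that matters: records with the same key always land in the same partition. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy partitioning, NOT Flink's exact algorithm
// (Flink goes through "key groups" before choosing a parallel instance).
class KeyByPartitionSketch {

    // Assign a key to one of `parallelism` partitions based on its hash.
    static int partitionFor(Object key, int parallelism) {
        // floorMod keeps the result non-negative even for negative hashCodes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route each (key, value) record to the partition chosen for its key.
    static Map<Integer, List<String>> partition(List<String[]> records, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String[] r : records) {
            int p = partitionFor(r[0], parallelism);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(r[0] + ":" + r[1]);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[]{"sensor_1", "35.8"},
            new String[]{"sensor_2", "15.4"},
            new String[]{"sensor_1", "36.1"});
        // Both sensor_1 readings end up in the same partition's list
        System.out.println(partition(records, 4));
    }
}
```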
Reduce
KeyedStream → DataStream
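A rolling aggregation on a keyed stream: it combines the current element with the last reduced value and emits the new value. The resulting stream contains the result of every aggregation step, not only the final result.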
keyedStream.reduce { _ + _ }
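A minimal plain-Java model of the rolling behavior described above (hypothetical names, not the Flink API): one running value is kept per key, and every input element emits the updated value for its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a rolling reduce on a keyed stream.
class RollingReduceSketch {

    static List<String> rollingReduce(List<Map.Entry<String, Integer>> input,
                                      BinaryOperator<Integer> reducer) {
        Map<String, Integer> state = new HashMap<>();   // last reduced value per key
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            // first element of a key is stored as-is; later ones are reduced into it
            Integer updated = state.merge(e.getKey(), e.getValue(), reducer);
            emitted.add(e.getKey() + "=" + updated);     // one output per input element
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
            Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println(rollingReduce(input, Integer::sum));
        // prints [a=1, b=10, a=3, a=6]
    }
}
```

Key "a" produces three outputs (1, 3, 6), one per element, rather than only the final 6.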
Window
KeyedStream → WindowedStream
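A window is defined on an already partitioned KeyedStream. Windowing groups the data of each key by the specified (time) window and processes each group separately.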
dataStream
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)));
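The sketch below models, in plain Java without Flink, how tumbling windows of a fixed size split a keyed stream: each element is assigned to the window `[start, start + size)` that contains its timestamp, and elements are then grouped by key and window. It assumes non-negative millisecond timestamps and no window offset; all names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of tumbling event-time windows on a keyed stream (not the Flink API).
// Assumes non-negative millisecond timestamps and a window offset of 0.
class TumblingWindowSketch {

    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Start of the window of length sizeMs containing ts; the window is [start, start + sizeMs).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Count events per key and per window: split by key first, then by window.
    static Map<String, Integer> countPerKeyAndWindow(List<Event> events, long sizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestamp, sizeMs);
            String id = e.key + "@[" + start + "," + (start + sizeMs) + ")";
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a", 1000), new Event("a", 4999),
            new Event("a", 5000), new Event("b", 2000));
        System.out.println(countPerKeyAndWindow(events, 5000));
        // prints {a@[0,5000)=2, a@[5000,10000)=1, b@[0,5000)=1}
    }
}
```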
WindowAll
DataStream → AllWindowedStream
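A WindowAll window can be defined on a regular (non-keyed) DataStream: all elements that fall into the specified (time) window are processed together, regardless of key. In many cases this is a non-parallel transformation, because all records are gathered in a single task.
dataStream
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));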
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
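Applies a general function to the window as a whole. This requires buffering all elements of the current window until it fires, which is expensive in state and memory.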
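import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
// Example: sum the f1 field of all elements in the window
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    @Override
    public void apply(Tuple tuple,
                      Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});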
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
// Applying an AllWindowFunction on a non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    @Override
    public void apply(Window window,
                      Iterable<Tuple2<String, Integer>> values,
                      Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
WindowReduce
WindowedStream → DataStream
Applies a reduce function to the window and returns the reduced value.
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
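To illustrate why apply is described as expensive while reduce is not, this plain-Java model (hypothetical names, not the Flink API) computes the same window sum both ways: the apply style buffers every element until the window fires, while the reduce style keeps a single accumulator that is updated as each element arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of per-window state: apply buffers everything, reduce keeps one value.
class WindowApplyVsReduceSketch {

    // apply style: every arriving element is stored; the function runs once when the window fires.
    static int applyStyle(List<Integer> arriving) {
        List<Integer> buffer = new ArrayList<>();
        for (int v : arriving) {
            buffer.add(v);                      // state grows with the number of elements
        }
        int sum = 0;                            // window fires: iterate over the whole buffer
        for (int v : buffer) {
            sum += v;
        }
        return sum;
    }

    // reduce style: each arriving element is folded into a single accumulator.
    static int reduceStyle(List<Integer> arriving) {
        Integer acc = null;                     // state is a single value
        for (int v : arriving) {
            acc = (acc == null) ? v : acc + v;  // plays the role of reduce(acc, v)
        }
        return acc == null ? 0 : acc;           // model choice: an empty window yields 0
    }

    public static void main(String[] args) {
        List<Integer> window = List.of(4, 5, 6);
        System.out.println(applyStyle(window) + " " + reduceStyle(window)); // prints 15 15
    }
}
```

Both return the same sum; the difference is how much state is held while the window is still open.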
Union
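DataStream* → DataStream
Unions two or more DataStreams of the same type, producing a new DataStream that contains all elements of all input streams. Union does not deduplicate: if you union a stream with itself, each element appears twice in the result.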
dataStream.union(otherStream1, otherStream2, ...);
Window Join
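DataStream, DataStream → DataStream
Joins two DataStreams on a given key within a common window: an element of each stream is paired only if both have the same key and fall into the same window.
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...});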
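A plain-Java model of the tumbling-window inner join above (hypothetical names, not the Flink API; assumes non-negative millisecond timestamps and no offset): a pair is emitted only when the keys match and both timestamps fall into the same window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a tumbling-window inner join.
class WindowJoinSketch {

    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - ts % size;
    }

    // One output per (left, right) combination with the same key in the same window.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);  // what a JoinFunction would combine
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("k", 1000, "L1"), new Event("k", 4000, "L2"));
        List<Event> right = List.of(new Event("k", 2000, "R1"), new Event("k", 6000, "R2"));
        System.out.println(join(left, right, 3000)); // prints [L1,R1]
    }
}
```

L2 (4000 ms) and R2 (6000 ms) are only 2 seconds apart, yet they are not joined because the window boundary at 6000 ms separates them; an interval join uses a range relative to each element instead of fixed windows.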
Interval Join
KeyedStream, KeyedStream → DataStream
Joins two KeyedStreams on a common key: an element e1 of the first stream is paired with an element e2 of the second stream that has the same key if e2's timestamp lies within a time interval relative to e1's timestamp, i.e. e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound. Interval joins operate on event time.
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
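The time condition above, including the optional exclusive bounds, can be modelled in plain Java as follows (hypothetical names, not the Flink API).

```java
// Plain-Java model of the interval-join time condition.
class IntervalJoinSketch {

    // True if rightTs lies within [leftTs + lower, leftTs + upper];
    // each bound can be made exclusive, like the optional calls in the example.
    static boolean inInterval(long leftTs, long rightTs, long lower, long upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        long lo = leftTs + lower;
        long hi = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > lo : rightTs >= lo;
        boolean belowUpper = upperExclusive ? rightTs < hi : rightTs <= hi;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Bounds from the example: between(-2 ms, 2 ms), both exclusive
        for (long rightTs = 7; rightTs <= 13; rightTs++) {
            System.out.println("leftTs=10, rightTs=" + rightTs + " -> "
                + inInterval(10, rightTs, -2, 2, true, true));
        }
    }
}
```

With leftTs = 10, only rightTs 9, 10 and 11 match, which is exactly `leftTs - 2 < rightTs < leftTs + 2`; with inclusive bounds, 8 and 12 would match as well.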
Window CoGroup
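DataStream, DataStream → DataStream
Within a given window, groups the elements of two DataStreams by a given key and passes both groups to one CoGroupFunction, producing a single DataStream. Unlike a join, the function is also called for keys that occur in only one of the two streams (the other group is then empty).
dataStream.coGroup(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...});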
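A plain-Java model of coGroup for the elements of one window (hypothetical names, not the Flink API). For every key, the function receives all left and all right elements, including keys present on only one side; this is what makes coGroup usable for outer-join style logic, whereas a join only produces matching pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of coGroup within a single window.
class CoGroupSketch {

    static List<String> coGroup(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> l = group(left);
        Map<String, List<String>> r = group(right);
        TreeSet<String> keys = new TreeSet<>(l.keySet());
        keys.addAll(r.keySet());                // keys from either side
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            // a CoGroupFunction would receive these two groups for key k
            out.add(k + ": left=" + l.getOrDefault(k, List.of())
                + " right=" + r.getOrDefault(k, List.of()));
        }
        return out;
    }

    // Group (key, value) records by key.
    static Map<String, List<String>> group(List<String[]> records) {
        Map<String, List<String>> m = new TreeMap<>();
        for (String[] rec : records) {
            m.computeIfAbsent(rec[0], k -> new ArrayList<>()).add(rec[1]);
        }
        return m;
    }

    public static void main(String[] args) {
        List<String[]> left = List.of(new String[]{"a", "1"}, new String[]{"a", "2"}, new String[]{"b", "3"});
        List<String[]> right = List.of(new String[]{"a", "x"}, new String[]{"c", "y"});
        coGroup(left, right).forEach(System.out::println);
        // a: left=[1, 2] right=[x]
        // b: left=[3] right=[]
        // c: left=[] right=[y]
    }
}
```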
Connect
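DataStream, DataStream → ConnectedStreams
Connects two data streams while each keeps its own type. After connect, the two streams are only placed inside one ConnectedStreams object; each keeps its own data and form unchanged, and the two streams remain independent. A function applied to the connected streams can, however, share state between them.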
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
CoMap, CoFlatMap
ConnectedStreams → DataStream
Operates on ConnectedStreams and works like map and flatMap, except that each of the two connected streams is processed by its own method (map1/map2, flatMap1/flatMap2).
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
Iterate
DataStream → IterativeStream → ConnectedStream
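Creates a feedback loop in the stream: the output of an operator is redirected back to an earlier point so that elements can be processed repeatedly. It can also be used to split a stream, e.g. elements that still need processing are fed back while the rest are emitted downstream.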
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/*do something*/);
// elements greater than 0 are fed back into the iteration
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// elements less than or equal to 0 leave the loop and are forwarded downstream
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
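The feedback loop above can be modelled in plain Java with a queue (hypothetical names, not the Flink API). The body `v -> v - 1` is an assumed stand-in for the elided `/*do something*/`; values greater than 0 are fed back and the rest are emitted. This model processes elements in FIFO order, which Flink does not guarantee, and it only terminates if the body eventually drives every value to 0 or below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Plain-Java model of the iterate/closeWith feedback loop.
class IterateSketch {

    // body: hypothetical stand-in for iteration.map(/*do something*/)
    static List<Long> iterate(List<Long> initial, LongUnaryOperator body) {
        Deque<Long> queue = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!queue.isEmpty()) {
            long v = body.applyAsLong(queue.poll());
            if (v > 0) {
                queue.add(v);        // feedback: value > 0 goes back into the loop
            } else {
                output.add(v);       // output: value <= 0 leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical body: subtract 1 on each pass
        System.out.println(iterate(List.of(3L, 1L, 0L), v -> v - 1)); // prints [0, -1, 0]
    }
}
```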