概述
- 由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
- 状态就是一个本地变量,可以被任务的业务逻辑访问
- Flink会进行状态管理,包括状态一致性、故障处理以及高效存储和访问
- Flink中,状态始终与特定算子相关联
状态的分类
主要分三类:算子状态(Operator State)、广播状态(Broadcast State)、键控状态(Keyed State),其中键控状态是最常用、最重要的一种状态。
算子状态(Operator State)
- 算子状态是一种非键控状态(non-keyed state)
- 算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态
- 状态对于同一子任务而言是共享的
- 算子状态不能由相同或不同算子的另一个子任务访问
- 主要用于source/sink的实现以及没有可以划分状态的键的场景
- python中暂不支持使用此状态
广播状态(Broadcast State)
-
广播状态是一种特殊的算子状态。
-
广播状态的作用主要就是将一个流当前任务的状态可以被带入到下游所有的任务里,即下游任务可以访问上游任务的状态。
-
python中暂不支持使用此状态
键控状态(Keyed State)
- 根据输入数据流中定义的键来维护和访问,所以必须要先进行
keyBy
的操作。 - Flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态
- 当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key
键控状态的数据结构
1)值状态(ValueState)
- 将状态表示为单个的值
ValueState<T> getState(ValueStateDescriptor<T>)
2)列表状态(ListState)
- 将状态表示为一组数据的列表
ListState<T> getListState(ListStateDescriptor<T>)
3)映射状态(MapState)
- 将状态表示为一组key-value对
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
4)聚合状态(ReducingState & AggregatingState)
- 将状态表示为一个用于聚合操作的列表
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
键控状态使用示例
package com.justin.state;
import com.justin.model.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyedStateTest {
public static void main(String[] args) throws Exception {
//创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从paramterTool中提取配置项
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host", "192.168.0.181");
Integer port = parameterTool.getInt("port", 7777);
//从socket中读取
DataStream<String> inputStream = env.socketTextStream(host, port);
//转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(s -> {
String[] arr = s.split(",");
return SensorReading.builder()
.id(arr[0].trim())
.timestamp(Long.parseLong(arr[1].trim()))
.temperature(Double.parseDouble(arr[2].trim()))
.build();
});
//定义一个有状态的map,统计当前sensor数据个数
DataStream<Integer> resultStream = dataStream
.keyBy(SensorReading::getId)
.map(new MyMapFunction());
resultStream.print();
//执行
env.execute();
}
private static class MyMapFunction extends RichMapFunction<SensorReading, Integer> {
private ValueState<Integer> countValueState;
@Override
public Integer map(SensorReading value) throws Exception {
Integer count = countValueState.value();
if(count == null){
count = 0;
}
count++;
//更新State中的值
countValueState.update(count);
return count;
}
@Override
public void open(Configuration parameters) throws Exception {
// 获取State对象实例
countValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("countValueState", Integer.class));
}
}
}