Flink状态(一)
扫描二维码随身看资讯
使用手机 二维码应用 扫描右侧二维码,您可以
1. 在手机上细细品读~
2. 分享给您的微信好友或朋友圈~
key状态和算子状态
key状态
key状态总是与key有关,只能被用于keyedStream类型的函数与算子。你可以认为key状态是一种被分区的算子状态,每一个key有一个状态分区。每一个key状态逻辑上由<parellel-operator-instance, key>唯一确定,由于每一个key只分布在key算子的多个并发实例中的一个实例上,我们可以将<parellel-operator-instance, key>简化为<operator, key>.
算子(operator)状态
算子状态也称为非key状态。每一个算子状态绑定一个并发算子实例。
kafka connector
是flink算子状态比较好的应用范例。每一个
kafka consumer
并发实例都维护一个Topic分区和分区对应的offset的map,并将此map作为算子状态。
当并发数改变的时候,算子状态支持在并发实例间重新分配状态。有多种不同的重分配策略。
原始的和被管理的状态
key状态和算子状态以两种形式存在:被管理的和原始的。
被管理的状态由flink runtime管理,以一种数据结构表示,比如内部hash表或者RocksDB,例如:
ValueState
,
ListState
等等。flink运行时对状态进行编码,然后写入checkpoints.
原始状态是算子保存到它们自己定义的数据结构中的一种状态。当checkpoint发生的时候,flink仅仅将二进制写入到checkpoint中,它不知道状态的数据结构,仅仅能看见原始的二进制字节数据。
所有的stream数据流Function可以使用被管理的状态,但是当实现算子接口的时候,仅仅能使用原始状态接口。推荐使用被管理的状态而不是原始状态,因为使用被管理状态,当并发度变化的时候,flink能够自动重新分配状态,而且也能够更好地管理内存。
注意: 如果你需要自定义被管理状态的序列化逻辑,为了确保特性兼容,请看 相应的说明 。Flink默认的序列化不需要特别的处理。
使用被管理的Key状态
被管理的Key状态接口能够处理不同类型的状态,包括现有所有输入元素的key。这意味着这类状态仅仅能被用于KeyedStream上。KeyedStream能够通过stream.keyBy(...)创建。
现在,我们首先看一下当前所有的不同类型状态接口,然后我们看一下如何在程序中使用。状态接口类型如下:
- ValueState<T>: 保存一个值。这个值可以被更新或获取(涉及到上面提到的输入元素的key, 每个key中都可能对应一个值)。这个值可以通过update(T)更新,通过T value()获取。
-
ListState<T>:
保存元素列表。可以添加元素和获取当前存储的所有元素
Iterable
对象。使用add(T)或addAll(List<T>)方法添加元素。使用Iterable<T> get()
方法获取iterable对象。也可以通过update(List<T>)
方法覆盖现有的列表。 -
ReducingState<T>:
保存一个值,这个值是添加到状态中所有值的聚合结果。这个接口与
ListState
相似,但是通过add(T)
方法添加的元素会通过指定的ReduceFunction
聚合起来。 -
AggregatingState<IN,OUT>:
保存一个值,这个值是添加到状态的所有值的聚合结果。与
ReducingState
相比,聚合后的数据类型也许与添加进状态的元素类型不同。这个接口与ListState
相同,但是通过add(IN)
添加的元素使用指定的AggregateFunction
对象聚合。 -
FoldingState<T,ACC>:
保存一个值,这个值是添加到状态的所有值的聚合结果。与
ReducingState
相比,聚合结果的类型可能与添加到状态中的元素类型不一样。这个接口与ListState
相似,但是通过add(T)
添加的元素通过指定的FoldFunction
聚合 -
MapState<UK,UV>:
保存一个map对象。你可以将kv键值对放入状态中,也可以获取当前存储的键值列表一个
Iterable
对象。使用put(UK, UK)
或putAll(Map\<UK,UV\>)
方法添加键值对。与key对应的value可以通过get(UK)
获取。map中kv关系,key,value数据分别可以通过entries()
,keys()
和values()
方法获取。
所有类型的state状态接口都有一个clear()方法,能够清除当有输入元素key的状态。
注意:
FoldingState
和
FoldingStateDescriptor
已经在flink1.4中废弃了,将来会完全移除。请用
AggregatingState
和
AggregatingStateDescriptor
代替。
我们要记住两件重要的事,第一件事是上面这些state接口类型仅仅用于与状态交互。状态不一定必须要保存到flink内部,也可以保存到硬盘或其它地方。第二件事是你获取到的状态的值依赖输入元素的key值,所以在同一个user function中如果两次输入流中的key值不一样的话,value也不一样。
为了获得一个状态处理类,你必须要创建一个
StateDescriptor
对象。它里面保存着状态的名字(后面我们会看到,你可以创建多个状态,他们必须有不同的名字,以便你可以根据名字获取状态),状态存储值的类型和一个用户自定义的function,例如一个ReduceFunction。根据你想要存储状态的类型不同,你可以创建
ValueStateDescriptor
,
ListStateDescrptor
,
ReducingStateDescriptor
,
FoldingStateDescriptor
或
MapStateDescriptor
对象。
状态可以通过
RuntimeContext
获取,它只能通过富函数(rich function)获取。请看
这里
详细了解。但是我们也看一个简短的例子。
RichFunction
中的
RuntimeContext
对象有如下方法可以获取状态。
-
ValueState
getState(ValueStateDescriptor ) -
ReducingState
getReducingState(ReducingStateDescriptor ) -
ListState
getListState(ListStateDescriptor ) - AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面举一个
FlatMapFunction
的例子说明所有部分如何配合的。
public class CountWindoWaverage extends RichFlatMapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>> {
private transient ValueState<Tuple2<Long,Long>> sum;
@Override
public void flatMap(Tuple2<Long,Long> input, Collector<Tuple2<Long,Long>> out) throws Exception {
// 获取状态值
Tuple2<Long,Long> currentSum = sum.value();
// 更新数量
currentSum.f0 += 1;
// 将输入数据累加到第2个字段上
currentSum.f1 += input.f1;
// 更新状态
sum.update(currentSum);
// 如果数量达到2个,计算平均值,发送到下游,并清空状态
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long,Long>> descriptor =
new ValueStateDescriptor<>(
"average", // 状态名称
TypeInformation.of(new TypeHint<Tuple2<Long,Long>> () {}), //类型信息
Tuple2.of(0L,0L)); // 状态默认值
sum = getRuntimeContext().getState(descriptor);
}
}
// 可以在流处理程序中像这样使用(假设我们有一个StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// 将打印输出(1,4)和(1,5)
这个例子实现了一个贫血的计数窗口。我们以tuple的第一个字段做为key来分类(这个例子中所有key都为1)。CountWindowAverage类成员变量
ValueState
中存储着实时计算的数量和累加和。一量数量达到2个,它将计算平均值,发送到下游并清空状态,从0开始。需要注意的是,对于不同输入的key(输入元素Tuple的第一个元素值不同),
ValueState
将保存不同的值。
状态存活时间(TTL)
TTL可以被分配给任何类型的key状态。如果key状态设置了TTL,并且状态过期了,状态中存储的值将被清空。后面将详细说明。
所有状态集合的TTL是设置在每个元素上的。这意味着列表和map中每个元素元素过都是过期处理逻辑都是独立的,互不影响
为了使用TTL,首先要创建一个
StateTtlConfig
对象。通过给TTL函数传递这个状态配置对象激活TTL。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
配置状态需要考虑以下几个方面:
newBuilder
方法的第一个参数是必须的,它是TTL过期时间。
状态的TTL时间戳需要被刷新,还需要配置更新类型,表示在什么情况下刷新,默认是
OnCreateAndWrite
:
-
StateTtlConfig.UpdateType.OnCreateAndWrite - 仅仅当创建和写入时刷新TTL
-
StateTtlConfig.UpdateType.OnReadAndWrite - 读和写时刷新TTL
状态可见性配置当读取的时候如果过期的值没有被清除的话,是否返回,默认是NeverReturnExpired
: -
StateTtlConfig.StateVisibility.NeverReturnExpired - 从不返回过期的值
-
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果过期的值仍然可以获得则返回
如果设置成
NeverReturnExpired
,过期的状态数据即使没有被清除也获取不到,就好像不存在一样。这个选项在数据过期后必须不可用的情况下是有用的,例如对隐私数据敏感的应用。
另一个选项是
ReturnExpiredIfNotCleanedUp
,如果过期的状态值没有被清除的话,允许返回。
注意:
- 状态存储器除了存储状态值还会存储数据最后一次被修改的时间戳,意味着如果启用TTL这个特性会增加状态存储的开销。堆状态存储器会在内存中存储一个引用用户状态数据的一个java对象,还有一个原始的long类型。RocksDB状态存储器会给每一个值,列表或map中的每个元素都增加8个字节的存储开销。
- 当前仅支持处理时间(processing time)的TTL,不支持事件(event time)的TTL.
-
没有配置TTL,却启动TTL或反之,都会导致兼容失败和
StateMigrationException
。 - TTL配置不是checkpoint或savepoint的一部分,而是flink处理当前正运行Job的一种方式
-
设置TTL的map状态如果想支持null值,仅当状态值序列化器能够处理null值的时候。如果序列化器不支持null值 ,可以使用
NullableSerializer
包装类,但这将多消耗一个字节的存储空间。
过期状态数据的清除
默认情况下,过期的状态数据仅当显示的读取的时候才会被清除,例如调用
ValueState.value()
的时间。
注意: 这意味着如果过期状态数据没有被读取,它将不会被清除,可能导致状态数据的不断增长。在后来的flink版本中可能会改变。
在完全快照时清除
另外,你可以在执行状态完全快照时清除过期的状态值,这将减小快照的大小。在当前flink实现中,本地状态不会被清除,但是当从上一个快照中恢复的时候,不会包括过期的状态。可以在
StateTtlConfig
中配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
这个选项不适合于以RocksDB存储状态数据的递增checkpoint.
注意:
-
对于已经存在的job,清除策略可以在任意时间在
StateTtlConfig
中激活或关闭激活,例如:从savepoint重启后
状态存储后端清除
除了在完全快照中清除,你还可以在后端清除。如果状态后端存储支持,下面选项可以激活默认的后端清除策略。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInBackground()
.build();
对于更详细的控制某些特别的后端清除策略,你可以按照下面描述的单独配置。当前,堆状态存储依赖递增的清除,RocksDB则使用压缩过滤器。
递增的清除
另一个选项是触发状态的递增清除策略。触发可以是每个状态的获取或每条记录处理时的回调。如果某些状态激活了清除策略,后端的存储将持有一个全局的针对所有元素的惰性迭代器。每次递增清除策略被触发时,迭代器就向前进,会检查经过的元素,过期的状态数据将被清除。
这个特性可以在
StateTtlConfig
激活
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally()
.build();
这个策略有两个参数。第一个参数是每次清除触发时检查的元素个数。如果启用,当获取每一个状态数据时都会触发。第二个参数配置每条记录处理时,是否也触发清除。如果你启用默认的后端清除策略,对于堆存储来说,这个策略将在每次状态数据获取时被触发并检查5个元素,并且每条记录被处理时不会触发清除。
注意:
- 如果没有获取状态数据或者没有处理任何记录,过期的状态数据将一直保存
- 花费在递增清除上的时间增加会记录处理的时间
- 当前递增清除仅仅适用于堆存储,为RocksDB设置递增清除没有作用
- 如果堆存储使用同步快照,那么全局的迭代器在迭代时将保存所有key的副本。因为flink目前的实现不支持对状态的并发修改。启用这个特性将增加内存消耗。异步快照没有这个问题
-
对于已经存在的job,清除策略可以在任意时间在
StateTtlConfig
中激活或关闭激活,例如当从savepoint重启的时候。
在RocksDB压缩过程中清除
如果使用RocksDB存储状态,另一个清除策略是激活flink压缩过滤器。RocksDB周期性的执行异步压缩来合并状态更新和减小存储。flink压缩过滤器检查带TTL元素的时间戳,排除掉过期的状态数据。
这个特性默认没有启用。可以通过flink配置文件配置,将
state.backend.rocksdb.ttl.compaction.filter.enabled
设置为true,或者当为某个Job创建自定义RocksDB状态存储时调用
RocksDBStateBackend::enableTtlCompactionFilter
设置。这样设置了TTL的状态将会使用过滤器。
import org.apache.flink.api.common.state.StateTtlConfig
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();
当处理一定数量的状态数据后,RocksDB压缩过滤器将查询当前时间戳,检查有没有过期。你可以改变它,传递一个自定义值给
StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
方法. 更新时间戳越频繁,清除的速度越快,但却降低了压缩性能。因为需要调用JNI本地方法. 如果你启用默认的清除策略(这个策略适用于RocksDB状态存储),在每处理1000个元素后都查询一次当前的时间戳。
如果想为RocksDB过滤器开启本地方法的debug级别日志,可以为
FlinkCompactionFilter
设置日志级别为Debug.
log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 在压缩过程中调用TTL过滤器会减慢flink处理速度。TTL过滤器必须在key正在被压缩的过程中,刷新key对应的每一个value元素的时间戳,并检查是否过期。对于集合类型例如list或map,也将检查其中的每一个元素。
- 如果这个特性用于列表状态,列表的长度不固定。 TTL过滤器必须对每一个元素另外通过JNI调用Java类型的序列化器,由第一个过期的元素决定下一个没过期元素的偏移量。
-
对于已经存在的job,清除策略可以在任意时间在
StateTtlConfig
中激活或关闭激活,例如当从savepoint重启时。
Scala DataStream API中的状态
除了上面讲到的接口, Scala API对于
KeyStream
中使用
ValueState
存储单个状态值的map()或flatmap()函数有更简短的写法。user function从Option对象中得到
ValueState
当前状态值,并返回一个待更新的值。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
使用被管理的算子状态
为了使用被管理的算子状态,必须实现通用的CheckpointedFunction接口,或者实现ListCheckpointed
CheckpointedFunction
CheckpointedFunction接口可以让我们保存不同数据结构的非key对象的状态。我们如果要使用,必须实现以下两个方法:
void snapshotState(FunctionSnapshotContext conetxt) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
每当checkpoint执行的时候都会调用
snapshotState()
方法,而另一个方法,
initializeState()
,则是用户自定义的功能初始化的时候调用,包括首次初始化或者从之前的checkpoint恢复的时候。鉴于此,
initializeState()
不仅包含不同的需要保存状态的数据初始化的逻辑,还需要包含恢复状态的逻辑。
当前,支持列表类型的被管理的算子状态,状态应该是由互相独立的可序列化的对象组成的列表,当伸缩的时候需要重新分配。换言之,这些对象是非key状态分配的最小粒度。根据状态获取方式的不同,定义了以下分配策略:
- 平均分配(Even-split redistribution): 每一个算子返回一个状态集合,整个状态逻辑上是所有列表集合的并集。当恢复或重新分配时,根据并发数平均分配成多个子集,每一个算子获取一个子集合,可能是空集合,也可能包含一个或多个元素。例如,并发度1的时候,一个算子的checkpoint状态包含元素1和元素2,当并发度增加到2的时候,元素1可能被分配到算子0,元素2被分配到算子1.
- 合并分配(union redistribution): 每一个算子返回一个状态的集合,整个状态逻辑上是所有这些列表集合的并集,当恢复或重新分配时,每一个算子都将分配到整个状态的列表集合。
下面是一个带状态的SinkFunction示例,使用平均分配策略,功能是在将一些元素发送给外部系统之前,使用CheckpointedFunction缓存这些元素.
public class BufferingSink
implements SinkFunction <Tuple2<String,Integer>>, CheckpointedFunction {
private final int thresHOLD;
private transient ListState<Tuple2<String, Integer>> checkpoinedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
bufferedElement.add(value);
if (bufferedElements.Size() == threshold) {
for (Tuple2<String, Integer> element : bufferedElements) {
// send it to the sink
}
bufferedElement.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if(context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedStage.get()) {
bufferedElements.add(element);
}
}
}
}
initializeState
方法接收一个
FunctionInitializationContext
参数。这个参数用于初始化非key的状态"容器"。有一个ListState类型的状态容器,当checkpointing发生的时候,非key的状态会存储在ListState对象中。
注意一下状态容器是如何初始化的,和key状态相似,都需要一个
StateDescriptor
来定义状态名和保存状态的数据类型信息。
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
获取状态方法名称的不同代表了不同的分配策略。例如,如果想在恢复的时候使用合并分配策略,获取状态时,使用
getUnionListState(descriptor)
方法。如果方法名不包含分配策略名称,例如:
getListState(descriptor)
,就默认表示将会使用平均分配策略。
在初始化状态容器之后,我们使用
isRestored()
方法来判断当前是否是失败后的恢复,如果是,将执行恢复逻辑。
我们再来看看类
BufferingSink
的代码,
ListState
是成员变量,在
initializeState
方法初始化,以便在
snapshotState
方法中使用。在
snapshotState
方法中,
ListState
首先清除上一次checkpont的所有对象,然后保存这一次需要checkpoint的对象。
顺便提一下,key状态也能使用
initializeState
方法初始化,可以使用
FunctionInitializationContext
对象实现。
ListCheckpointed
ListCheckpointed
是
CheckpointedFunction
的一个变体,有更多的限制条件,仅支持list类型的状态存储,并且只能是平均分配。包含以下两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
在
snapshotState
方法中,算子应该返回需要保存的list对象,当恢复的时候,在
restoreState
方法中编写恢复list数据的逻辑。 如果状态不需要重新分区,可以在snapshotState方法中返回
Collections.singletonList(MY_STATE)
对象。
Stateful Source函数
相对于其它算子,Stateful source算子有一点特别。为了使更新状态和输出状态原子化(失败/恢复的恰好一次语义要求),必须在source算子的上下文中使用锁。
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** 恰好一次语义使用的偏移量 */
private Long offset = 0L;
/**job是否取消的标识*/
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// 输出和更新状态是原子化的
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
}
当flink checkpoint完全确认的时候,一些算子也许需要与外部系统交换一些信息,这种情况看下
org.apache.flink.runtime.state.CheckpointListener
接口。
畸形动物园 官方免费版
道途漫漫无限仙玉灵石版下载 v1.0.0 安卓版
战地战争模拟器破解内置菜单下载 v1.0 安卓版
天生会画
双子洛丽塔 安卓手机版
藏语播报输入法
剑侠传奇华为版下载 v2.6 安卓版
Manage Supermarket Simulator
课堂派最新版
远征之门0.1折扣版下载 v1.3.0 安卓版
深宫曲金手指无敌版下载 v0.56 安卓版
如果的世界0.1折4S宠物全免版下载 v1.0.0 安卓版
最佳球会魅族下载 v2.2.115 安卓版
FIFA Mobile2024蓝色封面国际服版本下载 v22.0.01 安卓版
- 深入了解Dubbo框架的运行流程和核心组件
- 海量数据处理利器 Roaring BitMap 原理介绍
- 云计算资源弹性伸缩技术详解
- 使用.NET中的System.IO.Compression进行文件和文件夹压缩的方法
- Flutter 借助SearchDelegate实现搜索页面,实现搜索建议、搜索结果,解决IOS拼音问题
- Flash驱动控制--芯片擦除(SPI协议)
- Canvas在前端开发中的重要性和应用
- Java IO流中的数据流分类
- 自定义实现H5页面下拉刷新功能
- 高级前端开发需要知道的 25 个 JavaScript 单行代码
- MyBatis 的缓存机制
- 解决Docker镜像下载问题的Java工具
- 1
加查之花 正版
- 2
爪女孩 最新版
- 3
企鹅岛 官方正版中文版
- 4
捕鱼大世界 无限金币版
- 5
球球英雄 手游
- 6
情商天花板 2024最新版
- 7
内蒙打大a真人版
- 8
烦人的村民 手机版
- 9
跳跃之王手游
- 10
蛋仔派对 国服版本