Flink Internals | Flink Source Code: A Walkthrough of Broadcast State

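Broadcast State is a special kind of Operator State. It was introduced for scenarios in which the records of one stream must be broadcast to all downstream tasks, where they are used to maintain the same state in every subtask. That broadcast state can then be accessed while processing the records of a second stream. Broadcast state has a few characteristics of its own:

  1. It must be defined as a Map structure.

  2. It can only be modified on the broadcast side; the non-broadcast side cannot modify it.

  3. At runtime, Broadcast State is kept only in memory.

At this point you probably have the following questions:

  • Why must broadcast state be defined as a Map structure? Can't I use another state type?

  • Why can broadcast state only be modified on the broadcast side, and not on the non-broadcast side?

  • Why can broadcast state only be kept in memory? Can't it use the RocksDB state backend?

With these three questions in mind, let's read the relevant source code and answer them.

broadcast source code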

/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
 * to every parallel instance of the next operation. In addition, it implicitly creates as many
 * {@link org.apache.flink.api.common.state.BroadcastState broadcast states} as the specified
 * descriptors which can be used to store the element of the stream.
 *
 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)}
 *     to create a {@link BroadcastConnectedStream} for further processing of the elements.
 */
@PublicEvolving
public BroadcastStream<T> broadcast(
        final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}

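As you can see, the broadcast method takes MapStateDescriptor arguments, that is, descriptors for Map-structured state. You must therefore define the state with a MapStateDescriptor; passing any other descriptor type fails at compile time. The underlying reason is that broadcast state exists to be correlated with the non-broadcast stream. Think of it as a two-stream join: a join needs a key on which both sides match, so a Map (key-value) structure is the natural fit. The key holds the field to join on, and the value can be broadcast data of any type. When joining, you only need to obtain the broadcast state and call state.get(key) to retrieve the broadcast data.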
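The lookup pattern described above can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: the HashMap stands in for the broadcast state, and the class name BroadcastLookupSketch and the method enrich are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a key lookup against broadcast data; not Flink code. */
final class BroadcastLookupSketch {

    /** Joins each event key against the map that stands in for the broadcast state. */
    static List<String> enrich(List<String> eventKeys, Map<String, String> broadcastState) {
        List<String> joined = new ArrayList<>();
        for (String key : eventKeys) {
            // Same idea as state.get(key) on the real broadcast state.
            String broadcastValue = broadcastState.get(key);
            if (broadcastValue != null) {
                joined.add(key + " -> " + broadcastValue);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> broadcastState = new HashMap<>();
        broadcastState.put("flink", "stream processing");
        broadcastState.put("spark", "batch processing");

        // Only "flink" has a matching key in the broadcast data.
        System.out.println(enrich(Arrays.asList("flink", "hive"), broadcastState));
        // prints [flink -> stream processing]
    }
}
```

Keying the broadcast data by the join field makes each match a single map lookup, with no scan over the broadcast records.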

process source code

@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) {
    // Extract the type information of the output records
    TypeInformation<OUT> outTypeInfo =
            TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);

    return process(function, outTypeInfo);
}

This process method takes a KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT>. Compared with an ordinary KeyedProcessFunction<K, I, O>, it has one extra type parameter: because this process is fed by two upstream streams, it needs two input types. It then calls the overloaded process method.

process (overload) source code

@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(
            nonBroadcastStream instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

    return transform(function, outTypeInfo);
}

Apart from checking that the function is not null and that the non-broadcast stream is a KeyedStream, this process method does no work of its own; it delegates directly to transform.

transform source code

@Internal
private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
        final TypeInformation<OUT> outTypeInfo) {

    // read the output type of the input Transforms to coax out errors about MissingTypeInfo
    nonBroadcastStream.getType();
    broadcastStream.getType();

    KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
    // Construct the KeyedBroadcastStateTransformation
    final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
            new KeyedBroadcastStateTransformation<>(
                    "Co-Process-Broadcast-Keyed",
                    nonBroadcastStream.getTransformation(),
                    broadcastStream.getTransformation(),
                    clean(userFunction),
                    broadcastStateDescriptors,
                    keyedInputStream.getKeyType(),
                    keyedInputStream.getKeySelector(),
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    final SingleOutputStreamOperator<OUT> returnStream =
            new SingleOutputStreamOperator(environment, transformation);
    // Add the transformation to the List<Transformation<?>> collection
    getExecutionEnvironment().addOperator(transformation);
    return returnStream;
}

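The transform method does two main things:

  1. It first constructs the corresponding KeyedBroadcastStateTransformation object. KeyedBroadcastStateTransformation is itself a subclass of Transformation.

  2. It then adds that transformation to the List<Transformation<?>> collection. Later, when the StreamGraph is built, the Transformations are read from this collection.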
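The register-then-read flow in the two steps above can be modeled in a few lines. This is a simplified plain-Java sketch, not Flink's StreamExecutionEnvironment: transformations are represented by their names, and the class TransformationListSketch and the method transformationsForGraph are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of an environment that collects transformations; not Flink code. */
final class TransformationListSketch {

    // Stands in for the environment's List<Transformation<?>>.
    private final List<String> transformations = new ArrayList<>();

    /** Models step 2 of transform: register the newly built transformation. */
    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    /** Models graph construction reading everything registered so far (as a copy). */
    List<String> transformationsForGraph() {
        return new ArrayList<>(transformations);
    }

    public static void main(String[] args) {
        TransformationListSketch env = new TransformationListSketch();
        env.addOperator("Source");
        env.addOperator("Co-Process-Broadcast-Keyed");
        System.out.println(env.transformationsForGraph());
        // prints [Source, Co-Process-Broadcast-Keyed]
    }
}
```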

getStreamGraph source code

@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}

The main job of getStreamGraph is to generate the StreamGraph, using the List<Transformation<?>> collection populated in the previous step. Because this article focuses on the broadcast stream source code, only the code related to broadcast streams is analyzed.

getStreamGraphGenerator source code

private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }

    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(
                    new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}

The getStreamGraphGenerator method mainly constructs the StreamGraphGenerator object. Once it is constructed, generate can be called to produce the StreamGraph. Before looking at generate, let's first look at the static initializer block of StreamGraphGenerator.

StreamGraphGenerator source code

static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}

Before any StreamGraphGenerator instance is initialized, its static initializer block runs and builds a Map from each Transformation class to its TransformationTranslator. This Map is used later.

transform source code

// Look up the TransformationTranslator registered for this Transformation
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());

Collection<Integer> transformedIds;
if (translator != null) {
    transformedIds = translate(translator, transform);
} else {
    transformedIds = legacyTransform(transform);
}

After the StreamGraphGenerator is constructed, generate is called, which in turn calls transform. Here the matching TransformationTranslator is looked up in the Map built above, and translate is then called.
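The class-keyed lookup with a fallback can be sketched in isolation. The following is a simplified plain-Java model, not Flink's StreamGraphGenerator: the nested Transformation classes are empty stand-ins, translators are reduced to functions that return a label, and SomeLegacyTransformation is a hypothetical unregistered type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of a registry keyed by transformation class; not Flink code. */
final class TranslatorRegistrySketch {

    static class Transformation {}

    static class KeyedBroadcastStateTransformation extends Transformation {}

    /** Hypothetical type with no registered translator. */
    static class SomeLegacyTransformation extends Transformation {}

    private static final Map<Class<? extends Transformation>, Function<Transformation, String>>
            TRANSLATORS;

    static {
        // Built once when the class is loaded, then frozen.
        Map<Class<? extends Transformation>, Function<Transformation, String>> tmp =
                new HashMap<>();
        tmp.put(KeyedBroadcastStateTransformation.class, t -> "keyed broadcast translator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    /** Dispatches on the exact runtime class; unregistered classes take the fallback. */
    static String transform(Transformation transformation) {
        Function<Transformation, String> translator = TRANSLATORS.get(transformation.getClass());
        if (translator != null) {
            return translator.apply(transformation);
        }
        return "legacy path";
    }

    public static void main(String[] args) {
        System.out.println(transform(new KeyedBroadcastStateTransformation()));
        // prints keyed broadcast translator
        System.out.println(transform(new SomeLegacyTransformation()));
        // prints legacy path
    }
}
```

Because the key is the result of getClass(), the lookup matches only the exact registered class, which is why each Transformation type needs its own entry in the map.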

translate#translateForStreaming#translateForStreamingInternal source code

@Override
protected Collection<Integer> translateForStreamingInternal(
        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // Build the CoBroadcastWithKeyedOperator
    CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(
                    transformation.getUserFunction(),
                    transformation.getBroadcastStateDescriptors());

    return translateInternal(
            transformation,
            transformation.getRegularInput(),
            transformation.getBroadcastInput(),
            SimpleOperatorFactory.of(operator),
            transformation.getStateKeyType(),
            transformation.getKeySelector(),
            null /* no key selector on broadcast input */,
            context);
}

The translate call eventually reaches the translateForStreamingInternal method of KeyedBroadcastStateTransformationTranslator, which constructs a CoBroadcastWithKeyedOperator object from the user function (the user code) and the broadcastStateDescriptors (the broadcast state descriptors).

CoBroadcastWithKeyedOperator source code

/**
 * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
 * KeyedBroadcastProcessFunctions}.
 *
 * @param <KS> The key type of the input keyed stream.
 * @param <IN1> The input type of the keyed (non-broadcast) side.
 * @param <IN2> The input type of the broadcast side.
 * @param <OUT> The output type of the operator.
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {

    private static final long serialVersionUID = 5926499536290284870L;

    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    private transient TimestampedCollector<OUT> collector;

    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;

    private transient ReadWriteContextImpl rwContext;

    private transient ReadOnlyContextImpl rContext;

    private transient OnTimerContextImpl onTimerContext;

    public CoBroadcastWithKeyedOperator(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }

    @Override
    public void open() throws Exception {
        super.open();

        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

        TimerService timerService = new SimpleTimerService(internalTimerService);

        collector = new TimestampedCollector<>(output);

        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(
                    descriptor,
                    // Initialize the state implementation instance
                    getOperatorStateBackend().getBroadcastState(descriptor));
        }

        rwContext =
                new ReadWriteContextImpl(
                        getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
    }

    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }

    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }

    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {

        private final ExecutionConfig config;

        private final KeyedStateBackend<KS> keyedStateBackend;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private StreamRecord<IN2> element;

        ReadWriteContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedStateBackend<KS> keyedStateBackend,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {

            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }

        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.getTimestamp();
        }

        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <VS, S extends State> void applyToKeyedState(
                final StateDescriptor<S, VS> stateDescriptor,
                final KeyedStateFunction<KS, S> function)
                throws Exception {

            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }

    private class ReadOnlyContextImpl extends ReadOnlyContext {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private StreamRecord<IN1> element;

        ReadOnlyContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {

            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }

        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.hasTimestamp() ? element.getTimestamp() : null;
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }

        @Override
        @SuppressWarnings("unchecked")
        public KS getCurrentKey() {
            return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
        }
    }

    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final TimerService timerService;

        private TimeDomain timeDomain;

        private InternalTimer<KS, VoidNamespace> timer;

        OnTimerContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {

            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(timer != null);
            return timer.getTimestamp();
        }

        @Override
        public TimeDomain timeDomain() {
            checkState(timeDomain != null);
            return timeDomain;
        }

        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }

        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);

            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
    }
}

Before analyzing the CoBroadcastWithKeyedOperator source code, let's look at its UML diagram.

[Figure: UML diagram of CoBroadcastWithKeyedOperator]

As the diagram shows, CoBroadcastWithKeyedOperator implements the TwoInputStreamOperator interface. As the name suggests, this is the StreamOperator interface for operators with two input streams. CoBroadcastWithKeyedOperator is fed by two upstream streams, so it implements this interface. Next, let's look at the TwoInputStreamOperator source code.

TwoInputStreamOperator source code

/**
 * Interface for stream operators with two inputs. Use {@link
 * org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if you want to
 * implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived on the first input of this two-input operator. This method
     * is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement1(StreamRecord<IN1> element) throws Exception;

    /**
     * Processes one element that arrived on the second input of this two-input operator. This
     * method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement2(StreamRecord<IN2> element) throws Exception;
}

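The TwoInputStreamOperator interface defines two methods. In CoBroadcastWithKeyedOperator, processElement1 handles records from the non-broadcast stream and processElement2 handles records from the broadcast stream.

Now back to the open method of CoBroadcastWithKeyedOperator. It first initializes broadcastStates, which holds the mapping from MapStateDescriptor to BroadcastState, and then initializes the ReadWriteContextImpl and ReadOnlyContextImpl objects. As the names suggest, ReadWriteContextImpl can both read and write the state, while ReadOnlyContextImpl can only read it. The open method does one more important thing: it initializes the implementation class of the broadcast state.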

getBroadcastState source code

public <K, V> BroadcastState<K, V> getBroadcastState(
        final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {

    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());

    BackendWritableBroadcastState<K, V> previous =
            (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);

    if (previous != null) {
        checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        return previous;
    }

    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<K> broadcastStateKeySerializer =
            Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    TypeSerializer<V> broadcastStateValueSerializer =
            Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

    BackendWritableBroadcastState<K, V> broadcastState =
            (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);

    if (broadcastState == null) {
        broadcastState =
                new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    } else {
        // has restored state; check compatibility of new state access

        checkStateNameAndMode(
                broadcastState.getStateMetaInfo().getName(),
                name,
                broadcastState.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);

        RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                broadcastState.getStateMetaInfo();

        // check whether new serializers are incompatible
        TypeSerializerSchemaCompatibility<K> keyCompatibility =
                restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
        if (keyCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new key typeSerializer for broadcast state must not be incompatible.");
        }

        TypeSerializerSchemaCompatibility<V> valueCompatibility =
                restoredBroadcastStateMetaInfo.updateValueSerializer(
                        broadcastStateValueSerializer);
        if (valueCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new value typeSerializer for broadcast state must not be incompatible.");
        }

        broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
    }

    accessedBroadcastStatesByName.put(name, broadcastState);
    return broadcastState;
}

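The getBroadcastState method mainly initializes a HeapBroadcastState object, the concrete implementation class of broadcast state, unless a state with the same name has already been accessed or restored, in which case that instance is reused. Next, let's look at the HeapBroadcastState source code.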
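The get-or-create flow of getBroadcastState can be reduced to a small model. This is a plain-Java sketch under simplifying assumptions, not Flink's operator state backend: states are plain HashMaps keyed by name, and the serializer compatibility checks and the name and mode checks from the real method are left out. The class name BroadcastStateRegistrySketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of looking up broadcast state by name; not Flink code. */
final class BroadcastStateRegistrySketch {

    // States that exist in the backend (newly created here, or restored in the real backend).
    private final Map<String, Map<String, String>> registeredStates = new HashMap<>();

    // States that have already been handed out during this run.
    private final Map<String, Map<String, String>> accessedStatesByName = new HashMap<>();

    Map<String, String> getBroadcastState(String name) {
        Map<String, String> previous = accessedStatesByName.get(name);
        if (previous != null) {
            return previous; // fast path: already accessed
        }
        Map<String, String> state = registeredStates.get(name);
        if (state == null) {
            // First access: create a heap-backed map, as HeapBroadcastState does.
            state = new HashMap<>();
            registeredStates.put(name, state);
        }
        accessedStatesByName.put(name, state);
        return state;
    }

    public static void main(String[] args) {
        BroadcastStateRegistrySketch backend = new BroadcastStateRegistrySketch();
        Map<String, String> first = backend.getBroadcastState("rules");
        first.put("flink", "enabled");
        Map<String, String> second = backend.getBroadcastState("rules");
        System.out.println(first == second); // prints true
        System.out.println(second.get("flink")); // prints enabled
    }
}
```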

HeapBroadcastState source code

/**
 * A {@link BroadcastState Broadcast State} backed by a heap-based {@link Map}.
 *
 * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}.
 * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}.
 */
public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {

    /** Meta information of the state, including state name, assignment mode, and serializer. */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;

    /** The internal map that holds the elements of the state. */
    private final Map<K, V> backingMap;

    /** A serializer that allows to perform deep copies of internal map state. */
    private final MapSerializer<K, V> internalMapCopySerializer;

    HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this(stateMetaInfo, new HashMap<>());
    }

    private HeapBroadcastState(
            final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo,
            final Map<K, V> internalMap) {

        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.backingMap = Preconditions.checkNotNull(internalMap);
        this.internalMapCopySerializer =
                new MapSerializer<>(
                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
    }

    private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
        this(
                toCopy.stateMetaInfo.deepCopy(),
                toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
    }

    @Override
    public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this.stateMetaInfo = stateMetaInfo;
    }

    @Override
    public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
        return stateMetaInfo;
    }

    @Override
    public HeapBroadcastState<K, V> deepCopy() {
        return new HeapBroadcastState<>(this);
    }

    @Override
    public void clear() {
        backingMap.clear();
    }

    @Override
    public String toString() {
        return "HeapBroadcastState{"
                + "stateMetaInfo="
                + stateMetaInfo
                + ", backingMap="
                + backingMap
                + ", internalMapCopySerializer="
                + internalMapCopySerializer
                + '}';
    }

    @Override
    public long write(FSDataOutputStream out) throws IOException {
        long partitionOffset = out.getPos();

        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        dov.writeInt(backingMap.size());
        for (Map.Entry<K, V> entry : backingMap.entrySet()) {
            getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
            getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
        }

        return partitionOffset;
    }

    @Override
    public V get(K key) {
        return backingMap.get(key);
    }

    @Override
    public void put(K key, V value) {
        backingMap.put(key, value);
    }

    @Override
    public void putAll(Map<K, V> map) {
        backingMap.putAll(map);
    }

    @Override
    public void remove(K key) {
        backingMap.remove(key);
    }

    @Override
    public boolean contains(K key) {
        return backingMap.containsKey(key);
    }

    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return backingMap.entrySet().iterator();
    }

    @Override
    public Iterable<Map.Entry<K, V>> entries() {
        return backingMap.entrySet();
    }

    @Override
    public Iterable<Map.Entry<K, V>> immutableEntries() {
        return Collections.unmodifiableSet(backingMap.entrySet());
    }
}

The HeapBroadcastState code is fairly simple. It mainly consists of read and write operations on the state, which boil down to operations on a HashMap.
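The write method above emits the entry count first and then each key and value in turn. The sketch below models that layout with the JDK's DataOutputStream instead of Flink's serializers. It is an illustration only: fixed String keys and int values replace the configurable TypeSerializers, and the read method is an assumption added to show that the layout round-trips, since the excerpt above only shows the write side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a size-prefixed map snapshot; not Flink code. */
final class HeapStateSnapshotSketch {

    /** Writes the entry count, then every key and value, mirroring the layout of write. */
    static byte[] write(Map<String, Integer> backingMap) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(backingMap.size());
            for (Map.Entry<String, Integer> entry : backingMap.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Hypothetical counterpart: reads the count, then that many key/value pairs. */
    static Map<String, Integer> read(byte[] data) {
        Map<String, Integer> restored = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                int value = in.readInt();
                restored.put(key, value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("flink", 1);
        System.out.println(read(write(state))); // prints {flink=1}
    }
}
```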

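Back in CoBroadcastWithKeyedOperator, processElement1 uses ReadOnlyContextImpl while processElement2 uses ReadWriteContextImpl. In other words, the state can be modified only on the broadcast side, not on the non-broadcast side. This answers the second question above.

Although the state can be obtained on both the broadcast side and the non-broadcast side, the return type of getBroadcastState differs: ReadWriteContextImpl returns a BroadcastState, while ReadOnlyContextImpl returns a ReadOnlyBroadcastState.

[Figure: UML diagram of BroadcastState and ReadOnlyBroadcastState, implemented by HeapBroadcastState]

The BroadcastState interface extends the ReadOnlyBroadcastState interface, which in turn extends the State interface. The only implementation class of the BroadcastState interface is HeapBroadcastState, and the name already tells you that broadcast state is stored on the JVM heap. Underneath it is just a Map: the backingMap field in the diagram above holds the state data. This answers the third question above.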
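The effect of handing out the same object under two different interface types can be shown with a small model. This is a plain-Java sketch, not Flink code: ReadOnlyState, WritableState, HeapState and Contexts are hypothetical stand-ins for ReadOnlyBroadcastState, BroadcastState, HeapBroadcastState and the two context classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of read-only and writable views of one state object; not Flink code. */
final class ReadOnlyViewSketch {

    /** Stand-in for ReadOnlyBroadcastState: exposes no mutating methods. */
    interface ReadOnlyState<K, V> {
        V get(K key);

        boolean contains(K key);
    }

    /** Stand-in for BroadcastState: adds write access on top of the read-only view. */
    interface WritableState<K, V> extends ReadOnlyState<K, V> {
        void put(K key, V value);
    }

    /** Stand-in for HeapBroadcastState: a thin wrapper around a HashMap. */
    static final class HeapState<K, V> implements WritableState<K, V> {
        private final Map<K, V> backingMap = new HashMap<>();

        @Override
        public V get(K key) {
            return backingMap.get(key);
        }

        @Override
        public boolean contains(K key) {
            return backingMap.containsKey(key);
        }

        @Override
        public void put(K key, V value) {
            backingMap.put(key, value);
        }
    }

    /** Both sides share one state object but see it through different types. */
    static final class Contexts {
        private final HeapState<String, String> state = new HeapState<>();

        WritableState<String, String> broadcastSide() {
            return state;
        }

        ReadOnlyState<String, String> nonBroadcastSide() {
            return state;
        }
    }

    public static void main(String[] args) {
        Contexts contexts = new Contexts();
        contexts.broadcastSide().put("flink", "rule");
        // contexts.nonBroadcastSide().put(...) would not compile: the type has no put method.
        System.out.println(contexts.nonBroadcastSide().get("flink")); // prints rule
        System.out.println(contexts.nonBroadcastSide().contains("hive")); // prints false
    }
}
```

In this model the restriction is expressed through the declared return type, so an attempted write on the non-broadcast side is rejected by the compiler.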

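To explain the second question further, here is a concrete scenario.

Example

[Figure: BroadcastStream example]

Consider the scenario in the figure above. Stream A reads data from Kafka and, after keyBy, yields a KeyedStream. Stream B reads data from MySQL to be broadcast and yields a BroadcastStream. Stream B holds two records, flink and spark, which are broadcast to every downstream subtask, so subtask-0 and subtask-1 both hold flink and spark in their broadcast state. Now two records, flink and hive, are written to Kafka. After keyBy, flink is routed to subtask-0 and hive to subtask-1. Clearly, flink can be matched against the broadcast data, while hive cannot. Suppose the state were modified on the non-broadcast side (stream A) at this point, for example by adding flink and hive to it. Each subtask would add only the record routed to it, so the broadcast state on subtask-0 and subtask-1 would become inconsistent. To keep the broadcast state identical across all parallel instances of the operator, the design therefore forbids modifying the state on the non-broadcast side.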
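The divergence described in this scenario can be reproduced in a small simulation. This is a plain-Java model, not Flink code: each Subtask holds its own HashMap as broadcast state, broadcast applies an update to every subtask, and the final write models a hypothetical modification from the non-broadcast side that reaches only the subtask the record was routed to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of broadcast state across two subtasks; not Flink code. */
final class BroadcastDivergenceSketch {

    /** One parallel instance of the operator with its local copy of the broadcast state. */
    static final class Subtask {
        final Map<String, String> broadcastState = new HashMap<>();
    }

    /** A broadcast record reaches every subtask, so every copy gets the same update. */
    static void broadcast(List<Subtask> subtasks, String key, String value) {
        for (Subtask subtask : subtasks) {
            subtask.broadcastState.put(key, value);
        }
    }

    /** Returns true when all subtasks hold identical broadcast state. */
    static boolean isConsistent(List<Subtask> subtasks) {
        Map<String, String> first = subtasks.get(0).broadcastState;
        for (Subtask subtask : subtasks) {
            if (!subtask.broadcastState.equals(first)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = new ArrayList<>();
        subtasks.add(new Subtask()); // subtask-0
        subtasks.add(new Subtask()); // subtask-1

        broadcast(subtasks, "flink", "from stream B");
        broadcast(subtasks, "spark", "from stream B");
        System.out.println(isConsistent(subtasks)); // prints true

        // Hypothetical write from the keyed side: "hive" was routed only to subtask-1.
        subtasks.get(1).broadcastState.put("hive", "from stream A");
        System.out.println(isConsistent(subtasks)); // prints false
    }
}
```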

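Summary

Broadcast State is a special kind of Operator State. It is mainly used to join a low-throughput stream (a small amount of data) with another, primary data stream. Broadcast state must be defined as a Map structure, and it can only be modified on the broadcast side; the non-broadcast side can read the state but cannot modify it. Broadcast state is stored only in heap memory, so when using it you need to give the TaskManagers (TM) enough memory. This article explained, from the source code, why Flink is designed this way, to give you a deeper understanding of broadcast state.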
