[Flink-原始碼分析]Blink SQL 回撤解密

語言: CN / TW / HK

因為目前我司使用的版本還是和Blink對齊的版本,所以本文還是先針對Blink中對於回撤的實現來進行原始碼分析。

概念

回撤這個概念,是流計算中特有的,簡單理解起來就是將先前的計算結果回撤,那什麼場景下會出現回撤呢?當" 中間計算結果 "被提前下發時,後續更新結果時,需要將先前的中間值回撤,並下發更新後的值。因此回撤的使用場景,主要是在會產生中間計算結果的場景。

在流計算中,因為上游的資料集是持續流入的,因此計算的結果都是中間結果。例如

  • group aggregate 計算中,每來一條資料,都會去更新某個維度的聚合值下發,這個時候下發的就是中間結果。
  • 雙流join中,left join場景下,右表沒有資料時會先下發一條 (left, null) 下去,而當後續右表相對應的key的資料到達時,要將先前傳送的(left, null)的資料回撤掉,在傳送(left, right)
  • 在window 運算元中,如果我們設定了early trigger,那麼就會發送中間計算結果,就會需要傳送回撤訊息。

以第一種場景為例,如果下游是一個按主鍵更新的儲存,其結果會被不斷的覆蓋,計算結果沒有正確性的問題。但是如果計算的sql是如下場景。

SELECT
  cnt,
  COUNT (word) AS freq
FROM
  (
    SELECT
      word,
      COUNT(num) AS cnt
    FROM
      Table
    GROUP BY
      word
  )
GROUP BY
  cnt;

第一階段產生的count值,會作為第二個階段的group by維度,因此當count值發生變化時,如果不傳送回撤訊息,產生的聚合值就是錯誤的

我們可以看到傳送回撤訊息,最終目的是實現計算結果的準確性,最終一致性。

規則推導

在瞭解回撤的作用之後,我們再來看看blink中具體是怎麼實現回撤功能的。在 StreamExecRetractionRules 中定義了三個規則

  • StreamExecRetractionRules.DEFAULT_RETRACTION_INSTANCE
  • StreamExecRetractionRules.UPDATES_AS_RETRACTION_INSTANCE
  • StreamExecRetractionRules.ACCMODE_INSTANCE

這三個規則的主要作用是給StreamPhysicalRel節點上去分配AccModeTrait。AccModeTrait 是一種RelTrait,表示了每個節點的物理屬性。而AccModeTrait中的屬性有以下兩種

  • Acc
    • insert: (true, newRow)
    • update: (true, newRow)
    • delete: (false, oldRow)
  • AccRetract
    • insert: (true, newRow)
    • update: (false, oldRow), (true, newRow)
    • delete: (false, oldRow)

以上表示的是兩種模式下,針對insert,update和delete三種訊息,向下遊傳送具體message的差異,其中true或者false表示的是BaseRow中的header頭,用來表示是否是回撤訊息。

可以看到這兩個模式的主要差別是傳送更新訊息時的行為不一樣。 AccRetract模式下update訊息會先發送一條oldRow的回撤訊息。

AssignDefaultRetractionRule

首先我們來分析第一個RetractRule。

override def onMatch(call: RelOptRuleCall): Unit = {
      val rel = call.rel(0).asInstanceOf[StreamPhysicalRel]
      val traits = rel.getTraitSet

      // init AccModeTrait
      val traitsWithAccMode =
        if (
          AccModeTrait.DEFAULT == traits.getTrait(AccModeTraitDef.INSTANCE) &&
            ((rel.isInstanceOf[StreamExecDataStreamScan] &&
              rel.asInstanceOf[StreamExecDataStreamScan].isAccRetract) ||
              (rel.isInstanceOf[StreamExecIntermediateTableScan] &&
                rel.asInstanceOf[StreamExecIntermediateTableScan].isAccRetract))) {
          // if source is AccRetract
          traits.plus(new AccModeTrait(AccMode.AccRetract))
        } else {
          traits
        }

      // transform
      if (traits != traitsWithAccMode) {
        call.transformTo(rel.copy(traitsWithAccMode, rel.getInputs))
      }
    }

第一個是規則,只針對源頭節點進行分析。FlinkRelBuilder中傳入了五個Flink自定義的RelTraitDef,這裡我們需要關注的是AccModeTraitDef。

這裡定義的幾個traitDef,會在建立關係表示式時,作為預設的triat注入。因此在進行rule檢測之前,預設所有的RelNode中的AccModeTrait都是AccMode.Acc,因此這個規則就是檢測source節點,如果source節點的物理節點具有 xxx.isAccRetract 的屬性,就將其AccModeTrait設定為AccMode.AccRetract

SetUpdatesAsRetractionRule

/**
 * Annotates the children of a parent node with the information that they need to forward
 * update and delete modifications as retraction messages.
 *
 * A child needs to produce retraction messages, if
 *
 * 1. its parent requires retraction messages by itself because it is a certain type
 * of operator, such as a [[StreamExecGroupAggregate]] or [[StreamExecOverAggregate]], or
 * 2. its parent requires retraction because its own parent requires retraction
 * (transitive requirement).
 *
 */
override def onMatch(call: RelOptRuleCall): Unit = {
    val parent = call.rel(0).asInstanceOf[StreamPhysicalRel]

    val children = getChildRelNodes(parent)
    // check if children need to sent out retraction messages
    val newChildren = for (c <- children) yield {
      if (needsUpdatesAsRetraction(parent, c) && !containUpdatesAsRetraction(c)) {
        setUpdatesAsRetraction(c)
      } else {
        c
      }
    }

    // update parent if a child was updated
    if (children != newChildren) {
      call.transformTo(parent.copy(parent.getTraitSet, newChildren.asJava))
    }
  }
}

child: 在關係代數的執行樹中處於靠下位置,如source節點。

根據規則的註釋,他所做的是將parent的child節點(也就是輸入節點)標記上UpdateAsRetractionTrait,以下場景中的輸入節點需要被標記

StreamExecGroupAggregate

具體流程中,是首先根據parent節點獲取其所有的輸入(child), 遍歷輸入節點,當兩個條件滿足時,將child節點設定指定的trait。

  • needsUpdatesAsRetraction
  • !containUpdatesAsRetraction 這個表示當前child沒有UpdateAsRetraction的標記

setUpdatesAsRetraction

def setUpdatesAsRetraction(relNode: RelNode): RelNode = {
  val traitSet = relNode.getTraitSet
  relNode.copy(traitSet.plus(new UpdateAsRetractionTrait(true)), relNode.getInputs)
}

這個方法就是將當前的relNode節點新增上 UpdateAsRetractionTrait 的trait

containUpdatesAsRetraction

def containUpdatesAsRetraction(node: RelNode): Boolean = {
  val retractionTrait = node.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE)
  retractionTrait != null && retractionTrait.sendsUpdatesAsRetractions
}

判斷輸入節點是否有UpdateAsRetraction的屬性

needsUpdatesAsRetraction

其入參是 下游節點輸入節點

// node是下游節點
def needsUpdatesAsRetraction(node: StreamPhysicalRel, input: RelNode): Boolean = {
  node match {
    case _ if shipUpdatesAsRetraction(node) => true
    case dsr: StreamPhysicalRel => dsr.needsUpdatesAsRetraction(input)
  }
}

def shipUpdatesAsRetraction(node: StreamPhysicalRel): Boolean = {
  containUpdatesAsRetraction(node) && !node.consumesRetractions
}
  1. 下游節點 已經被標記為了UpdateAsRetraction,並且 下游節點 不會consumeRetraction訊息( 這個是 StreamPhysicalRel的一個屬性 ,由各個物理節點各自實現 )
  2. 或者檢測下游節點是否需要 needsUpdatesAsRetraction,這個也是 StreamPhysicalRel的一個屬性。

SetAccModeRule

/**
  * Updates the [[AccMode]] of a [[RelNode]] and its children if necessary.
  */
override def onMatch(call: RelOptRuleCall): Unit = {
  val parent = call.rel(0).asInstanceOf[StreamPhysicalRel]
  val children = getChildRelNodes(parent)

  // check if the AccMode of the parent needs to be updated
  if (!isAccRetract(parent) &&
    (producesRetractions(parent) || forwardsRetractions(parent, children))) {
    call.transformTo(setAccRetract(parent))
  }
}
/**
  * Checks if a [[StreamPhysicalRel]] produces retraction messages.
  */
def producesRetractions(node: StreamPhysicalRel): Boolean = {
  containUpdatesAsRetraction(node) && node.producesUpdates || node.producesRetractions
}
/**
   * Checks if a [[StreamPhysicalRel]] forwards retraction messages from its children.
   */
 def forwardsRetractions(parent: StreamPhysicalRel, children: Seq[RelNode]): Boolean = {
   children.exists(c => isAccRetract(c)) && !parent.consumesRetractions
 }
  1. node 包含有UpdatesAsRetraction 並且node會產生更新訊息,那麼就需要傳送回撤
  2. 或者node本身就會發送回撤訊息,node.producesUpdates 和 node.producesRetractions 這兩者也都是 StreamPhysicalRel的一個屬性。
  3. 或者上游節點存在AccRetract的節點,並且當前節點不能consumeRetraction

這幾種情況下,當前運算元就需要加上AccRetract的屬性。

StreamPhysicalRel

/**
    * Whether the [[FlinkRelNode]] produces update and delete changes.
    */
  def producesUpdates: Boolean = false

  /**
    * Whether the [[FlinkRelNode]] requires retraction messages or not.
    */
  def needsUpdatesAsRetraction(input: RelNode): Boolean = false

  /**
    * Whether the [[FlinkRelNode]] consumes retraction messages instead of forwarding them.
    * The node might or might not produce new retraction messages.
    */
  def consumesRetractions: Boolean = false

  /**
    * Whether the [[FlinkRelNode]] produces retraction messages.
    */
  def producesRetractions: Boolean = false
  • producesUpdates表示該節點是否會產生更新訊息
  • producesRetractions表示該節點是否會直接產生回撤訊息
  • needsUpdatesAsRetraction表示該節點是否需要上游將update編譯成retract訊息傳送
  • consumesRetractions 表示該節點是否會處理上游的retract訊息,而不是直接轉發。但是該節點仍然可能會發送retract訊息下去。

小結

所以大致邏輯就是

  • 首先檢測每個node是否需要UpdatesAsRetraction,如果是的話,就將其上游節點標記為UpdatesAsRetraction。
  • 然後檢測每個node是否包含UpdatesAsRetraction,如果包含,並且該節點會發送update訊息,那麼就將其更新為AccRetract。或者上游已經是AccRetract,並且下游不會處理retract訊息,則將下游標記為AccRetract。

其中會中斷retract傳播鏈的,只是有consumesRetractions的節點會阻斷。

優化規則執行順序

這裡面在測試的時候,還注意到一個細節,例如以下的計算鏈路,按照推導:

因為StreamExecLocalGroupAggregate會consumesRetractions,那麼他被標記為UpdatesAsRetraction,而其needsUpdatesAsRetraction的屬性也為false,那麼它是如何告知上游的GroupAggregate需要傳送回撤訊息呢,其使用的agg函式為什麼會有retract的字尾呢?

其原因是,sql優化的規則執行是有順序的,在physical rewrite階段,首先會進行AccTrait的設定,隨後才會進行諸如兩階段之類的rewrite優化,因此在前面的過程中不會涉及StreamExecLocalGroupAggregate節點的判斷,這個是在兩階段中生成的物理節點。這個規則的執行順序和calcite的Volcano的優化規則模型執行優化,後續有時間再寫文研究下。

以更低的代價retract

我們再回頭看,實際上以上的規則就是對Acc訊息分為Acc和AccRetract已經是針對retract場景做了優化,因為不是所有的場景都需要上游傳送retract訊息,如果每種場景都會發送retract,訊息量和網路開銷就直接double,因此才有了上述的這套推導規則。只有當下遊需要上游以retract的方式傳送更新訊息,上游才傳送retract訊息。

傳送retract訊息一般會帶來一些額外的開銷,包括

  • 網路資料量翻倍 像一般retract時,都會判斷當前的計算值和先前的計算值不一樣了,才會傳送回撤訊息,這也是減少傳送量的一種優化
  • 下游的agg函式為了處理回撤,可能只能採取效能較差的實現,典型的例如Max和Max_with_retract
  • 需要在上游儲存先前已經發送的值,保存於狀態中

參考