Spark 如何對源端資料做切分?

語言: CN / TW / HK

引言

典型的Spark作業讀取位於OSS的Parquet外表時,源端的併發度(task/partition)如何確定?特別是在做TPCH測試時有一些疑問,如源端掃描檔案的併發度是如何確定的?是否一個parquet檔案對應一個partition?多個parquet檔案對應一個partition?還是一個parquet檔案對應多個partition?本文將從原始碼角度進行分析進而解答這些疑問。

分析

資料來源讀取對應的物理執行節點為FileSourceScanExec,讀取資料程式碼塊如下

lazy val inputRDD: RDD[InternalRow] = {
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    val readRDD = if (bucketedScan) {
      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
        relation)
    } else {
      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
    }
    sendDriverMetrics()
    readRDD
  }

主要關注非bucket的處理,對於非bucket的掃描呼叫createReadRDD方法定義如下

/**
   * Create an RDD for non-bucketed reads.
   * The bucketed variant of this function is [[createBucketedReadRDD]].
   *
   * @param readFile a function to read each (part of a) file.
   * @param selectedPartitions Hive-style partition that are part of the read.
   * @param fsRelation [[HadoopFsRelation]] associated with the read.
   */
  private def createReadRDD(
      readFile: (PartitionedFile) => Iterator[InternalRow],
      selectedPartitions: Array[PartitionDirectory],
      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
    // 檔案開啟開銷,每次開啟檔案最少需要讀取的位元組    
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
    // 最大切分分片大小
    val maxSplitBytes =
      FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")
    // Filter files with bucket pruning if possible
    val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
    val shouldProcess: Path => Boolean = optionalBucketSet match {
      case Some(bucketSet) if bucketingEnabled =>
        // Do not prune the file if bucket file name is invalid
        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
      case _ =>
        _ => true
    }
    // 對分割槽下檔案進行切分並按照從大到小進行排序
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        // getPath() is very expensive so we only want to call it once in this block:
        val filePath = file.getPath
        if (shouldProcess(filePath)) {
          // 檔案是否可split,parquet/orc/avro均可被split
          val isSplitable = relation.fileFormat.isSplitable(
            relation.sparkSession, relation.options, filePath)
          // 切分檔案
          PartitionedFileUtil.splitFiles(
            sparkSession = relation.sparkSession,
            file = file,
            filePath = filePath,
            isSplitable = isSplitable,
            maxSplitBytes = maxSplitBytes,
            partitionValues = partition.values
          )
        } else {
          Seq.empty
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
    val partitions =
      FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
  }

可以看到確定最大切分分片大小maxSplitBytes對於後續切分為多少個檔案非常重要,其核心邏輯如下

def maxSplitBytes(
      sparkSession: SparkSession,
      selectedPartitions: Seq[PartitionDirectory]): Long = {
    // 讀取檔案時打包成最大的partition大小,預設為128MB,對應一個block大小
    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    // 開啟每個檔案的開銷,預設為4MB
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    // 建議的(不保證)最小分割檔案分割槽數,預設未設定,從leafNodeDefaultParallelism獲取
    // 程式碼邏輯呼叫鏈 SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
    // -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism
    // -> 總共多少核max(executor core總和, 2),最少為2
    val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
      .getOrElse(sparkSession.leafNodeDefaultParallelism)
    // 總共讀取的大小
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    // 單core讀取的大小
    val bytesPerCore = totalBytes / minPartitionNum
    // 計算大小,不會超過設定的128MB
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

對於PartitionedFileUtil#splitFiles,其核心邏輯如下,較為簡單,直接按照最大切分大小切分大檔案來進行分片

def splitFiles(
      sparkSession: SparkSession,
      file: FileStatus,
      filePath: Path,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] = {
    if (isSplitable) {
      // 切分為多個分片
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(getBlockLocations(file), offset, size)
        PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
      }
    } else {
      Seq(getPartitionedFile(file, filePath, partitionValues))
    }
  }

在獲取到Seq[PartitionedFile]列表後,還並沒有完成對檔案的切分,還需要呼叫FilePartition#getFilePartitions做最後的處理,方法核心邏輯如下

def getFilePartitions(
      sparkSession: SparkSession,
      partitionedFiles: Seq[PartitionedFile],
      maxSplitBytes: Long): Seq[FilePartition] = {
    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L
    /** Close the current partition and move to the next. */
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        // Copy to a new Array.
        // 重新生成一個新的PartitionFile
        val newPartition = FilePartition(partitions.size, currentFiles.toArray)
        partitions += newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }
    // 開啟檔案開銷,預設為4MB
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    // Assign files to partitions using "Next Fit Decreasing"
    partitionedFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        // 如果累加的檔案大小大於的最大切分大小,則關閉該分割槽,表示完成一個Task讀取的資料切分
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    // 最後關閉一次分割槽,檔案可能較小
    closePartition()
    partitions.toSeq
  }

可以看到經過這一步後,會把一些小檔案做合併,生成maxSplitBytes大小的PartitionFile,這樣可以避免拉起太多task讀取太多小的檔案。

生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的併發度為partitions的長度,也即最後Spark生成的Task個數

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

整體流程圖如下圖所示

拆分、合併過程如下圖所示

實戰

對於TPCH 10G生成的customer parquet表

http:// oss.console.aliyun.com/ bucket/oss-cn-hangzhou/fengzetest/object?path=rt_spark_test%2Fcustomer-parquet%2F

共8個Parquet檔案,總檔案大小為113.918MB

Spark作業配置如下,executor只有1core

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

根據前面的公式計算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = 72.959MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

得到maxSplitBytes為72.959MB,從日誌中也可看到對應大小

經過排序後的檔案順序為(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007),再次經過合併後得到3個FilePartitioned,分別對應

  • FilePartitioned 1: 00000, 00001, 00002
  • FilePartitioned 2: 00003, 00004, 00006
  • FilePartitioned 3: 00005, 00007

即總共會生成3個Task

從Spark UI檢視確實生成3個Task

從日誌檢視也是生成3個Task

變更Spark作業配置,5個executor共10core

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

根據前面的公式計算

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918 + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = 14.5918MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

檢視日誌

此時可以看到14.5918MB會對原始檔進行切分,會對00001, 00002,00003,00004,00005,00006進行切分,切分成兩份,00007由於小於14.5918MB,因此不會進行切分,經過PartitionedFileUtil#splitFiles後,總共存在7 * 2 + 1 = 15個PartitionedFile

  • 00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
  • 00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
  • 00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
  • 00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
  • 00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
  • 00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
  • 00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
  • 00007(0 -> 4.634MB)

經過排序後得到如下以及合併後得到10個FilePartitioned,分別對應

  • FilePartitioned 1: 00000(0 -> 14.5918MB)
  • FilePartitioned 2: 00001(0 -> 14.5918MB)
  • FilePartitioned 3: 00002(0 -> 14.5918MB)
  • FilePartitioned 4: 00003(0 -> 14.5918MB)
  • FilePartitioned 5: 00004(0 -> 14.5918MB)
  • FilePartitioned 6: 00005(0 -> 14.5918MB)
  • FilePartitioned 7: 00006(0 -> 14.5918MB)
  • FilePartitioned 8: 00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)
  • FilePartitioned 9: 00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)
  • FilePartitioned 10: 00004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)

即總共會生成10個Task

通過Spark UI也可檢視到生成了10個Task

檢視日誌,000004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)在同一個Task中

00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)

00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)在同一個Task中

總結

通過原始碼可知Spark對於源端Partition切分,會考慮到分割槽下所有檔案大小以及開啟每個檔案的開銷,同時會涉及對大檔案的切分以及小檔案的合併,最後得到一個相對合理的Partition。

原文連結

本文為阿里雲原創內容,未經允許不得轉載。