Spark 如何對源端資料做切分?
/**
 * The RDD that scans the files of `relation`.
 *
 * A per-file reader function is obtained from the relation's file format and then wrapped in
 * either the bucketed or the non-bucketed read RDD, depending on `bucketedScan`. Driver-side
 * metrics are sent after the RDD has been created and before it is returned.
 */
lazy val inputRDD: RDD[InternalRow] = {
  // Function that turns one (part of a) file into an iterator of rows.
  val fileReader: PartitionedFile => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf =
        relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val scanRdd =
    if (!bucketedScan) {
      createReadRDD(fileReader, dynamicallySelectedPartitions, relation)
    } else {
      // NOTE(review): assumes bucketedScan is only true when bucketSpec is defined -- confirm.
      createBucketedReadRDD(
        relation.bucketSpec.get, fileReader, dynamicallySelectedPartitions, relation)
    }

  sendDriverMetrics()
  scanRdd
}
/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * Files are first cut into splits of at most `maxSplitBytes`, the splits are ordered from
 * largest to smallest, and finally packed into [[FilePartition]]s that back the returned
 * [[FileScanRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partitions that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */
private def createReadRDD(
    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  val session = fsRelation.sparkSession
  // Cost of opening a file, expressed as the number of bytes that could be scanned instead.
  val openCostInBytes = session.sessionState.conf.filesOpenCostInBytes
  // Upper bound for the size of a single split.
  val maxSplitBytes = FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  // Filter files with bucket pruning if possible.
  val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
  val shouldProcess: Path => Boolean = optionalBucketSet match {
    case Some(selectedBuckets) if bucketingEnabled =>
      // Do not prune the file if its bucket file name is invalid.
      path => BucketingUtils.getBucketId(path.getName).forall(selectedBuckets.get)
    case _ =>
      _ => true
  }

  // Cut every file of every selected partition into splits.
  // NOTE(review): this part reads the enclosing `relation` rather than the `fsRelation`
  // parameter -- presumably both refer to the same relation; confirm against the callers.
  val unorderedSplits = selectedPartitions.flatMap { directory =>
    directory.files.flatMap { status =>
      // getPath() is very expensive so we only want to call it once in this block:
      val path = status.getPath

      if (!shouldProcess(path)) {
        Seq.empty
      } else {
        // Whether this file may be split (e.g. parquet/orc/avro files can be).
        val splittable = relation.fileFormat.isSplitable(
          relation.sparkSession, relation.options, path)
        PartitionedFileUtil.splitFiles(
          sparkSession = relation.sparkSession,
          file = status,
          filePath = path,
          isSplitable = splittable,
          maxSplitBytes = maxSplitBytes,
          partitionValues = directory.values
        )
      }
    }
  }

  // Largest splits first, as expected by the packing step below.
  val splitFiles = unorderedSplits.sortBy(_.length)(Ordering[Long].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
/**
 * Computes the maximum number of bytes a single file split may contain.
 *
 * The result is the per-core share of the total scan cost, clamped to the range
 * `[openCostInBytes, filesMaxPartitionBytes]`.
 *
 * @param sparkSession session whose SQL conf supplies the size and parallelism settings.
 * @param selectedPartitions partition directories whose files will be read.
 * @return the maximum split size in bytes.
 */
def maxSplitBytes(
    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  // Maximum number of bytes to pack into a single partition when reading files;
  // defaults to 128MB, which corresponds to one block.
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  // Estimated cost of opening a file; defaults to 4MB.
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Suggested (not guaranteed) minimum number of split file partitions. Unset by default, in
  // which case it falls back to leafNodeDefaultParallelism. Call chain:
  //   SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
  //   -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism
  //   -> max(total executor cores, 2), i.e. at least 2.
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  // Total cost of the scan: every file contributes its length plus the open cost.
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  // Share of that cost handled by a single core.
  val bytesPerCore = totalBytes / minPartitionNum

  // Never exceeds the configured maximum (128MB by default) and never drops below the open cost.
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
/**
 * Cuts a single file into the [[PartitionedFile]]s that will be read.
 *
 * A splittable file is divided into consecutive ranges of at most `maxSplitBytes` bytes; any
 * other file is returned as one entry built by `getPartitionedFile`.
 *
 * @param sparkSession the active session (not used by this method's body).
 * @param file status of the file to cut.
 * @param filePath path of `file`, passed in so it is not recomputed here.
 * @param isSplitable whether the file may be read in several ranges.
 * @param maxSplitBytes maximum size in bytes of a single range.
 * @param partitionValues partition values attached to every produced entry.
 * @return the ranges of the file, in increasing offset order.
 */
def splitFiles(
    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (!isSplitable) {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  } else {
    val fileLength = file.getLen
    // One range per multiple of maxSplitBytes; only the last range can be shorter.
    (0L until fileLength by maxSplitBytes).map { start =>
      val length = math.min(maxSplitBytes, fileLength - start)
      val hosts = getBlockHosts(getBlockLocations(file), start, length)
      PartitionedFile(partitionValues, filePath.toUri.toString, start, length, hosts)
    }
  }
}
/**
 * Packs file splits into [[FilePartition]]s, visiting them in the given order.
 *
 * Splits are appended to the current partition until adding the next one would push the
 * accumulated size above `maxSplitBytes`; the current partition is then closed and a new one is
 * started. Every split added is charged its length plus the configured open cost.
 *
 * @param sparkSession session whose SQL conf supplies the file open cost.
 * @param partitionedFiles the splits to pack, in the order they should be considered.
 * @param maxSplitBytes size threshold above which the current partition is closed.
 * @return the resulting partitions, indexed in creation order.
 */
def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val finished = new ArrayBuffer[FilePartition]
  val pending = new ArrayBuffer[PartitionedFile]
  var pendingBytes = 0L

  // Cost of opening a file; defaults to 4MB.
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes

  /** Emit the pending splits (if any) as the next partition and reset the accumulator. */
  def flushPending(): Unit = {
    if (pending.nonEmpty) {
      // Copy to a new Array so that clearing `pending` does not affect the new partition.
      finished += FilePartition(finished.size, pending.toArray)
    }
    pending.clear()
    pendingBytes = 0
  }

  // Assign files to partitions using "Next Fit Decreasing"
  for (file <- partitionedFiles) {
    // The incoming split's own open cost is not part of this check; it is only charged below.
    val overflows = pendingBytes + file.length > maxSplitBytes
    if (overflows) {
      // The accumulated size would exceed the maximum split size: close this partition, which
      // completes the data assigned to one task.
      flushPending()
    }
    // Add the given file to the current partition.
    pending += file
    pendingBytes += file.length + openCostInBytes
  }

  // Close one last time: the trailing splits may be too small to have triggered a close.
  flushPending()
  finished.toSeq
}
生成的FileScanRDD(new FileScanRDD(fsRelation.sparkSession, readFile, partitions))的併發度為partitions的長度,也即最後Spark生成的Task個數
/** Exposes the stored `filePartitions` as this RDD's partition array. */
override protected def getPartitions: Array[RDDPartition] = {
  filePartitions.toArray[RDDPartition]
}
對於TPCH 10G生成的customer parquet表
conf spark.driver.resourceSpec=small; conf spark.executor.instances=1; conf spark.executor.resourceSpec=small; conf SQL Test; conf spark.adb.connectors=oss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100;
defaultMaxSplitBytes = 128MB openCostInBytes = 4MB minPartitionNum = max(1, 2) = 2 totalBytes = 113.918MB + 8 * 4MB = 145.918MB bytesPerCore = 145.918MB / 2 = 72.959MB maxSplitBytes = 72.959MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
經過排序後的檔案順序為(00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007),再次經過合併後得到3個FilePartitioned,分別對應
- FilePartitioned 1: 00000, 00001, 00002
- FilePartitioned 2: 00003, 00004, 00006
- FilePartitioned 3: 00005, 00007
從Spark UI檢視確實生成3個Task
conf spark.driver.resourceSpec=small; conf spark.executor.instances=5; conf spark.executor.resourceSpec=medium; conf SQL Test; conf spark.adb.connectors=oss; use tpcd; select * from customer order by C_CUSTKEY desc limit 100;
defaultMaxSplitBytes = 128MB openCostInBytes = 4MB minPartitionNum = max(10, 2) = 10 totalBytes = 113.918MB + 8 * 4MB = 145.918MB bytesPerCore = 145.918MB / 10 = 14.5918MB maxSplitBytes = 14.5918MB = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
此時maxSplitBytes為14.5918MB,會對原始檔進行切分:00000, 00001, 00002, 00003, 00004, 00005, 00006均大於14.5918MB,各被切分成兩份;00007由於小於14.5918MB,因此不會進行切分。經過PartitionedFileUtil#splitFiles後,總共存在7 * 2 + 1 = 15個PartitionedFile
- 00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
- 00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
- 00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
- 00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
- 00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
- 00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
- 00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
- 00007(0 -> 4.634MB)
- FilePartitioned 1: 00000(0 -> 14.5918MB)
- FilePartitioned 2: 00001(0 -> 14.5918MB)
- FilePartitioned 3: 00002(0 -> 14.5918MB)
- FilePartitioned 4: 00003(0 -> 14.5918MB)
- FilePartitioned 5: 00004(0 -> 14.5918MB)
- FilePartitioned 6: 00005(0 -> 14.5918MB)
- FilePartitioned 7: 00006(0 -> 14.5918MB)
- FilePartitioned 8: 00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)
- FilePartitioned 9: 00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)
- FilePartitioned 10: 00004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)
通過Spark UI也可檢視到生成了10個Task
檢視日誌,00004(14.5918MB -> 15.617MB),00005(14.5918MB -> 15.536MB),00006(14.5918MB -> 15.539MB)在同一個Task中
00007(0 -> 4.634MB),00000(14.5918MB -> 15.698MB)在同一個Task中
00001(14.5918MB -> 15.632MB),00002(14.5918MB -> 15.629MB),00003(14.5918MB -> 15.624MB)在同一個Task中
