Spark 閉包(Task not serializable)問題分析及解決

語言: CN / TW / HK

問題描述及原因分析

在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情形下確實在所難免,比如在filter運算元根據外部指定的條件進行過濾,map根據相應的配置進行變換等。為了解決上述Task未序列化問題,這裡對其進行了研究和總結。

出現“ org.apache.spark.SparkException: Task not serializable ”這個錯誤,一般是因為在map、filter等的引數使用了外部的變數,但是這個變數不能序列化(  不是說不可以引用外部變數,只是要做好序列化工作  ,具體後面詳述)。其中最普遍的情形是:當引用了某個類(經常是當前類)的成員函式或變數時,會導致這個類的所有成員(整個類)都需要支援序列化。雖然許多情形下,當前類使用了“extends Serializable”宣告支援序列化,但是由於某些欄位不支援序列化,仍然會導致整個類序列化時出現問題,最終導致出現Task未序列化問題。

引用成員變數的例項分析

如上所述,  由於Spark程式中的map、filter等運算元內部引用了類成員函式或變數導致需要該類所有成員都需要支援序列化,又由於該類某些成員變數不支援序列化,最終引發Task無法序列化問題 。為了驗證上述原因,我們編寫了一個例項程式,如下所示。該類的功能是從域名列表中(rdd)過濾得到特定頂級域名(rootDomain,如.com,.cn,.org)的域名列表,而該特定頂級域名需要函式呼叫時指定。

class MyTest1(conf:String) extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
private val sparkConf = new SparkConf().setAppName("AppName");
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

private val rootDomain = conf

def getResult(): Array[(String)] = {
val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}

依據上述分析的原因,由於依賴了當前類的成員變數,所以導致當前類全部需要序列化,由於當前類某些欄位未做好序列化,導致出錯。實際情況與分析的原因一致,執行過程中出現錯誤,如下所示。分析下面的錯誤報告得到錯誤是由於sc(SparkContext)引起的。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(**SparkContext**.scala:1435)
……
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
- field (class "com.ntci.test.MyTest1", name: "sc", type: "class org.apache.spark.SparkContext")
- object (class "com.ntci.test.MyTest1", [email protected]63700353)
- field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class com.ntci.test.MyTest1")

為了驗證上述結論,將不需要序列化的的成員變數使用關鍵字“@transent”標註,表示不序列化當前類中的這兩個成員變數,再次執行函式,同樣報錯。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
……
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
- field (class "com.ntci.test.MyTest1", name: "sparkConf", type: "class org.apache.spark.**SparkConf**")
- object (class "com.ntci.test.MyTest1", [email protected])

雖然錯誤原因相同,但是這次導致錯誤的欄位是sparkConf(SparkConf)。使用同樣的“@transent”標註方式,將sc(SparkContext)和sparkConf(SparkConf)都標註為不需序列化,再次執行時,程式正常執行。

class MyTest1(conf:String) extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
@transient
private val sparkConf = new SparkConf().setAppName("AppName");
@transient
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

private val rootDomain = conf

def getResult(): Array[(String)] = {

val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}

所以,通過上面的例子我們可以得到結論:由於Spark程式中的map、filter等運算元內部引用了類成員函式或變數導致該類所有成員都需要支援序列化,又由於該類某些成員變數不支援序列化,最終引發Task無法序列化問題。相反地,對類中那些不支援序列化問題的成員變數標註後,使得整個類能夠正常序列化,最終消除Task未序列化問題。

引用成員函式的例項分析

成員變數與成員函式的對序列化的影響相同,即引用了某類的成員函式,會導致該類所有成員都支援序列化。為了驗證這個假設,我們在map中使用了當前類的一個成員函式,作用是如果當前域名沒有以“www.”開頭,那麼就在域名頭新增“www.”字首(注:由於rootDomain是在getResult函式內部定義的,就不存在引用類成員變數的問題,也就不存在和排除了上一個例子所討論和引發的問題,因此這個例子主要討論成員函式引用的影響;此外,不直接引用類成員變數也是解決這類問題的一個手段,如本例中為了消除成員變數的影響而在函式內部定義變數的這種做法,這類問題具體的規避做法此處略提,在下一節作詳細闡述)。下面的程式碼同樣會報錯,同上面的例子一樣,由於當前類中的sc(SparkContext)和sparkConf(SparkConf)兩個成員變數沒有做好序列化處理,導致當前類的序列化出現問題。

class MyTest1(conf:String)  extends Serializable{
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
private val sparkConf = new SparkConf().setAppName("AppName");
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

def getResult(): Array[(String)] = {
val rootDomain = conf
val result = rdd.filter(item => item.contains(rootDomain))
.map(item => addWWW(item))
result.take(result.count().toInt)
}

def addWWW(str:String):String = {
if(str.startsWith("www."))
str
else
"www."+str
}
}

如同前面的做法,將sc(SparkContext)和sparkConf(SparkConf)兩個成員變數使用“@transent”標註後,使當前類不序列化這兩個變數,則程式可以正常執行。此外,與成員變數稍有不同的是,由於該成員函式不依賴特定的成員變數,因此可以定義在scala的object中(類似於Java中的static函式),這樣也取消了對特定類的依賴。如下面例子所示,將addWWW放到一個object物件(UtilTool)中去,filter操作中直接呼叫,這樣處理以後,程式能夠正常執行。

def getResult(): Array[(String)] = {
val rootDomain = conf
val result = rdd.filter(item => item.contains(rootDomain))
.map(item => UtilTool.addWWW(item))
result.take(result.count().toInt)
}
object UtilTool {
def addWWW(str:String):String = {
if(str.startsWith("www."))
str
else
"www."+str
}
}

對全類序列化要求的驗證

如上所述,引用了某類的成員函式,會導致該類及所有成員都需要支援序列化。因此,對於使用了某類成員變數或函式的情形,首先該類需要序列化(extends Serializable),同時需要對某些不需要序列化的成員變數標記以避免為序列化造成影響。對於上面兩個例子,由於引用了該類的成員變數或函式,導致該類以及所有成員支援序列化,為了消除某些成員變數對序列化的影響,使用“@transent”進行標註。 

為了進一步驗證關於整個類需要序列化的假設,這裡在上面例子使用“@transent”標註後並且能正常執行的程式碼基礎上,將類序列化的相關程式碼刪除(去掉extends Serializable),這樣程式執行會報該類為序列化的錯誤,如下所示。所以通過這個例項說明了上面的假設。

Caused by: java.io.NotSerializableException: com.ntci.test.MyTest1
- field (class "com.ntci.test.MyTest1$$anonfun$1", name: "$outer", type: "class com.ntci.test.MyTest1")

所以通過以上例子可以說明:map等運算元內部可以引用外部變數和某類的成員變數,但是要做好該類的序列化處理。首先是該類需要繼承Serializable類,此外,對於類中某些序列化會出錯的成員變數做好處理,這也是Task未序列化問題的主要原因。對於出現這類問題,首先檢視未能序列化的成員變數是哪個,對於可以不需要序列化的成員變數可使用“@transent”標註。 
此外,也不是map操作所在的類必須序列化不可(繼承Serializable類),對於不需要引用某類成員變數或函式的情形,就不會要求相應的類必須實現序列化,如下面的例子所示,filter操作內部沒有引用任何類的成員變數或函式,因此當前類不用序列化,程式可正常執行。

class MyTest1(conf:String) {
val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");
private val sparkConf = new SparkConf().setAppName("AppName");
private val sc = new SparkContext(sparkConf);
val rdd = sc.parallelize(list);

def getResult(): Array[(String)] = {
val rootDomain = conf
val result = rdd.filter(item => item.contains(rootDomain))
result.take(result.count().toInt)
}
}

解決辦法與程式設計建議

承上所述,這個問題主要是引用了某類的成員變數或函式,並且相應的類沒有做好序列化處理導致的。因此解決這個問題無非以下兩種方法:

  1. 不在(或不直接在)map等閉包內部直接引用某類(通常是當前類)的成員函式或成員變數

  2. 如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

一、不在(或不直接在)map等閉包內部直接引用某類成員函式或成員變數

(1)對於依賴某類成員變數的情形

  • 如果程式依賴的值相對固定,可取固定的值,或定義在map、filter等操作內部,或定義在scala 
    object物件中(類似於Java中的static變數)

  • 如果依賴值需要程式呼叫時動態指定(以函式引數形式),則在map、filter等操作時,可不直接引用該成員變數,而是在類似上面例子的getResult函式中根據成員變數的值重新定義一個區域性變數,這樣map等運算元就無需引用類的成員變數。

(2)對於依賴某類成員函式的情形

  • 如果函式功能獨立,可定義在scala object物件中(類似於Java中的static方法),這樣就無需一來特定的類。

二、如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

對於這種情況,則需對該類做好序列化處理,首先該類繼承序列化類,然後對於不能序列化的成員變數使用“@transent”標註,告訴編譯器不需要序列化。 
此外如果可以,可將依賴的變數獨立放到一個小的class中,讓這個class支援序列化,這樣做可以減少網路傳輸量,提高效率。