spark利器2函式之dataframe全域性排序id與分組後保留最大值行

語言: CN / TW / HK

spark利器2函式之dataframe全域性排序id與分組後保留最大值行


作為一個演算法工程師,日常學習和工作中,不光要 訓練模型關注效果 ,更多的 時間 是在 準備樣本資料與分析資料 等,而這些過程 都與 大資料 spark和hadoop生態 的若干工具息息相關。

今天我們就不在更新 機器學習演算法模型 相關的內容,分享兩個 spark函式 吧,以前也在某種場景中使用過但沒有儲存收藏,哎!! 事前不蒐藏,臨時抱佛腳 的感覺 真是 痛苦,太耽誤幹活了

so,把這 兩個函式 記在這裡 以備不時 之需~


(1) 得到 spark dataframe 全域性排序ID

這個函式的 應用場景 就是:根據某一列的數值對 spark 的 dataframe 進行排序, 得到全域性多分割槽排序的全域性有序ID,新增一列儲存這個rank id ,並且保留別的列的資料無變化

有使用者會說,這不是很容易嗎 ,直接用 orderBy 不就可以了嗎,但是難點是:orderBy完記錄下全域性ID 並且 保持原來全部列的DF資料

多說無益,遇到這個場景 直接copy 用起來 就知道 有多爽 了,同類問題 我們可以 用下面 這個函式 解決 ~

scala 寫的 spark 版本程式碼:

```

@ 歡迎關注微信公眾號:演算法全棧之路

def dfZipWithIndex(   df: DataFrame,   offset: Int = 1,   colName: String ="rank_id",   inFront: Boolean = true ) : DataFrame = {   df.sqlContext.createDataFrame(     df.rdd.zipWithIndex.map(ln =>       Row.fromSeq(         (if (inFront) Seq(ln._2 + offset) else Seq())           ++ ln._1.toSeq ++         (if (inFront) Seq() else Seq(ln._2 + offset))       )     ),     StructType(       (if (inFront) Array(StructField(colName,LongType,false)) else ArrayStructField)         ++ df.schema.fields ++       (if (inFront) ArrayStructField else Array(StructField(colName,LongType,false)))     )   ) } ```

函式呼叫我們可以用這行程式碼呼叫: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接複製過去就可以~

python寫的 pyspark 版本程式碼:

``` @ 歡迎關注微信公眾號:演算法全棧之路

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex (df, offset=1, colName="rank_id"):     new_schema = StructType(                     [StructField(colName,LongType(),True)]        # new added field in front                     + df.schema.fields                            # previous schema                 )     zipped_rdd = df.rdd.zipWithIndex()     new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))     return spark.createDataFrame(new_rdd, new_schema) ```

呼叫 同理 , 這裡我就不在進行贅述了。


(2)分組後保留最大值行

這個函式的 應用場景 就是: 當我們使用 spark 或則 sparkSQL 查詢某個 dataframe 資料的時候,在某一天裡,任意一個使用者可能有多條記錄,我們需要 對每一個使用者,保留dataframe 中 某列值最大 的那行資料

其中的 關鍵點 在於:一次性求出對每個使用者分組後,求得每個使用者的多行記錄中,某個值最大的行進行資料保留

當然,經過 簡單修改程式碼,不一定是最大,最小也是可以的,平均都ok

scala 寫的 spark 版本程式碼:

```

@ 歡迎關注微信公眾號:演算法全棧之路

// 得到一天內一個使用者多個記錄裡面時間最大的那行使用者的記錄 import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions

val w = Window.partitionBy("user_id") val result_df = raw_df     .withColumn("max_time",functions.max("time").over(w))     .where($"time" === $"max_time")     .drop($"max_time") ```

python寫的 pyspark 版本程式碼:

``` @ 歡迎關注微信公眾號:演算法全棧之路

pyspark dataframe 某列值最大的元素所在的那一行

GroupBy 列並過濾 Pyspark 中某列值最大的行

建立一個Window 以按A列進行分割槽,並使用它來計算每個組的最大值。然後過濾出行,使 B 列中的值等於最大值

from pyspark.sql import Window w = Window.partitionBy('user_id')

result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\     .where(fun.col('time') == fun.col('time'))     .drop('max_time') ```

我們可以看到: 這個函式的關鍵就是運用了 spark 的 window 函式 ,靈活運用 威力無窮 哦 !

到這裡,spark利器2函式之dataframe全域性排序id與分組後保留最大值行 的全文 就寫完了 。

看到不蒐藏,用時忙斷魂哦!! 趕緊收藏轉發吧~


碼字不易,覺得有收穫就動動小手轉載一下吧,你的支援是我寫下去的最大動力 ~

更多更全更新內容,歡迎關注作者的公眾號: 演算法全棧之路

  • END -