客戶流失?來看看大廠如何基於spark+機器學習構建千萬資料規模上的使用者留存模型 ⛵

語言: CN / TW / HK

highlight: a11y-dark

攜手創作,共同成長!這是我參與「掘金日新計劃 · 8 月更文挑戰」的第13天,點選檢視活動詳情

💡 背景

Sparkify 是一個音樂流媒體平臺,使用者可以獲取部分免費音樂資源,也有不少使用者開啟了會員訂閱計劃(參考QQ音樂),在Sparkify中享受優質音樂內容。

使用者可以隨時對自己的會員訂閱計劃降級甚至取消,而當下極其內卷和競爭激烈的大環境下,獲取新客的成本非常高,因此維護現有使用者並確保他們長期會員訂閱至關重要。同時因為我們有很多使用者在平臺的歷史使用記錄,基於這些資料支撐去挖掘客戶傾向,定製合理的業務策略,也更加有保障和資料支撐。

但現在稍大一些的網際網路公司,資料動輒成百上千萬,我們要在這麼巨大的資料規模下完成挖掘與建模,又要藉助各種處理海量資料的大資料平臺。在本文中ShowMeAI將結合 Sparkify 的業務場景和海量資料,講解基於 Spark 的客戶流失建模預測案例。

本文涉及到大資料處理分析及機器學習建模相關內容,ShowMeAI為這些內容製作了詳細的教程與工具速查手冊,大家可以通過如下內容展開學習或者回顧相關知識。 - 圖解資料分析:從入門到精通系列教程 - 圖解大資料技術:從入門到精通系列教程 - 圖解機器學習演算法:從入門到精通系列教程 - 資料科學工具庫速查表 | Spark RDD 速查表 - 資料科學工具庫速查表 | Spark SQL 速查表

💡 資料

本文用到的 Sparkify 資料有3個大小的資料規格,大家可以根據自己的計算資源情況,選擇合適的大小,本文程式碼都相容和匹配,對應的資料大家可以通過ShowMeAI的百度網盤地址獲取。

🏆 實戰資料集下載(百度網盤):公眾號『ShowMeAI研究中心』回覆『實戰』,或者點選 這裡 獲取本文 [9] Spark 海量資料上的使用者留存分析挖掘與建模sparkify 使用者流失資料集

ShowMeAI官方GitHubhttps://github.com/ShowMeAI-Hub

  • mini_sparkify_event_data.json: 最小的資料子集 (125 MB)
  • medium-sparkify-event-data.json: 中型大小資料子集 (237 MB)
  • sparkify_event_data.json: 全量資料 (12 GB)

💡 探索性資料分析(EDA)

在進行建模之前,我們首先要深入瞭解我們的資料,這可以幫助我們更有針對性地構建特徵和選擇模型。也就是ShowMeAI之前提到過的「探索性資料分析(EDA)」的過程。

① 匯入工具庫

```python

基礎資料處理與繪圖

import pandas as pd import numpy as np import seaborn as sns import matplotlib.pyplot as plt import requests from datetime import datetime

spark相關

from pyspark.sql import SparkSession from pyspark.sql import Window, Row import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, FloatType ```

② 初步資料探索

Sparkify 資料集中,每一個使用者的行為都被記錄成了一條帶有時間戳的操作記錄,包括使用者登出、播放歌曲、點讚歌曲和降級訂閱計劃等。

```python

初始化spark session

spark_session = SparkSession.builder \ .master("local") \ .appName("sparkify") \ .getOrCreate()

載入資料與持久化

src = "data/mini_sparkify_event_data.json" df = spark_session.read.json(src)

構建檢視(方便查詢)

df.createOrReplaceTempView("sparkify_table") df.persist()

檢視前5行資料

df . limit(5) . toPandas() ```

用全量資料集(12GB)做EDA可能會消耗大量的資源且很慢,所以這個過程我們選擇小子集(128MB)來完成,如果取樣方式合理,小子集上的資料分佈能很大程度體現全量資料上的分佈特性。

對於中小資料集上的EDA大家可以參考ShowMeAI分享過的自動化資料分析工具,可以更快捷地獲取一些資料資訊與分析結論。 - 自動化資料分析 (EDA) 工具庫大全

📌 基礎資料維度資訊

```python

檢視資料維度資訊

print(f'資料集有 {len(df.columns)} 列') print(f'資料集有 {df.count()} 行') ```

結果顯示有 18 列 和 286500 行。

實際這份小子集中只有 225 個唯一使用者 ID,這意味著平均每個客戶與平臺有 286500/225≈1200 多個互動操作。

📌 欄位資訊

```python

檢視欄位資訊

df . printSchema() ```

我們通過上述命令檢視資料欄位資訊,輸出結果如下,包含欄位名和型別等:

python |-- artist: string (nullable = true) |-- auth: string (nullable = true) |-- firstName: string (nullable = true) |-- gender: string (nullable = true) |-- itemInSession: long (nullable = true) |-- lastName: string (nullable = true) |-- length: double (nullable = true) |-- level: string (nullable = true) |-- location: string (nullable = true) |-- method: string (nullable = true) |-- page: string (nullable = true) |-- registration: long (nullable = true) |-- sessionId: long (nullable = true) |-- song: string (nullable = true) |-- status: long (nullable = true) |-- ts: long (nullable = true) |-- userAgent: string (nullable = true) |-- userId: string (nullable = true)

我們獲取的一些初步資訊如下:

  • 字串型別的欄位包括 song, artist, genderlevel
  • 一些時間和ID類的欄位特徵 ts(時間戳),registration(時間戳),pageuserId
  • 可能作用不太大的一些欄位 firstName , lastName , method , status , userAgentauth等(等待進一步挖掘)

📌 時間跨度資訊

```python

排序

df = df . sort('ts', ascending= False)

獲取最大最小時間戳

df . select(F . max(df . ts), F . min(df . ts)) . show() ```

```python

https://www.programiz.com/python-programming/datetime/timestamp-datetime

轉換為日期

print("Min date =", datetime.fromtimestamp(1538352117000 / 1000)) print("Max date =", datetime.fromtimestamp(1543799476000 / 1000)) ```

```python

最早註冊時間

df.select(F.min(df.registration)).show() print("Min register =", datetime.fromtimestamp(1521380675000 / 1000)) ```

📌 欄位分佈

```python

統計欄位的不同取值數量

cols = df.columns n_unique = []

for col in cols: n_unique.append(df.select(col).distinct().count())

pd.DataFrame(data={'col':cols, 'n_unique':n_unique}).sort_values('n_unique', ascending=False) ```

結果如下,ID類的屬性有最多的取值,其他的欄位屬性相對集中。

📌 類別型取值分佈

我們來看看上面分析的尾部,分佈比較集中的類別型欄位的取值有哪些。

python # method df . select(['method']) . distinct() . show()

```python

level

df.select(['level']).distinct().show() ```

python # status df . select(['status']) . distinct() . show()

```python

gender

df.select(['gender']).distinct().show() ```

python # auth df . select(['auth']) . distinct() . show()

我們再看看取值中等和較多的欄位

python # page df . select(['page']) . distinct() . show()

```python

userAgent

df.select(['userAgent']).distinct().show() ```

```python

artist

df.select(['artist']).distinct().show() ```

```python

song

df.select(['song']).distinct().show() ```

③ 缺失值分析

我們首先剔除掉userId為空的資料記錄,總共刪除了 8,346 行。

python no_userId = df . where(df . userId == "") no_userId . count()

python no_userId . limit(10) . toPandas()

```python

構建無userId缺失資料的檢視

df = df . where(df . userId != "") df . createOrReplaceTempView("sparkify_table") ```

我們再統計一下其他欄位的缺失狀況

```python

類別型欄位

general_string_type = ['auth', 'firstName', 'gender', 'lastName', 'level', 'location', 'method', 'page', 'userAgent', 'userId'] for col in general_string_type: null_vals = df.select(col).where(df[col].isNull()).count() print(f'{col}: {null_vals}')

數值型欄位

numerical_cols = ['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts'] for col in numerical_cols: null_vals = df.select(col).where(df[col] == np.nan).count() print(f'{col}: {null_vals}') ```

```python

直接統計缺失值並輸出資訊

Reference

https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/

def make_missing_bool_index(c): ''' Generates boolean index to check missing value/NULL values @param c (string) - string of column of dataframe returns boolean index created ''' # removed checking these 2 since they would flag some incorrect rows, e.g. the song "None More Black" would be flagged # col(c).contains('None') | \ # col(c).contains('NULL') | \

bool_index = (F.col(c) == "") | \
F.col(c).isNull() | \
F.isnan(c)
return bool_index

missing_count = [F.count(F.when(make_missing_bool_index(c), c)).alias(c) for c in df.columns]

df.select(missing_count).toPandas() ```

④ EDA洞察&結論

由於我們的資料是基於各種有時間戳的交易來組織的,以事件為基礎(基於 "頁 "列),我們需要執行額外的特徵工程來定製我們的資料以適應我們的機器學習模型。

📌 目標&問題

  • 使用者流失是什麼意思?是指取消訂閱嗎?

📌 重要欄位列

  • ts - 時間戳,在以下場景有用
    • 訂閱與取消之間的時間點資訊
    • 構建「聽歌的平均時間」特徵
    • 構建「聽歌之間的時間間隔」特徵
    • 基於時間戳構建資料樣本,比如選定使用者流失前的3個月或6個月
  • registration - 時間戳 - 用於識別交易的範圍
  • page - 使用者正在參與的事件
    • 本身並無用處
    • 需要進一步特徵工程,從頁面型別中提取資訊,或結合時間戳等資訊
  • userId
    • 本身並無用處
    • 基於使用者分組完成統計特徵

📌 配合特徵工程有用的欄位列

  • song - 歌名,可用於構建類似下述的特徵:
    • 使用者聽的不同歌曲數量
    • 使用者聽同一首歌的次數
  • artist- 歌手,可用於構建類似下述的特徵:
    • 每個使用者收聽的歌手數量
  • 因為是明文的歌名,我們甚至可以通過外部API補充資訊構建特徵:
    • 使用者收聽的音樂型別(並觀察型別是否影響流失率)。
  • gender - 性別
    • 不同性別的人可能有不同的音樂偏好。
  • level - 等級
    • 區分使用者是免費的還是付費的
  • location - 地區
    • 地域差別

📌 無用欄位列(我們會直接刪除)

  • firstNamelastName - 名字一般在模型中很難直接給到資訊。
  • method - 僅僅有PUT或GET取值,是網路請求型別,作用不大。
  • status- 僅僅是API響應,例如200/404,作用不大。
  • userAgent--指定使用者使用的瀏覽器型別
    • 有可能不同瀏覽器代表的使用者群體有差別,這個可以進一步調研
  • auth - 登入登出等資訊,作用不大

💡 資料處理

① 定義流失

我們的 page功能有 22 個獨特的標籤,代表使用者點選或訪問的頁面,結合上面的資料分析大家可以看到頁面包括關於登入註冊等。

可以幫助我們定義流失的頁面是 Cancellation Confirmation,表示 免費 和 付費 使用者均存在流媒體平臺。

```python

定義流失使用者

is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType()) df = df.withColumn("churn", is_churn(df.page)) df.createOrReplaceTempView("sparkify_table")

user_window = Window \ .partitionBy('userId') \ .orderBy(F.desc('ts')) \ .rangeBetween(Window.unboundedPreceding, 0)

manually define schema

https://stackoverflow.com/questions/40517553/pyspark-valueerror-some-of-types-cannot-be-determined-after-inferring

tmp_row = spark_local.sparkContext.parallelize(Row(second_row)).toDF(schema=df.schema) df.where(df.userId == 100001).union(tmp_row).withColumn('pre_churn', F.sum('churn').over(user_window)).limit(5).toPandas()

df = df.withColumn('preChurn', F.sum('churn').over(user_window)) df.createOrReplaceTempView("sparkify_table") ```

對使用者流失情況做簡單分析

python spark_local.sql(''' SELECT SUM(churn) FROM sparkify_table GROUP BY userId ''').toPandas().value_counts()

在我們取樣出來的小資料集中:有225 個使用者, 23%(52 個使用者)流失 。

② 特徵工程

關於特徵工程可以參考ShowMeAI的以下文章詳解

本文中所使用到的特徵工程如下:

  • ① 歌曲和歌手相關: uniqueSongs, uniqueArtists, uniqueSongArtist.
  • ② 使用者服務時長: dayServiceLen(註冊到上次與網站互動之間的天數)
  • ③ 使用者行為統計: countListen(收聽次數), countSession(session數量), lengthListen(聽的總時長)
  • ④ 使用②和③的組合 lengthListenPerDay, countListenPerDay, sessionPerDay
  • ⑤ 針對一些統計值(countListen , countSession, 和 lengthListen等)計算的差異度。

📌 清理資料

```python

清理資料

def clean_data(df): ''' Cleans raw dataframe to: i. sort values ii. remove null userId rows @param df: raw spark dataframe returns updated spark dataframe ''' # sort values df = df.sort('ts', ascending=False) # remove null userIds df = df.where(df.userId != "") return df ```

📌 定義使用者流失標籤

```python

定義使用者流失

def define_churn(df): ''' Define churn @param df - spark dataframe returns updated spark dataframe ''' # define churn as cancellation confirmation is_churn = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType()) df = df.withColumn("churn", is_churn(df.page)) return df ```

📌 清理髒資料

有一部分使用者在流失之後,還有一些資料資訊,這可能是時間戳的問題,我們把這部分資料清理掉

```python

清理髒資料

def remove_post_churn_rows(df, spark, sql_table): ''' Remove post-churn rows @param df - spark dataframe @param spark - SparkSession instance @param sql_table - string representing name of sql table returns updated spark dataframe ''' # define window function to mark non-churn related rows user_window = Window \ .partitionBy('userId') \ .orderBy(F.desc('ts')) \ .rangeBetween(Window.unboundedPreceding, 0) df = df.withColumn('preChurn', F.sum('churn').over(user_window)) # remove rows for userIds which are marked as churn but have a timestamp after the 'Cancellation Confirmation' page # define GROUP BY and merge against larger df churn_df = spark.sql(f''' SELECT userId AS tmpId, MAX(churn) AS tmpChurn FROM {sql_table} GROUP BY userId ''') df = df.join(churn_df, df.userId == churn_df.tmpId, "left") # remove instances where churned userIds have transctions post Cancellation Confirmation df = df.where(~((df.preChurn == 0) & (df.tmpChurn == 1))) # remove tmp rows df = df.drop('tmpId', 'tmpChurn') return df ```

📌 時間特徵

```python def prelim_feature_eng(df): ''' Feature engineer columns: i timeSinceRegister ii. columns representing time scope of entry @param df: raw spark dataframe returns updated spark dataframe ''' # create new column representing time since registration (ms) time_since_register = F.col('ts') - F.col('registration') df = df.withColumn("timeSinceRegister", time_since_register)

# create 3 new columns representing when row data relates to
mth_3 = 60 * 60 * 24 * 90
mth_6 = 60 * 60 * 24 * 180
mth_12 = 60 * 60 * 24 * 365
mth_3_f = F.udf(lambda x : 1 if x / 1000 <= mth_3 else 0, IntegerType())
mth_6_f = F.udf(lambda x : 1 if x / 1000 <= mth_6 else 0, IntegerType())
mth_12_f = F.udf(lambda x : 1 if x / 1000 <= mth_12 else 0, IntegerType())
df = df.withColumn("month3", mth_3_f(df.timeSinceRegister))\
    .withColumn("month6", mth_6_f(df.timeSinceRegister))\
    .withColumn("month12", mth_12_f(df.timeSinceRegister))
return df

```

📌 統計&組合特徵

```python def melt_data(df, spark, sql_table): ''' Melts data to show entries on a user basis for the following columns: - userId - gender - level - location - uniqueSongs - uniqueArtists - dayServiceLen - countListen1H, - countSession1H, - lengthListen1H, - countListen2H, - countSession2H, - lengthListen2H - churn @param df - spark dataframe @param spark - SparkSession instance @param sql_table - string representing name of sql table returns updated spark datafraem ''' melt1 = spark.sql(f''' SELECT userId, MIN(gender) AS gender, MIN(level) AS level, MAX(location) AS location, COUNT(DISTINCT(song)) AS uniqueSongs, COUNT(DISTINCT(artist)) AS uniqueArtists, COUNT(DISTINCT(song, artist)) AS uniqueSongArtist, MAX(Churn) AS churn FROM {sql_table} GROUP BY userId ''') melt2 = spark.sql(f''' WITH sparkify_table_upt AS ( SELECT * FROM {sql_table} WHERE page = "NextSong" ), msServiceTable AS ( SELECT userId, MAX(ts) - MIN(ts) AS msServiceLen, MIN(ts) + (MAX(ts) - MIN(ts)) / 2 AS midTs FROM sparkify_table_upt GROUP BY userId ), earlyHalfTable AS ( SELECT a.userId, COUNT(1) AS countListen1H, COUNT(DISTINCT(a.sessionId)) AS countSession1H, SUM(a.length) AS lengthListen1H FROM sparkify_table_upt AS a LEFT JOIN msServiceTable AS b ON b.userId = a.userId WHERE a.ts < b.midTs GROUP BY a.userId ), lateHalfTable AS ( SELECT a.userId, COUNT(1) AS countListen2H, COUNT(DISTINCT(a.sessionId)) AS countSession2H, SUM(a.length) AS lengthListen2H FROM sparkify_table_upt AS a LEFT JOIN msServiceTable AS b ON b.userId = a.userId WHERE a.ts >= b.midTs GROUP BY a.userId ), concatTable AS ( SELECT m.userId AS tmpUserId, milisecToDay(msServiceLen) AS dayServiceLen, countListen1H + countListen2H AS countListen, countSession1H + countSession2H AS countSession, lengthListen1H + lengthListen2H AS lengthListen, countListen2H - countListen1H AS countListenDiff, countSession2H - countSession1H AS countSessionDiff, lengthListen2H - lengthListen1H AS lengthListenDiff FROM msServiceTable as m LEFT JOIN earlyHalfTable as e ON e.userId = m.userId LEFT JOIN lateHalfTable AS l ON l.userId = m.userId ) SELECT *, lengthListen / dayServiceLen AS lengthListenPerDay, countListen / dayServiceLen AS countListenPerDay, countSession / dayServiceLen AS sessionPerDay, lengthListen / countListen AS lengthPerListen, lengthListen / countSession AS lengthPerSession FROM concatTable

''')
melt_concat = melt1.join(melt2, melt1.userId == melt2.tmpUserId, "Left")
melt_concat = melt_concat.drop('tmpUserId')
return melt_concat

```

📌 位置資訊

```python def location_feature_eng(df, census): ''' Create 2 new columns from location -> Region and Division @param df: raw spark dataframe @param census: csv file containing location mapping based on state code returns updated spark dataframe ''' # some census data contains two states, for simplicity, selecting last location map_region = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Region'].iloc[0], StringType()) map_division = F.udf(lambda x: census.loc[census['State Code'] == x[-2:], 'Division'].iloc[0], StringType())

df = df.withColumn("region", map_region(df.location))\
    .withColumn("division", map_division(df.location))
return df

```

📌 組織資料&特徵流水線

```python

讀資料

df_train = spark_session.read.json(src)

剔除無用欄位

df_train = df_train.drop('firstName', 'lastName', 'method', 'status', 'userAgent', 'auth')

清理資料

df_train = clean_data(df_train) df_train = define_churn(df_train) df_train.createOrReplaceTempView("table")

清除髒資料

df_train = remove_post_churn_rows(df_train, spark_local, "table")

基礎特徵

df_train = prelim_feature_eng(df_train)

更新表

df_train.createOrReplaceTempView("table")

新增更多特徵

df_melt = melt_data(df_train, spark_local, "table") df_melt = location_feature_eng(df_melt, census) ```

📌 檢視資料特徵

python pd_melt = df_melt . toPandas() pd_melt . describe()

💡 進一步資料探索

① 流失率

```python predictor = pd_melt['churn'].value_counts()

print(predictor)

plt.title('Churn distribution') predictor.plot.pie(autopct='%.0f%%') plt.show() ```

② 數值vs類別型特徵

```python label = 'churn' categorical = ['gender', 'level' , 'location', 'region', 'division'] numerical = ['uniqueSongs', 'uniqueArtists', 'uniqueSongArtist', 'dayServiceLen', \ 'countListen', 'countSession', 'lengthListen', 'countListenDiff', 'countSessionDiff',\ 'lengthListenDiff', 'lengthListenPerDay', 'countListenPerDay',\ 'sessionPerDay', 'lengthPerListen', 'lengthPerSession']

plt.title('Distribution of numerical/categorical features') plt.pie([len(categorical), len(numerical)], labels=['categorical', 'numerical'], autopct='%.0f%%') plt.show() ```

在我們所有的特徵中,25% 是類別型的。

③ 數值型特徵分佈

📌 數值特徵&流失分佈

```python def plot_distribution(df, hue, filter_col=None, bins='auto'): ''' Plots distribution of numerical columns By default, exclude object, datetime, timedelta and bool types and only consider numerical columns @param df (DataFrame) - dataset @param hue (str) - column of dataset to apply hue (useful for classification) @param filter_col (array) - optional argument, features to be included in plot @param bins (int) - defaults to auto for seaborn, sets number of bins of histograms ''' if filter_col == None: filter_col = df.select_dtypes(exclude=['object', 'datetime', 'timedelta', 'bool']).columns num_cols = len(list(filter_col)) width = 3 height = num_cols // width if num_cols % width == 0 else num_cols // width + 1 plt.figure(figsize=(18, height * 3)) for i, col in zip(range(num_cols), filter_col): plt.subplot(height, width, i + 1) plt.xlabel(col) plt.ylabel('Count') plt.title(f'Distribution of {col}') sns.histplot(df, x=col, hue=hue, element="step", stat="count", common_norm=False, bins=bins) plt.tight_layout() plt.show()

# 繪製數值型特徵分佈圖 plot_distribution(pd_melt, 'churn', filter_col=numerical) ```

我們的數值型特徵上可以看出:

  • 流失與非流失使用者都有右偏傾向的分佈
  • dayServiceLen欄位有最明顯的流失客戶和非流失客戶分佈差異。

📌 數值型特徵相關度

```python

定義數值型特徵

numerical_churn = numerical + ['churn']

計算相關性

corr_data = pd_melt[numerical_churn].corr()

繪製熱力圖顯示相關性

plt.figure(figsize=(16,16)) plt.title('Heat map of correlation for all variables') matrix = np.triu(corr_data) sns.heatmap(corr_data, cmap='Blues', annot=True, mask=matrix) plt.show() ```

  • 我們從熱力圖上沒有看到有數值型特徵流失標籤列有明顯的高相關性。
  • 有幾組特徵,uniqueArtists、uniqueSongArtist、countListen、countSession和lengthListen,它們之間有非常高的相關性。如果大家使用線性模型,可以考慮做特徵選擇,我們後續使用非線性模型的話,可以考慮保留。

④ 類別型特徵的分佈

```python def plot_cat_distribution(data, colname): ''' Plots barplot for categorical columns and piechart showing proportions of churned vs non-churned customers @param - data (panas dataframe) @param - colname (str) - column of dataframe referenced ''' # https://www.statology.org/seaborn-stacked-bar-plot/ plt.figure(figsize=(15,5)) ax1 = plt.subplot(1, 3, 1) tmp = data.copy() tmp['count'] = 1 x = tmp.groupby([colname, 'churn']).count().reset_index()[[colname, 'churn','count']] # churn index 0, 1 doesn't relate to No, Yes, relates to pivoted index only x = x.pivot(index='churn', columns=colname).transpose().reset_index().drop('level_0', axis=1) x = x.fillna(0)

plt.title(f'Distribution of {colname}')
plt.ylabel('Count')
x.plot.bar(x=colname, stacked=True, ax=ax1, color=['green', 'lightgreen'])

ax2 = plt.subplot(1, 3, 2)
plt.title(f'Proportion of {colname} for churned customers')
plt.pie(x['Yes'], labels=x[colname], autopct='%.0f%%')

plt.subplot(1, 3, 3)
plt.title(f'Proportion of {colname} for non-churned customers')
plt.pie(x['No'], labels=x[colname], autopct='%.0f%%')

plt.tight_layout()
plt.show()

x.index.rename('index', inplace=True)
print(x)
tmp_sum = x[['No','Yes']].sum(axis=1)
x['No'] = x['No'] / tmp_sum
x['Yes'] = x['Yes'] / tmp_sum
print(x)
print(tmp_sum / tmp_sum.sum())

tmp_pd_melt = pd_melt.copy() tmp_pd_melt['churn'] = tmp_pd_melt['churn'].apply(lambda x: 'Yes' if x == 1 else 'No') ```

📌 性別&流失分佈

python plot_cat_distribution(tmp_pd_melt, 'gender')

流失客戶的男性比例更高。

📌 等級&流失分佈

python plot_cat_distribution(tmp_pd_melt, 'level')

免費和付費客戶的流失比例幾乎沒有差異(差2%),雖然圖上表明付費客戶流失的可能性稍小一點,但這個特徵在建模過程中可能作用不大。

📌 地區&流失分佈

python plot_cat_distribution(tmp_pd_melt, 'region')

圖上可以看出地區有一些差異,南部地區的流失要嚴重一些,相比之下北部地區的流失使用者少一些。

可以進一步對地區細化和繪圖

python plot_cat_distribution(tmp_pd_melt, 'division')

📌 類別型特徵取值數量分佈

```python def cardinality_plot(df, filter_col=None): ''' Input list of categorical variables to filter Default is None where it would only consider columns which have type 'Object' @param df (DataFrame) - dataset @param filter_col (array) - optional argument to specify columns we want to filter ''' if filter_col == None: filter_col = df.select_dtypes(include='object').columns num_unique = [] for col in filter_col: num_unique.append(len(df[col].unique())) plt.bar(list(filter_col), num_unique) plt.title('Number of unique categorical variables') plt.xlabel('Column name') plt.ylabel('Num unique') plt.xticks(rotation=90) plt.yticks([0, 1, 2, 3, 4]) plt.show() return pd.Series(num_unique, index=filter_col).sort_values(ascending=False)

cardinality_plot(pd_melt, categorical) ```

直接看最喜歡的location,取值數量有點太多了,我們可以考慮用粗粒度的地理位置資訊,可能區分能力會強一些。

下述部分,我們會使用spark進行特徵工程&大資料建模與調優,相關內容可以閱讀ShowMeAI的以下文章,我們對它的用法做了詳細的講解 - 圖解大資料 | 工作流與特徵工程@Spark機器學習 - 圖解大資料 | 建模與超參調優@Spark機器學習

💡 建模優化

我們先對數值型特徵做一點小小的資料變換(這裡用到的是log變換),這樣我們的原始數值型特徵分佈可以得到一定程度的校正。

```python def log_transform(df, columns): ''' Log trasform columns in dataframe @df - spark dataframe @columns - array of string of column names to be log transformed returns updated spark dataframe ''' log_transform_func = F.udf(lambda x: np.log10(x + 1), FloatType()) for col in columns: df = df.withColumn(col, log_transform_func(df[col])) return df

數值型特徵log變換

df_melt = log_transform(df_melt, numerical) ```

① 資料切分

接下來我們把資料集拆分為 60:20:20 的3部分,分別用於訓練、驗證和測試。

python df_melt_copy = df_melt . withColumn("label", df_melt . churn) rest, test = df_melt_copy.randomSplit([0.8, 0.2], seed=42) train, val = rest.randomSplit([0.75, 0.25], seed=42)

② 建模流水線

```python

匯入工具庫

from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix, ConfusionMatrixDisplay

import re

數值型特徵處理流水線

numerical_assembler = VectorAssembler(inputCols=numerical, outputCol="numericalFeatures") standardise = StandardScaler(inputCol="numericalFeatures", outputCol="standardNumFeatures", withStd=True, withMean=True) minmax = MinMaxScaler(inputCol="standardNumFeatures", outputCol="minmaxNumFeatures")

類別型特徵處理流水線

inputCols = ['gender', 'level', 'region', 'division'] outputColsIndexer = [x + 'SI' for x in inputCols] indexer = StringIndexer(inputCols = inputCols, outputCols=outputColsIndexer) outputColsOH = [x + 'OH' for x in inputCols] onehot = OneHotEncoder(inputCols=outputColsIndexer, outputCols=outputColsOH) categorical_assembler = VectorAssembler(inputCols=outputColsOH, outputCol="categoricalFeatures")

組合兩類特徵

total_assembler = VectorAssembler(inputCols=['minmaxNumFeatures', 'categoricalFeatures'], outputCol='features') pipeline = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler]) ```

```python

執行流水線對資料進行處理

pipeline . fit(train) . transform(train) . head() ```

得到如下結果

python Row(userId='10', gender='M', level='paid', location='Laurel, MS', uniqueSongs=629, uniqueArtists=565, uniqueSongArtist=633, churn=0, dayServiceLen=42.43672561645508, countListen=673, countSession=6, lengthListen=166866.37250999993, countListenDiff=-203, countSessionDiff=2, lengthListenDiff=-48180.54478999992, lengthListenPerDay=3932.121766842835, countListenPerDay=15.858904998528928, sessionPerDay=0.14138696878331883, lengthPerListen=247.94408991084686, lengthPerSession=27811.062084999987, region='South', division='East South Central', label=0, numericalFeatures=DenseVector([629.0, 565.0, 633.0, 42.4367, 673.0, 6.0, 166866.3725, -203.0, 2.0, -48180.5448, 3932.1218, 15.8589, 0.1414, 247.9441, 27811.0621]), standardNumFeatures=DenseVector([-0.3973, -0.331, -0.3968, -0.016, -0.3968, -0.6026, -0.3993, -0.6779, 0.6836, -0.6549, -0.3678, -0.3625, -0.1256, -0.1374, 1.1354]), minmaxNumFeatures=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56]), genderSI=0.0, levelSI=1.0, regionSI=0.0, divisionSI=4.0, genderOH=SparseVector(1, {0: 1.0}), levelOH=SparseVector(1, {}), regionOH=SparseVector(3, {0: 1.0}), divisionOH=SparseVector(8, {4: 1.0}), categoricalFeatures=SparseVector(13, {0: 1.0, 2: 1.0, 9: 1.0}), features=DenseVector([0.1053, 0.1587, 0.1034, 0.6957, 0.0838, 0.0392, 0.0835, 0.5701, 0.5, 0.5692, 0.0264, 0.0245, 0.0002, 0.5344, 0.56, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]))

③ 初步建模&評估

我們先定義一個模型評估函式,因為是類別非均衡場景,我們這裡覆蓋比較多的評估準則,包括常用的precision、recall以及排序準則auc等。

```python

模型評估函式

def evaluate_model(y_trueTrain, y_predTrain, y_trueTest, y_predTest, y_testProba): ''' Wrapper function for evaluating classification results ''' train_acc = accuracy_score(y_trueTrain, y_predTrain) test_acc = accuracy_score(y_trueTest, y_predTest) fscore = f1_score(y_trueTest, y_predTest, zero_division=0) precision = precision_score(y_trueTest, y_predTest, zero_division=0) recall = recall_score(y_trueTest, y_predTest, zero_division=0) # linear models would not have .predict_proba method so, if fails, append 0 to roc_auc try: roc_auc = roc_auc_score(y_trueTest, y_testProba) except: roc_auc = 0 return { 'train_acc': train_acc, 'test_acc' : test_acc, 'fscore': fscore, 'precision': precision, 'recall': recall, 'roc_auc': roc_auc } ```

📌 邏輯迴歸

```python

定義模型

lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0) pipeline_lr = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, lr])

擬合

lrModel = pipeline_lr.fit(train) lr_res_test = lrModel.transform(val).select('label', 'prediction', 'probability').toPandas() lr_res_train = lrModel.transform(train).select('label', 'prediction', 'probability').toPandas()

評估

lr_results = evaluate_model(lr_res_train['label'],lr_res_train['prediction'],lr_res_test['label'],lr_res_test['prediction'], lr_res_test['probability'].apply(lambda x: x[1])) lr_results ```

結果如下

python {'train_acc': 0.8456375838926175, 'test_acc': 0.8780487804878049, 'fscore': 0.7368421052631579, 'precision': 0.5833333333333334, 'recall': 1.0, 'roc_auc': 0.9579831932773109}

📌 梯度提升樹GBT

```python

定義模型

gbt = GBTClassifier() pipeline_gbt = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, gbt])

擬合

gbtModel = pipeline_gbt.fit(train) gbt_res_test = gbtModel.transform(val).select('label', 'prediction', 'probability').toPandas() gbt_res_train = gbtModel.transform(train).select('label', 'prediction', 'probability').toPandas()

評估

gbt_results = evaluate_model(gbt_res_train['label'],gbt_res_train['prediction'],gbt_res_test['label'],gbt_res_test['prediction'],\ gbt_res_test['probability'].apply(lambda x: x[1])) gbt_results ```

結果如下

python {'train_acc': 1.0, 'test_acc': 0.8048780487804879, 'fscore': 0.6, 'precision': 0.46153846153846156, 'recall': 0.8571428571428571, 'roc_auc': 0.8193277310924371}

📌 隨機森林

```python

定義模型

rf = RandomForestClassifier() pipeline_rf = Pipeline(stages=[numerical_assembler, standardise, minmax, indexer, onehot, categorical_assembler, total_assembler, rf])

擬合

rfModel = pipeline_rf.fit(train) rf_res_test = rfModel.transform(val).select('label', 'prediction', 'probability').toPandas() rf_res_train = rfModel.transform(train).select('label', 'prediction', 'probability').toPandas()

評估

rf_results = evaluate_model(rf_res_train['label'],rf_res_train['prediction'],rf_res_test['label'],rf_res_test['prediction'], rf_res_test['probability'].apply(lambda x: x[1])) rf_results ```

結果如下

python {'train_acc': 0.959731543624161, 'test_acc': 0.8780487804878049, 'fscore': 0.6666666666666666, 'precision': 0.625, 'recall': 0.7142857142857143, 'roc_auc': 0.9243697478991597}

📌 綜合對比

```python cv_results = pd.DataFrame(columns=['accuracy_train','accuracy_cv','fscore_cv','precision_cv','recall_cv', 'roc_auc_cv']) cv_results.loc['LogisticRegression'] = lr_results.values() cv_results.loc['GradientBoostingTree'] = gbt_results.values() cv_results.loc['RandomForest'] = rf_results.values()

cv_results.style.apply(lambda x: ["background: lightgreen" if abs(v) == max(x) else "" for v in x], axis = 0) ```

綜合對比結果如下:

我們在上述建模與評估過程中,綜合對比了訓練集和驗證集的結果。關於評估準則:

  • accuracy通常不是衡量類別非均衡場景下的分類好指標。 極端的情況下,僅預測我們所有的客戶“不流失”就達到 77% 的accuracy。
  • recall衡量我們的正樣本中有多少被模型預估為正樣本,即TP / (TP + FN),我們上述建模過程中,LogisticRegression正確識別所有會流失的客戶。
  • recall還需要結合precision一起看,例如,上述LogisticRegression預估的流失客戶中,只有 58% 真正流失了。 (這意味著如果我們要開展營銷活動來解決客戶流失問題,有42% (1 - 0.58) 的成本會浪費在未流失客戶身上)。
  • 可以使用 fscore 指標來綜合考慮recall和precision。
  • ROC_AUC 衡量我們的真陽性與假陽性率。 我們的 AUC 越高,模型在區分正類和負類方面的效能就越好。

上述指標中,我們優先關注ROC_AUC,其次是 fscore,我們上述指標中LogisticRegression效果良好,下面我們基於它進一步調優。

④ 超引數調優

📌 交叉驗證

我們上面的建模只是敲定了一組超引數,超引數會影響模型的最終效果,我們可以使用spark的CrossValidator進行超引數調優,選出最優的超引數。

```python paramGrid = ParamGridBuilder() \ .addGrid(lr.regParam,[0.0, 0.1]) \ .addGrid(lr.maxIter,[50, 100]) \ .build()

crossval = CrossValidator(estimator=pipeline_lr, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator(), numFolds=3)

交叉驗證調參

cvModel = crossval . fit(rest) cvModel . avgMetrics ```

輸出結果如下

python [0.8011084544393228, 0.8222872837788751, 0.7284659848286738, 0.7284659848286738]

我們對測試集做評估

```python

交叉驗證評估

cv_res_test = cvModel.transform(test).select('label', 'prediction', 'probability').toPandas() cv_res_train = cvModel.transform(rest).select('label', 'prediction', 'probability').toPandas() cv_metrics = evaluate_model(cv_res_train['label'],cv_res_train['prediction'],cv_res_test['label'],cv_res_test['prediction'], cv_res_test['probability'].apply(lambda x: x[1]))

cv_metrics ```

python {'train_acc': 0.8894736842105263, 'test_acc': 0.8571428571428571, 'fscore': 0.7368421052631577, 'precision': 0.7, 'recall': 0.7777777777777778, 'roc_auc': 0.858974358974359}

📌 最優超引數

python cvModel . getEstimatorParamMaps()[np . argmax(cvModel . avgMetrics)]

```

輸出結果

{Param(parent='LogisticRegression_e765de70ec6a', name='regParam', doc='regularization parameter (>= 0).'): 0.0, Param(parent='LogisticRegression_e765de70ec6a', name='maxIter', doc='max number of iterations (>= 0).'): 100} ```

💡 結果評估

我們的 ROC_AUC 從 95.7 下降到 85.9。 這並不奇怪,因為我懷疑 95.7 的結果是由於過度擬合造成的。

python {'train_acc': 0.8894736842105263, 'test_acc': 0.8571428571428571, 'fscore': 0.7368421052631577, 'precision': 0.7, 'recall': 0.7777777777777778, 'roc_auc': 0.858974358974359}

最好的引數是 regParam為 0 和 maxIter100 個。

① 混淆矩陣

我們定一個函式來繪製一下混淆矩陣(即對正負樣本和預估結果劃分4個象限進行評估)。

```python def plot_confusion_matrix(y_true, y_pred, title): ''' Plots confusion matrix @param y_true - array of actual labels @param y_pred - array of predictions @title title - string of title ''' conf_matrix = confusion_matrix(y_true, y_pred) matrix_display = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=["No Churn", "Churn"]) matrix_display.plot(cmap='Greens') # adding title - https://github.com/scikit-learn/scikit-learn/discussions/20690 matrix_display.ax_.set_title(title) plt.grid(False) plt.show()

# Calculating recall (sensitivity), precision and specificity
tn = conf_matrix[0][0]
tp = conf_matrix[1][1]
fn = conf_matrix[1][0]
fp = conf_matrix[0][1]
print(f'True Positive Rate/Recall/Sensitivity: {round(tp/(tp+fn), 6)}')
# basically inverse of TPR
print(f'False Positive Rate/(1 - Specificity): {round(fp/(tn+fp), 6)}')
print(f'Precision                            : {round(tp/(tp+fp), 6)}')

繪製混淆矩陣

plot_confusion_matrix(cv_res_test['label'], cv_res_test['prediction'], "Confusion matrix at 50% threshold (default)") ```

檢視下面的混淆矩陣,用0.5的預設概率閾值能夠正確預測 77.78% 的流失客戶 (7/(7+2)),也具有 70% 的不錯的precision (7/(7+3))

② ROC_AUC 曲線

```python

預測概率

test_proba = cv_res_test['probability'] . apply(lambda x: x[1])

fpr = false positive rate

tpr = true positive rate

fpr, tpr, _ = roc_curve(cv_res_test['label'], test_proba)

繪圖

plt.figure(figsize=(10,8)) plt.title('ROC AUC Curve for customer churn') plt.xlabel('False Positive Rate (FPR)') plt.ylabel('True Postive Rate (FPR) / Recall') plt.plot(fpr, tpr, marker='.', label='LR') plt.plot([0, 1], [0, 1]) plt.show() ```

下面的 ROC AUC 曲線清楚地顯示了召回率(真陽性率)和假陽性率之間的權衡。

③ PR 曲線

```python lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)

繪製PR曲線

plt.figure(figsize=(10,8)) plt.title('Recall/Precision curve') plt.xlabel('Recall') plt.ylabel('Precision') plt.plot(lr_recall, lr_precision, marker='.', label='LR') plt.axhline(y=cv_metrics['precision'], color='r') plt.axvline(x=cv_metrics['recall'], color='r') plt.show() ```

下面的召回/精度圖中的交點代表了我們調整後的LogisticRegression模型的召回-精度。預設的50%的決策閾值得出了77.8%/70%的召回率-精確度的權衡。

通過調整我們的決策閾值,我們可以定製我們想要的召回/精確率。

💡 總結&業務思考

我們可以調整我們的決策(概率)閾值,以獲得一個最滿意的召回率或精確度。比如在我們的場景下,使用了0.72的閾值取代預設的0.5,結果是在召回率沒有下降的基礎上,提升了精度。

現實中,召回率和精確度之間肯定會有權衡,特別是當我們在比較大的資料集上建模應用時。

```python def classify_custom_threshold(y_true, y_pred_proba, threshold=0.5): ''' Identifies custom threshold and plots confusion matrix @y_true - array of actual labels @y_pred_proba - array of probabilities of predictions @threshold - decision threshold which is defaulted to 50% ''' y_pred = y_pred_proba >= threshold plot_confusion_matrix(y_true, y_pred, f'Confusion matrix at {round(threshold * 100, 1)}% decision threshold')

classify_custom_threshold(cv_res_test['label'], test_proba, 0.72) ```

我們還需要與業務管理人員積極溝通,瞭解他們更有傾向性的指標(更看重precision還是recall):

  • 優先考慮recall意味著我們能判斷出大部分實際流失的客戶,但這可能會降低精度,就像我們之前提到的,這可能會導致成本增加。
  • 我們當前的結果已經很不錯了,如果業務負責人想追求更高的召回率,並願意為此花費一些成本,我們可以降低決策(概率)門檻。

舉例來說,在我們當前的例子中,如果我們將決策判定概率從0.5降低到0.25,可以把召回率提升到88.9%,但隨之發生變化的是精度降低到47%。

```python lr_precision, lr_recall, _ = precision_recall_curve(cv_res_test['label'], test_proba)

plt.figure(figsize=(10,8)) plt.title('Recall/Precision curve') plt.xlabel('Recall') plt.ylabel('Precision') plt.plot(lr_recall, lr_precision, marker='.', label='LR') plt.axhline(y=cv_metrics['precision'], color='r', alpha=0.3) plt.axvline(x=cv_metrics['recall'], color='r', alpha=0.3) plt.axhline(y=0.470588, color='r') plt.axvline(x=0.888889, color='r') plt.show()

classify_custom_threshold(cv_res_test['label'], test_proba, 0.25) ```

參考資料

「其他文章」