使用Retrofit的方式請求Socket,且Socket可以和Http無縫切換

語言: CN / TW / HK

前言

一般來說前端的app和伺服器通訊都是用的Http,Http使用方便,請求流程好控制,但有時候app需要實時接收服務端的推送或保持長連線,這時就需要使用Socket了

java提供的Socket介面還是比較難用的,而網上有一個開源庫OkScoket封裝的還是挺好用的,Github地址:https://github.com/xuuhaoo/OkSocket

但即使如此,其沒有一對一回調或同步請求方法,只能通過一個或幾個統一的回撥方法,就造成了使用比較麻煩且容易出錯

而Retrofit使用比較好用,但是原生其只支援Http,所以我將其封裝了一下可以不用修改Retrofit和其介面的程式碼就可以簡單方便安全的使用

正文

下面將一下原理,如果不想看的同學可以直接翻到下面看引用方式和如何使用

我們如果無侵入無修改介面的使用可以用兩種方案

1.自定義動態代理

2.根據Retrofit提供的介面自定義實現Call.Factory和Call

1.自定義動態代理

該種方式比較靈活,且程式碼很少,但無法利用Retrofit的其他優勢,比如自定義返回值型別和解析器,比如支援RxJava就需要自己在寫一套或Copy一下,比如解析物件就要自己來處理

所以不使用該方法,但我將實現的程式碼放出來(只支援POST和GET的註解,其他註解可以自行支援)

我們通過寫相應介面的動態代理,並將自身請求的回撥註冊到OkSocket的統一回調中,然後自己判斷id來取回調,並將其改造成同步(自旋)和非同步(回撥)的Call在封裝適配RxJava

import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.CompositeException
import io.reactivex.exceptions.Exceptions
import io.reactivex.plugins.RxJavaPlugins
import okhttp3.Request
import retrofit2.Call
import retrofit2.Callback
import retrofit2.Response
import retrofit2.http.Field
import retrofit2.http.GET
import retrofit2.http.POST
import retrofit2.http.Query
import java.lang.reflect.*
import java.net.SocketTimeoutException

/**
 * creator: lt  2021/1/23  [email protected]
 * effect : 將大部分Http轉為Socket
 * warning:
 */
object SocketRequest {

    /**
     * 動態代理單例物件
     */
    val instance: IPostRequest = getPostRequest()

    //獲取動態代理例項物件
    private fun getPostRequest(): IPostRequest {
        val clazz = IPostRequest::class.java//拿到我們被代理介面的class物件
        return Proxy.newProxyInstance(//呼叫動態代理生成的方法來生成動態代理
                clazz.classLoader,//類載入器物件
                arrayOf(clazz),//因為我們的介面不需要繼承別的介面,所以直接傳入介面的class就行
                PostRequestHandler()//InvocationHandler介面的實現類,用來處理代理物件的方法呼叫
        ) as IPostRequest
    }

    class PostRequestHandler : InvocationHandler {
        override fun invoke(proxy: Any, method: Method, args: Array<out Any?>?): Any? {
            //處理Object類的方法
            if (method.declaringClass == Any::class.java)
                return (if (args == null) method.invoke(this) else method.invoke(this, *args))
            val tMap = HashMap<String, Any>()
            method.parameterAnnotations.forEachIndexed { index, it ->
                //拿到引數的key,如果拿不到說明需要使用http請求
                val key = (it.find { it is Field } as? Field)?.value
                        ?: (it.find { it is Query } as? Query)?.value
                        ?: return method.invoke(PostRequest.getPostRequest(), args)
                tMap.put(key, args?.get(index) ?: "")
            }
            //拿到url,如果不是以斜槓開頭就拼接上
            var url = method.getAnnotation(POST::class.java)?.value
                    ?: method.getAnnotation(GET::class.java)?.value
                    ?: return method.invoke(PostRequest.getPostRequest(), args)
            if (url.startsWith('/').not())
                url = "/$url"
            //處理返回值
            return when (method.returnType) {
                Call::class.java -> SocketCall<Any>(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType))
                Observable::class.java -> createObservable(url, tMap, method)
                else -> method.invoke(PostRequest.getPostRequest(), args)
            }
        }
    }

    private fun getParameterUpperBound(index: Int, type: ParameterizedType): Type {
        val types = type.actualTypeArguments
        require(!(index < 0 || index >= types.size)) { "Index " + index + " not in range [0," + types.size + ") for " + type }
        val paramType = types[index]
        return if (paramType is WildcardType) {
            paramType.upperBounds[0]
        } else paramType
    }

    private fun createObservable(url: String, tMap: HashMap<String, Any>, method: Method): Observable<Any?> =
            SocketObservable(SocketObservable.CallExecuteObservable(SocketCall(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType))))
}


class SocketObservable<T>(private val upstream: Observable<Response<T>>) : Observable<T>() {
    override fun subscribeActual(observer: Observer<in T>) {
        upstream.subscribe(BodyObserver(observer))
    }

    private class BodyObserver<R>(val observer: Observer<in R>) : Observer<Response<R>> {
        private var terminated = false
        override fun onSubscribe(disposable: Disposable) {
            observer.onSubscribe(disposable)
        }

        override fun onNext(response: Response<R>) {
            if (response.isSuccessful) {
                val body = response.body()
                if (body != null) {
                    observer.onNext(body)
                    return
                }
            }
            terminated = true
            val t: Throwable = retrofit2.HttpException(response)
            try {
                observer.onError(t)
            } catch (inner: Throwable) {
                Exceptions.throwIfFatal(inner)
                RxJavaPlugins.onError(CompositeException(t, inner))
            }
        }

        override fun onComplete() {
            if (!terminated) {
                observer.onComplete()
            }
        }

        override fun onError(throwable: Throwable) {
            if (!terminated) {
                observer.onError(throwable)
            } else {
                // This should never happen! onNext handles and forwards errors automatically.
                val broken: Throwable = AssertionError(
                        "This should never happen! Report as a bug with the full stacktrace.")
                broken.initCause(throwable)
                RxJavaPlugins.onError(broken)
            }
        }
    }

    class CallExecuteObservable<T>(private val call: Call<T>) : Observable<Response<T>>() {
        override fun subscribeActual(observer: Observer<in Response<T>>) {
            // Since Call is a one-shot type, clone it for each new observer.
            val disposable = CallDisposable(call)
            observer.onSubscribe(disposable)
            if (disposable.isDisposed) {
                return
            }
            var terminated = false
            try {
                val response = call.execute()
                if (!disposable.isDisposed) {
                    observer.onNext(response)
                }
                if (!disposable.isDisposed) {
                    terminated = true
                    observer.onComplete()
                }
            } catch (t: Throwable) {
                Exceptions.throwIfFatal(t)
                if (terminated) {
                    RxJavaPlugins.onError(t)
                } else if (!disposable.isDisposed) {
                    try {
                        observer.onError(t)
                    } catch (inner: Throwable) {
                        Exceptions.throwIfFatal(inner)
                        RxJavaPlugins.onError(CompositeException(t, inner))
                    }
                }
            }
        }

        private class CallDisposable constructor(private val call: Call<*>) : Disposable {
            @Volatile
            private var disposed = false
            override fun dispose() {
                disposed = true
                call.cancel()
            }

            override fun isDisposed(): Boolean {
                return disposed
            }
        }
    }
}

class SocketCall<T>(
        val url: String,
        val tMap: HashMap<String, Any>,
        val trueReturnType: Type) : Call<T> {
    private var canceled = false
    private var isExecuted = false//是否執行過,一個call物件只允許執行一次
    private var requestId = 0

    /**
     * 檢查網路三十秒,如果沒有連線成功就 sync拋異常,async就回調false
     * 傳入[asyncCallback]表示async
     */
    private fun checkConnect(asyncCallback: ((Boolean) -> Unit)? = null) {
        if (SocketManage.manager.isConnect) {
            asyncCallback?.invoke(true)
            return
        }
        SocketManage.connect()
        val time = System.currentTimeMillis()
        if (asyncCallback == null) {
            while (true) {
                if (SocketManage.currConnStatus == 3)
                    return
                if (System.currentTimeMillis() - time > 30000)
                    throw SocketTimeoutException()
                try {
                    Thread.sleep(100)
                } catch (e: Exception) {
                    e.printStackTrace()
                }
            }
        } else {
            ThreadPool.submitToCacheThreadPool {
                while (true) {
                    if (SocketManage.currConnStatus == 3) {
                        asyncCallback(true)
                        return@submitToCacheThreadPool
                    }
                    if (System.currentTimeMillis() - time > 30000) {
                        asyncCallback(false)
                        return@submitToCacheThreadPool
                    }
                    try {
                        Thread.sleep(100)
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }
            }
        }
    }

    override fun execute(): Response<T> {
        checkConnect()
        if (isExecuted) throw IllegalStateException("只能執行一次")
        isExecuted = true
        val data = TcpSendData(url, tMap)
        requestId = data.request_id
        "send2 : $url ${tMap.toJson()}".w("SocketManage22")
        var a: Any? = null
        var t: Throwable? = null
        var notFinish = true
        SocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->
            a = any
            t = throwable
            notFinish = false
        }
        //傳送請求
        SocketManage.manager.send(data)
        var whileNumber = 0
        while (notFinish && !canceled) {
            try {
                whileNumber++
                if (whileNumber % 20 == 0)
                    SocketManage.handlerTimeOutedListener()
                Thread.sleep(50)
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
        t?.let { throw it }
        return if (a == null) throw ServerException("服務端返回的result是空") else Response.success(a as T)
    }

    override fun enqueue(callback: Callback<T>) {
        checkConnect {
            if (!it) {
                HandlerPool.post {
                    callback.onFailure(this, SocketTimeoutException())
                }
                return@checkConnect
            }
            if (isExecuted) throw IllegalStateException("只能執行一次")
            isExecuted = true
            val data = TcpSendData(url, tMap)
            "send : $url ${tMap.toJson()}".w("SocketManage22")
            requestId = data.request_id
            SocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->
                    if (any != null)
                        callback.onResponse(this, Response.success(any as T))
                    else
                        callback.onFailure(this, throwable ?: ServerException("服務端返回的result是空2"))
            }
            //傳送請求
            SocketManage.manager.send(data)
        }
    }

    override fun clone(): Call<T> = SocketCall(url, tMap, trueReturnType)

    override fun isExecuted(): Boolean = isExecuted

    override fun cancel() {
        canceled = true
        // 取消請求
        SocketManage.removeListener(requestId)
    }

    override fun isCanceled(): Boolean = canceled

    override fun request(): Request? = null

}

object : SocketActionAdapter() {
            override fun onSocketReadResponse(info: ConnectionInfo?, action: String?, data: OriginalData?) {
                data ?: return
...
                        //處理回撥
                        val (_, type, listener) = listenerMap.remove(requestId) ?: return
                        try {
                            val any = Gson().fromJson<Any?>(body.getJSONObject("body").toString(), type)
                            HandlerPool.post {
                                if (any == null) listener(null, ServerException("服務端返回的result是空")) else listener(any, null)
                            }
                        } catch (t: Throwable) {
                            HandlerPool.post {
                                listener(null, t)
                            }
                        }
                        handlerTimeOutedListener()
...

具體可以參考我之前仿照Retrofit實現的Http動態代理:模仿Retrofit封裝一個使用更簡單的網路請求框架_滔lt的部落格-CSDN部落格

2.根據Retrofit提供的介面自定義實現Call.Factory和Call

ps:該方案被廢棄,因為其比較受限制,而且無法充分利用到Retrofit的功能,並且限制Http比較死,用Socket需要寫很多死程式碼

通過檢視Retrofit框架,發現傳入的OkHttpClient其實就是一個Call.Factory的實現類,所以只需要自行實現Call.Factory,我們也可以來控制Call的建立,只要Call能創建出來,其他的如Observable也只是對Call的包裝

在通過和方法1一樣的回撥處理,即可實現效果

關鍵程式碼如下:

abstract class SocketCallAdapter(private val manager: IConnectionManager) : Call.Factory {

    override fun newCall(request: Request): Call {
        ...
        return SocketCall(
                manager,
                this,
                url,
                map
        )
    }
}

/**
 * creator: lt  2021/2/27  [email protected]
 * effect : 用於Socket請求的Call
 * warning:
 */
internal class SocketCall(
        private val manager: IConnectionManager,
        private val adapter: SocketCallAdapter,
        private val url: String,
        private val tMap: HashMap<String, Any>) : Call {
  //和第一種實現相同

  ...

}

3.根據我from的Retrofit庫提供的能力來攔截Retrofit.Call的建立

使用該方式可以更靈活且更容易複用Retrofit的能力和外掛

參考更易於使用的Retrofit(不用寫註解)_滔lt的部落格-CSDN部落格第七條和原始碼:GitHub - ltttttttttttt/Retrofit_SocketCallAdapter: Retrofit可以直接使用OkSocket來進行網路請求,Retrofit內的東西都不需要修改,只需要將OkHttpClient換成此即可
ps:其實原始碼很簡單,就幾個檔案且內容都很少,而且想自定義攔截簡直太方便了

使用方式

在根專案的build.gradle檔案中加入:

allprojects {
    repositories {
...
        maven { url 'https://jitpack.io' }
    }
}

app的build.gradle中加上

dependencies{
    ...
    implementation 'com.github.ltttttttttttt:Retrofit_SocketCallAdapter:1.1.8'
    implementation "com.github.ltttttttttttt:retrofit:1.3.0"
    implementation 'org.jetbrains.kotlin:kotlin-reflect:1.4.30'
    //todo 預設用的'com.github.ltttttttttttt:retrofit:',所以如果需要用到gson的攔截器之類的,但是其中包含的有原版的retrofit的引用,會導致衝突,所以可以使用下面的方法來去掉本引用的某個遠端依賴
    //implementation 'com.squareup.retrofit2:converter-gson:2.7.0' exclude module: 'retrofit'
    //todo 因為使用了com.github.ltttttttttttt:retrofit,所以無法使用預設的Retrofit庫,因為提供了預設Retrofit庫沒有的功能,但其原有功能此庫都有,所以還是可以使用預設Retrofit庫的所有功能的
}

程式碼中使用(也可以參考Github內的InitRetrofit類):

OkSocket的初始化方式參考:https://github.com/xuuhaoo/OkSocket/wiki/Connection 使用也很簡單

其他使用方式和Retrofit相同(但只支援get的query和post的field轉為socket請求,其他方式可以評論留言,如果你可以實現直接提交更新!(歡迎大佬來補充更新))

val manager = OkSocket.open(ConnectionInfo(xxx,xxx))
...//初始化OkSocket的manager

val mId = AtomicInteger(0)
val handler = Handler(Looper.getMainLooper())
val socketAdapter = object : SocketAdapter(manager) {
                    //從響應資料中獲取請求id和body資料
                    override fun getResponseIdAndBodyBytes(data: OriginalData): Pair<Int, ByteArray>? {
                        val jb = JSONObject(String(data.bodyBytes))
                        if (!jb.has("id")) return null
                        return jb.getInt("id") to jb.getString("body").toByteArray()
                    }

                    //返回當前Socket在邏輯意義上是否和服務端連通了
                    override fun socketIsConnect(): Boolean = manager.isConnect

                    //根據url和請求引數生成用於傳送的資料和Id
                    override fun createSendDataAndId(url: String, requestParametersMap: HashMap<String, Any>, returns: (ISendable, Int) -> Unit) {
                        val id = mId.incrementAndGet()
                        val data = TcpSendData(id, url, requestParametersMap)//這裡需要你使用自己定義的資料傳送類
                        returns(data, id)
                    }

                    //在主執行緒回撥資料
                    override fun handlerCallbackRunnable(runnable: Runnable) {
                        handler.post(runnable)
                    }
                }


        //Socket的動態代理
        val r = Retrofit.Builder()
                .baseUrl(HttpConfig.ROOT_URL.toString())
                .addConverterFactory(GsonConverterFactory.create())
                .client(OkHttpClient())
                //這裡設定了Retrofit.Call的生成攔截,可以重寫SocketServiceMethodFactory.createServiceMethod方法返回null表示自己用Socket處理不了而使用Http請求
                .setServiceMethodFactory(SocketServiceMethodFactory(manager,socketAdapter ))
                .build()
                .create(HttpFunctions::class.java)

原始碼地址:GitHub - ltttttttttttt/Retrofit_SocketCallAdapter: Retrofit可以直接使用OkSocket來進行網路請求,Retrofit內的東西都不需要修改,只需要將OkHttpClient換成此即可