使用Retrofit的方式请求Socket,且Socket可以和Http无缝切换
前言
一般來(lái)說(shuō)前端的app和服務(wù)器通訊都是用的Http,Http使用方便,請(qǐng)求流程好控制,但有時(shí)候app需要實(shí)時(shí)接收服務(wù)端的推送或保持長(zhǎng)連接,這時(shí)就需要使用Socket了
java提供的Socket接口還是比較難用的,而網(wǎng)上有一個(gè)開(kāi)源庫(kù)OkScoket封裝的還是挺好用的,Github地址:https://github.com/xuuhaoo/OkSocket
但即使如此,其沒(méi)有一對(duì)一回調(diào)或同步請(qǐng)求方法,只能通過(guò)一個(gè)或幾個(gè)統(tǒng)一的回調(diào)方法,就造成了使用比較麻煩且容易出錯(cuò)
而Retrofit使用比較好用,但是原生其只支持Http,所以我將其封裝了一下可以不用修改Retrofit和其接口的代碼就可以簡(jiǎn)單方便安全的使用
正文
下面將一下原理,如果不想看的同學(xué)可以直接翻到下面看引用方式和如何使用
我們?nèi)绻麩o(wú)侵入無(wú)修改接口的使用可以用兩種方案
1.自定義動(dòng)態(tài)代理
2.根據(jù)Retrofit提供的接口自定義實(shí)現(xiàn)Call.Factory和Call
1.自定義動(dòng)態(tài)代理
該種方式比較靈活,且代碼很少,但無(wú)法利用Retrofit的其他優(yōu)勢(shì),比如自定義返回值類(lèi)型和解析器,比如支持RxJava就需要自己在寫(xiě)一套或Copy一下,比如解析對(duì)象就要自己來(lái)處理
所以不使用該方法,但我將實(shí)現(xiàn)的代碼放出來(lái)(只支持POST和GET的注解,其他注解可以自行支持)
我們通過(guò)寫(xiě)相應(yīng)接口的動(dòng)態(tài)代理,并將自身請(qǐng)求的回調(diào)注冊(cè)到OkSocket的統(tǒng)一回調(diào)中,然后自己判斷id來(lái)取回調(diào),并將其改造成同步(自旋)和異步(回調(diào))的Call在封裝適配RxJava
import io.reactivex.Observable import io.reactivex.Observer import io.reactivex.disposables.Disposable import io.reactivex.exceptions.CompositeException import io.reactivex.exceptions.Exceptions import io.reactivex.plugins.RxJavaPlugins import okhttp3.Request import retrofit2.Call import retrofit2.Callback import retrofit2.Response import retrofit2.http.Field import retrofit2.http.GET import retrofit2.http.POST import retrofit2.http.Query import java.lang.reflect.* import java.net.SocketTimeoutException/*** creator: lt 2021/1/23 lt.dygzs@qq.com* effect : 將大部分Http轉(zhuǎn)為Socket* warning:*/ object SocketRequest {/*** 動(dòng)態(tài)代理單例對(duì)象*/val instance: IPostRequest = getPostRequest()//獲取動(dòng)態(tài)代理實(shí)例對(duì)象private fun getPostRequest(): IPostRequest {val clazz = IPostRequest::class.java//拿到我們被代理接口的class對(duì)象return Proxy.newProxyInstance(//調(diào)用動(dòng)態(tài)代理生成的方法來(lái)生成動(dòng)態(tài)代理clazz.classLoader,//類(lèi)加載器對(duì)象arrayOf(clazz),//因?yàn)槲覀兊慕涌诓恍枰^承別的接口,所以直接傳入接口的class就行PostRequestHandler()//InvocationHandler接口的實(shí)現(xiàn)類(lèi),用來(lái)處理代理對(duì)象的方法調(diào)用) as IPostRequest}class PostRequestHandler : InvocationHandler {override fun invoke(proxy: Any, method: Method, args: Array<out Any?>?): Any? {//處理Object類(lèi)的方法if (method.declaringClass == Any::class.java)return (if (args == null) method.invoke(this) else method.invoke(this, *args))val tMap = HashMap<String, Any>()method.parameterAnnotations.forEachIndexed { index, it ->//拿到參數(shù)的key,如果拿不到說(shuō)明需要使用http請(qǐng)求val key = (it.find { it is Field } as? Field)?.value?: (it.find { it is Query } as? Query)?.value?: return method.invoke(PostRequest.getPostRequest(), args)tMap.put(key, args?.get(index) ?: "")}//拿到url,如果不是以斜杠開(kāi)頭就拼接上var url = method.getAnnotation(POST::class.java)?.value?: method.getAnnotation(GET::class.java)?.value?: return method.invoke(PostRequest.getPostRequest(), args)if (url.startsWith('/').not())url = "/$url"//處理返回值return when (method.returnType) {Call::class.java -> SocketCall<Any>(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType))Observable::class.java -> createObservable(url, tMap, method)else -> method.invoke(PostRequest.getPostRequest(), args)}}}private fun getParameterUpperBound(index: Int, type: ParameterizedType): Type {val types = type.actualTypeArgumentsrequire(!(index < 0 || index >= types.size)) { "Index " + index + " not in range [0," + types.size + ") for " + type }val paramType = types[index]return if (paramType is WildcardType) {paramType.upperBounds[0]} else paramType}private fun createObservable(url: String, tMap: HashMap<String, Any>, method: Method): Observable<Any?> =SocketObservable(SocketObservable.CallExecuteObservable(SocketCall(url, tMap, getParameterUpperBound(0, method.genericReturnType as ParameterizedType)))) }class SocketObservable<T>(private val upstream: Observable<Response<T>>) : Observable<T>() {override fun subscribeActual(observer: Observer<in T>) {upstream.subscribe(BodyObserver(observer))}private class BodyObserver<R>(val observer: Observer<in R>) : Observer<Response<R>> {private var terminated = falseoverride fun onSubscribe(disposable: Disposable) {observer.onSubscribe(disposable)}override fun onNext(response: Response<R>) {if (response.isSuccessful) {val body = response.body()if (body != null) {observer.onNext(body)return}}terminated = trueval t: Throwable = retrofit2.HttpException(response)try {observer.onError(t)} catch (inner: Throwable) {Exceptions.throwIfFatal(inner)RxJavaPlugins.onError(CompositeException(t, inner))}}override fun onComplete() {if (!terminated) {observer.onComplete()}}override fun onError(throwable: Throwable) {if (!terminated) {observer.onError(throwable)} else {// This should never happen! onNext handles and forwards errors automatically.val broken: Throwable = AssertionError("This should never happen! Report as a bug with the full stacktrace.")broken.initCause(throwable)RxJavaPlugins.onError(broken)}}}class CallExecuteObservable<T>(private val call: Call<T>) : Observable<Response<T>>() {override fun subscribeActual(observer: Observer<in Response<T>>) {// Since Call is a one-shot type, clone it for each new observer.val disposable = CallDisposable(call)observer.onSubscribe(disposable)if (disposable.isDisposed) {return}var terminated = falsetry {val response = call.execute()if (!disposable.isDisposed) {observer.onNext(response)}if (!disposable.isDisposed) {terminated = trueobserver.onComplete()}} catch (t: Throwable) {Exceptions.throwIfFatal(t)if (terminated) {RxJavaPlugins.onError(t)} else if (!disposable.isDisposed) {try {observer.onError(t)} catch (inner: Throwable) {Exceptions.throwIfFatal(inner)RxJavaPlugins.onError(CompositeException(t, inner))}}}}private class CallDisposable constructor(private val call: Call<*>) : Disposable {@Volatileprivate var disposed = falseoverride fun dispose() {disposed = truecall.cancel()}override fun isDisposed(): Boolean {return disposed}}} }class SocketCall<T>(val url: String,val tMap: HashMap<String, Any>,val trueReturnType: Type) : Call<T> {private var canceled = falseprivate var isExecuted = false//是否運(yùn)行過(guò),一個(gè)call對(duì)象只允許運(yùn)行一次private var requestId = 0/*** 檢查網(wǎng)絡(luò)三十秒,如果沒(méi)有連接成功就 sync拋異常,async就回調(diào)false* 傳入[asyncCallback]表示async*/private fun checkConnect(asyncCallback: ((Boolean) -> Unit)? = null) {if (SocketManage.manager.isConnect) {asyncCallback?.invoke(true)return}SocketManage.connect()val time = System.currentTimeMillis()if (asyncCallback == null) {while (true) {if (SocketManage.currConnStatus == 3)returnif (System.currentTimeMillis() - time > 30000)throw SocketTimeoutException()try {Thread.sleep(100)} catch (e: Exception) {e.printStackTrace()}}} else {ThreadPool.submitToCacheThreadPool {while (true) {if (SocketManage.currConnStatus == 3) {asyncCallback(true)return@submitToCacheThreadPool}if (System.currentTimeMillis() - time > 30000) {asyncCallback(false)return@submitToCacheThreadPool}try {Thread.sleep(100)} catch (e: Exception) {e.printStackTrace()}}}}}override fun execute(): Response<T> {checkConnect()if (isExecuted) throw IllegalStateException("只能執(zhí)行一次")isExecuted = trueval data = TcpSendData(url, tMap)requestId = data.request_id"send2 : $url ${tMap.toJson()}".w("SocketManage22")var a: Any? = nullvar t: Throwable? = nullvar notFinish = trueSocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->a = anyt = throwablenotFinish = false}//發(fā)送請(qǐng)求SocketManage.manager.send(data)var whileNumber = 0while (notFinish && !canceled) {try {whileNumber++if (whileNumber % 20 == 0)SocketManage.handlerTimeOutedListener()Thread.sleep(50)} catch (e: Exception) {e.printStackTrace()}}t?.let { throw it }return if (a == null) throw ServerException("服務(wù)端返回的result是空") else Response.success(a as T)}override fun enqueue(callback: Callback<T>) {checkConnect {if (!it) {HandlerPool.post {callback.onFailure(this, SocketTimeoutException())}return@checkConnect}if (isExecuted) throw IllegalStateException("只能執(zhí)行一次")isExecuted = trueval data = TcpSendData(url, tMap)"send : $url ${tMap.toJson()}".w("SocketManage22")requestId = data.request_idSocketManage.addListener(requestId, trueReturnType) { any: Any?, throwable: Throwable? ->if (any != null)callback.onResponse(this, Response.success(any as T))elsecallback.onFailure(this, throwable ?: ServerException("服務(wù)端返回的result是空2"))}//發(fā)送請(qǐng)求SocketManage.manager.send(data)}}override fun clone(): Call<T> = SocketCall(url, tMap, trueReturnType)override fun isExecuted(): Boolean = isExecutedoverride fun cancel() {canceled = true// 取消請(qǐng)求SocketManage.removeListener(requestId)}override fun isCanceled(): Boolean = canceledoverride fun request(): Request? = null}object : SocketActionAdapter() {override fun onSocketReadResponse(info: ConnectionInfo?, action: String?, data: OriginalData?) {data ?: return ...//處理回調(diào)val (_, type, listener) = listenerMap.remove(requestId) ?: returntry {val any = Gson().fromJson<Any?>(body.getJSONObject("body").toString(), type)HandlerPool.post {if (any == null) listener(null, ServerException("服務(wù)端返回的result是空")) else listener(any, null)}} catch (t: Throwable) {HandlerPool.post {listener(null, t)}}handlerTimeOutedListener() ...具體可以參考我之前仿照Retrofit實(shí)現(xiàn)的Http動(dòng)態(tài)代理:https://blog.csdn.net/qq_33505109/article/details/104920101
2.根據(jù)Retrofit提供的接口自定義實(shí)現(xiàn)Call.Factory和Call
ps:該方案被廢棄,因?yàn)槠浔容^受限制,而且無(wú)法充分利用到Retrofit的功能,并且限制Http比較死,用Socket需要寫(xiě)很多死代碼
通過(guò)查看Retrofit框架,發(fā)現(xiàn)傳入的OkHttpClient其實(shí)就是一個(gè)Call.Factory的實(shí)現(xiàn)類(lèi),所以只需要自行實(shí)現(xiàn)Call.Factory,我們也可以來(lái)控制Call的創(chuàng)建,只要Call能創(chuàng)建出來(lái),其他的如Observable也只是對(duì)Call的包裝
在通過(guò)和方法1一樣的回調(diào)處理,即可實(shí)現(xiàn)效果
關(guān)鍵代碼如下:
abstract class SocketCallAdapter(private val manager: IConnectionManager) : Call.Factory {override fun newCall(request: Request): Call {...return SocketCall(manager,this,url,map)} }/*** creator: lt 2021/2/27 lt.dygzs@qq.com* effect : 用于Socket請(qǐng)求的Call* warning:*/ internal class SocketCall(private val manager: IConnectionManager,private val adapter: SocketCallAdapter,private val url: String,private val tMap: HashMap<String, Any>) : Call {//和第一種實(shí)現(xiàn)相同...}3.根據(jù)我from的Retrofit庫(kù)提供的能力來(lái)攔截Retrofit.Call的創(chuàng)建
使用該方式可以更靈活且更容易復(fù)用Retrofit的能力和插件
參考https://blog.csdn.net/qq_33505109/article/details/108767068第七條和源碼:https://github.com/ltttttttttttt/Retrofit_SocketCallAdapter
ps:其實(shí)源碼很簡(jiǎn)單,就幾個(gè)文件且內(nèi)容都很少,而且想自定義攔截簡(jiǎn)直太方便了
使用方式
在根項(xiàng)目的build.gradle文件中加入:
allprojects {repositories { ...maven { url 'https://jitpack.io' }} }app的build.gradle中加上
dependencies{...implementation 'com.github.ltttttttttttt:Retrofit_SocketCallAdapter:1.1.8'implementation "com.github.ltttttttttttt:retrofit:1.3.0"implementation 'org.jetbrains.kotlin:kotlin-reflect:1.4.30'//todo 默認(rèn)用的'com.github.ltttttttttttt:retrofit:',所以如果需要用到gson的攔截器之類(lèi)的,但是其中包含的有原版的retrofit的引用,會(huì)導(dǎo)致沖突,所以可以使用下面的方法來(lái)去掉本引用的某個(gè)遠(yuǎn)程依賴(lài)//implementation 'com.squareup.retrofit2:converter-gson:2.7.0' exclude module: 'retrofit'//todo 因?yàn)槭褂昧薱om.github.ltttttttttttt:retrofit,所以無(wú)法使用默認(rèn)的Retrofit庫(kù),因?yàn)樘峁┝四J(rèn)Retrofit庫(kù)沒(méi)有的功能,但其原有功能此庫(kù)都有,所以還是可以使用默認(rèn)Retrofit庫(kù)的所有功能的 }代碼中使用(也可以參考Github內(nèi)的InitRetrofit類(lèi)):
OkSocket的初始化方式參考:https://github.com/xuuhaoo/OkSocket/wiki/Connection? 使用也很簡(jiǎn)單
其他使用方式和Retrofit相同(但只支持get的query和post的field轉(zhuǎn)為socket請(qǐng)求,其他方式可以評(píng)論留言,如果你可以實(shí)現(xiàn)直接提交更新!(歡迎大佬來(lái)補(bǔ)充更新))
val manager = OkSocket.open(ConnectionInfo(xxx,xxx)) ...//初始化OkSocket的managerval mId = AtomicInteger(0) val handler = Handler(Looper.getMainLooper()) val socketAdapter = object : SocketAdapter(manager) {//從響應(yīng)數(shù)據(jù)中獲取請(qǐng)求id和body數(shù)據(jù)override fun getResponseIdAndBodyBytes(data: OriginalData): Pair<Int, ByteArray>? {val jb = JSONObject(String(data.bodyBytes))if (!jb.has("id")) return nullreturn jb.getInt("id") to jb.getString("body").toByteArray()}//返回當(dāng)前Socket在邏輯意義上是否和服務(wù)端連通了override fun socketIsConnect(): Boolean = manager.isConnect//根據(jù)url和請(qǐng)求參數(shù)生成用于發(fā)送的數(shù)據(jù)和Idoverride fun createSendDataAndId(url: String, requestParametersMap: HashMap<String, Any>, returns: (ISendable, Int) -> Unit) {val id = mId.incrementAndGet()val data = TcpSendData(id, url, requestParametersMap)//這里需要你使用自己定義的數(shù)據(jù)發(fā)送類(lèi)returns(data, id)}//在主線(xiàn)程回調(diào)數(shù)據(jù)override fun handlerCallbackRunnable(runnable: Runnable) {handler.post(runnable)}}//Socket的動(dòng)態(tài)代理val r = Retrofit.Builder().baseUrl(HttpConfig.ROOT_URL.toString()).addConverterFactory(GsonConverterFactory.create()).client(OkHttpClient())//這里設(shè)置了Retrofit.Call的生成攔截,可以重寫(xiě)SocketServiceMethodFactory.createServiceMethod方法返回null表示自己用Socket處理不了而使用Http請(qǐng)求.setServiceMethodFactory(SocketServiceMethodFactory(manager,socketAdapter )).build().create(HttpFunctions::class.java)源碼地址:https://github.com/ltttttttttttt/Retrofit_SocketCallAdapter
總結(jié)
以上是生活随笔為你收集整理的使用Retrofit的方式请求Socket,且Socket可以和Http无缝切换的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 分析Kotlin协程只挂起不恢复会怎样(
- 下一篇: 安卓动态.9图拉伸实现方案