package io.wondrous.sns.api.tmg.realtime;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import com.google.gson.Gson;
import com.meetme.util.android.Bundles;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Action;
import io.reactivex.functions.Cancellable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.wondrous.sns.api.tmg.TmgApiConfig;
import io.wondrous.sns.api.tmg.exception.ConnectionRefusedException;
import io.wondrous.sns.api.tmg.exception.RetryException;
import io.wondrous.sns.api.tmg.realtime.RealtimeSubscription;
import io.wondrous.sns.api.tmg.realtime.TmgRealtimeApi;
import io.wondrous.sns.api.tmg.realtime.internal.CompositeWebsocketListener;
import io.wondrous.sns.api.tmg.realtime.internal.RealtimeLoggedEvent;
import io.wondrous.sns.api.tmg.realtime.internal.RealtimeSocketListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketConnectingListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketEnvelopeMessage;
import io.wondrous.sns.api.tmg.realtime.internal.SocketFailureListener;
import io.wondrous.sns.api.tmg.realtime.internal.SocketTopicMessage;
import io.wondrous.sns.logger.SnsLogger;
import io.wondrous.sns.oauth.OAuthInterceptor;
import io.wondrous.sns.util.RetryWhen;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.reactivestreams.Publisher;

@Singleton
/* loaded from: classes5.dex */
public class TmgRealtimeApi {
    public static final int CLOSE_CODE_NORMAL = 1000;
    public static final String TAG = "TmgRealtimeApi";
    public final Gson mGson;
    public final SnsLogger mLogger;

    @Nullable
    public final OAuthInterceptor mOAuthInterceptor;
    public final OkHttpClient mOkHttpClient;
    public final Observable<WebSocket> mSocketTask;
    public final Map<String, Flowable<TopicEvent>> mTopicPublishers = new ConcurrentHashMap();

    @VisibleForTesting
    public final CompositeWebsocketListener mWebsocketListener = new CompositeWebsocketListener();
    public final Flowable<SocketEnvelopeMessage> mStreamPublisher = Flowable.a(new FlowableOnSubscribe() { // from class: c.a.a.b.b.b.j
        @Override // io.reactivex.FlowableOnSubscribe
        public final void a(FlowableEmitter flowableEmitter) {
            TmgRealtimeApi.this.a(flowableEmitter);
        }
    }, BackpressureStrategy.BUFFER).m();

    @Inject
    public TmgRealtimeApi(SnsLogger snsLogger, @Named("realtime-client") OkHttpClient okHttpClient, final TmgApiConfig tmgApiConfig, TmgRealtimeConfig tmgRealtimeConfig, Gson gson) {
        this.mLogger = snsLogger;
        this.mOkHttpClient = okHttpClient;
        this.mOAuthInterceptor = extractOAuthInterceptor(okHttpClient);
        this.mGson = gson;
        this.mSocketTask = Observable.create(new ObservableOnSubscribe() { // from class: c.a.a.b.b.b.g
            @Override // io.reactivex.ObservableOnSubscribe
            public final void a(ObservableEmitter observableEmitter) {
                TmgRealtimeApi.this.a(tmgApiConfig, observableEmitter);
            }
        }).doOnError(new Consumer() { // from class: c.a.a.b.b.b.d
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                TmgRealtimeApi.this.a((Throwable) obj);
            }
        }).replay(1).a(1, tmgRealtimeConfig.getSocketReuseTimeoutInSecs(), TimeUnit.SECONDS);
    }

    private Flowable<TopicEvent> createTopicPublisher(@NonNull final String str) {
        return subscribeToTopic(str).toFlowable(BackpressureStrategy.LATEST).i(retryPolicy().a()).j(new Function() { // from class: c.a.a.b.b.b.n
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return TmgRealtimeApi.this.a((RealtimeSubscription) obj);
            }
        }).b(SocketTopicMessage.class).a(new Predicate() { // from class: c.a.a.b.b.b.e
            @Override // io.reactivex.functions.Predicate
            public final boolean test(Object obj) {
                boolean equals;
                equals = str.equals(((SocketTopicMessage) obj).getTopic());
                return equals;
            }
        }).f(new Function() { // from class: c.a.a.b.b.b.a
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return ((SocketTopicMessage) obj).getMessage();
            }
        });
    }

    @Nullable
    public static OAuthInterceptor extractOAuthInterceptor(@NonNull OkHttpClient okHttpClient) {
        for (Interceptor interceptor : okHttpClient.interceptors()) {
            if (interceptor instanceof OAuthInterceptor) {
                return (OAuthInterceptor) interceptor;
            }
        }
        return null;
    }

    private Flowable<SocketEnvelopeMessage> getMessagesStream() {
        return this.mStreamPublisher;
    }

    private RetryWhen.Builder retryPolicy() {
        return RetryWhen.b((Consumer<? super RetryWhen.ErrorAndDuration>) new Consumer() { // from class: c.a.a.b.b.b.o
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                TmgRealtimeApi.this.a((RetryWhen.ErrorAndDuration) obj);
            }
        }).a(2L, 10L, TimeUnit.SECONDS, 2.0d);
    }

    private Observable<RealtimeSubscription> subscribeToTopic(@NonNull final String str) {
        return getSocket().switchMap(new Function() { // from class: c.a.a.b.b.b.h
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return TmgRealtimeApi.this.a(str, (WebSocket) obj);
            }
        });
    }

    public /* synthetic */ ObservableSource a(final String str, final WebSocket webSocket) throws Exception {
        return Observable.create(new ObservableOnSubscribe() { // from class: c.a.a.b.b.b.l
            @Override // io.reactivex.ObservableOnSubscribe
            public final void a(ObservableEmitter observableEmitter) {
                TmgRealtimeApi.this.a(webSocket, str, observableEmitter);
            }
        });
    }

    public /* synthetic */ Publisher a(RealtimeSubscription realtimeSubscription) throws Exception {
        return getMessagesStream();
    }

    public /* synthetic */ Publisher a(final String str) throws Exception {
        OAuthInterceptor oAuthInterceptor = this.mOAuthInterceptor;
        return oAuthInterceptor == null ? Flowable.a(new IllegalStateException("Unable to subscribe to privileged realtime topic.")) : oAuthInterceptor.b(this.mOkHttpClient).e(new Function() { // from class: c.a.a.b.b.b.b
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return TmgRealtimeApi.this.a(str, (String) obj);
            }
        });
    }

    public /* synthetic */ Publisher a(String str, String str2) throws Exception {
        return events("/" + str2 + str);
    }

    public /* synthetic */ void a(FlowableEmitter flowableEmitter) throws Exception {
        final WebSocketStreamCallbacks webSocketStreamCallbacks = new WebSocketStreamCallbacks(flowableEmitter, this.mGson);
        this.mWebsocketListener.addListener(webSocketStreamCallbacks);
        flowableEmitter.a(new Cancellable() { // from class: c.a.a.b.b.b.i
            @Override // io.reactivex.functions.Cancellable
            public final void cancel() {
                TmgRealtimeApi.this.a(webSocketStreamCallbacks);
            }
        });
    }

    public /* synthetic */ void a(TmgApiConfig tmgApiConfig, ObservableEmitter observableEmitter) throws Exception {
        final SocketFailureListener socketFailureListener = new SocketFailureListener(observableEmitter);
        this.mWebsocketListener.addListener(socketFailureListener);
        final SocketConnectingListener socketConnectingListener = new SocketConnectingListener(observableEmitter, this.mGson);
        socketConnectingListener.setCancellable(new Cancellable() { // from class: c.a.a.b.b.b.f
            @Override // io.reactivex.functions.Cancellable
            public final void cancel() {
                TmgRealtimeApi.this.a(socketConnectingListener);
            }
        });
        this.mWebsocketListener.addListener(socketConnectingListener);
        final WebSocket newWebSocket = this.mOkHttpClient.newWebSocket(new Request.Builder().url(tmgApiConfig.getWebSocketUrl()).build(), this.mWebsocketListener);
        observableEmitter.a(new Cancellable() { // from class: c.a.a.b.b.b.k
            @Override // io.reactivex.functions.Cancellable
            public final void cancel() {
                TmgRealtimeApi.this.a(socketFailureListener, newWebSocket);
            }
        });
    }

    public /* synthetic */ void a(WebSocketStreamCallbacks webSocketStreamCallbacks) throws Exception {
        this.mWebsocketListener.removeListener(webSocketStreamCallbacks);
    }

    public /* synthetic */ void a(SocketConnectingListener socketConnectingListener) throws Exception {
        this.mWebsocketListener.removeListener(socketConnectingListener);
    }

    public /* synthetic */ void a(SocketFailureListener socketFailureListener, WebSocket webSocket) throws Exception {
        this.mWebsocketListener.removeListener(socketFailureListener);
        webSocket.close(1000, "Client disconnected");
    }

    public /* synthetic */ void a(RetryWhen.ErrorAndDuration errorAndDuration) throws Exception {
        this.mLogger.a(RealtimeLoggedEvent.RETRY, Bundles.a().a("error", errorAndDuration.b().toString()).a("delayMs", errorAndDuration.a()).a());
        this.mLogger.a(new RetryException("Error in Stream socket. Reconnecting in " + errorAndDuration.a() + " ms", errorAndDuration.b()));
    }

    public /* synthetic */ void a(Throwable th) throws Exception {
        OAuthInterceptor oAuthInterceptor;
        if (!(th instanceof ConnectionRefusedException) || (oAuthInterceptor = this.mOAuthInterceptor) == null) {
            return;
        }
        oAuthInterceptor.d();
    }

    public /* synthetic */ void a(WebSocket webSocket, String str, ObservableEmitter observableEmitter) throws Exception {
        RealtimeSubscription realtimeSubscription = new RealtimeSubscription(webSocket, str, this.mGson);
        realtimeSubscription.subscribe();
        observableEmitter.a(realtimeSubscription);
        observableEmitter.onNext(realtimeSubscription);
    }

    public void addStreamSocketListener(RealtimeSocketListener realtimeSocketListener) {
        this.mWebsocketListener.addListener(new StreamWebsocketAdapter(realtimeSocketListener));
    }

    public Flowable<TopicEvent> authenticatedEvents(final String str) {
        if (!str.startsWith("/")) {
            str = "/" + str;
        }
        return Flowable.a(new Callable() { // from class: c.a.a.b.b.b.c
            @Override // java.util.concurrent.Callable
            public final Object call() {
                return TmgRealtimeApi.this.a(str);
            }
        });
    }

    public /* synthetic */ void b(String str) throws Exception {
        this.mTopicPublishers.remove(str);
    }

    public Flowable<TopicEvent> events(final String str) {
        if (!str.startsWith("/")) {
            str = "/" + str;
        }
        Flowable<TopicEvent> flowable = this.mTopicPublishers.get(str);
        if (flowable != null) {
            return flowable;
        }
        Flowable<TopicEvent> m = createTopicPublisher(str).c(new Action() { // from class: c.a.a.b.b.b.m
            @Override // io.reactivex.functions.Action
            public final void run() {
                TmgRealtimeApi.this.b(str);
            }
        }).m();
        this.mTopicPublishers.put(str, m);
        return m;
    }

    @NonNull
    public Observable<WebSocket> getSocket() {
        return this.mSocketTask;
    }

    @Nullable
    @Deprecated
    public String getUserId() {
        OAuthInterceptor oAuthInterceptor = this.mOAuthInterceptor;
        if (oAuthInterceptor != null) {
            return oAuthInterceptor.b();
        }
        return null;
    }
}
