package com.aliyun.openservices.eas.predict.queue_client;

import com.aliyun.openservices.eas.predict.http.HttpException;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

/* loaded from: input_file:com/aliyun/openservices/eas/predict/queue_client/WebSocketWatcher.class */
public class WebSocketWatcher {
    private static Log log = LogFactory.getLog(WebSocketWatcher.class);
    private QueueClient queueClient;
    private URI uri;
    private Map<String, String> headers;
    private AtomicBoolean tryReconnect;
    private AtomicBoolean needPing;
    private boolean unlimitedReConnect;
    private AtomicInteger reConnectTimes;
    private int maxReConnectCnt;
    private AtomicBoolean end;
    private int reConnectInterval;
    private BlockingQueue<DataFrame> dataQueue;
    private Exception error;

    public WebSocketWatcher(QueueClient queueClient, URI uri, Map<String, String> map, WatchConfig watchConfig) throws Exception {
        this.queueClient = queueClient;
        this.uri = uri;
        if (map != null && !map.isEmpty()) {
            this.headers = new HashMap();
            this.headers.putAll(map);
        }
        this.unlimitedReConnect = watchConfig.isUnLimitedReCon();
        this.maxReConnectCnt = watchConfig.getReConCnt();
        this.reConnectTimes = new AtomicInteger(0);
        this.reConnectInterval = watchConfig.getReConInterval();
        this.needPing = new AtomicBoolean(true);
        this.tryReconnect = new AtomicBoolean(false);
        this.end = new AtomicBoolean(false);
        this.dataQueue = new LinkedBlockingQueue(100);
        createWebSocketClient();
        pingServer();
        this.queueClient.webSocketClient.connectBlocking();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void needReconnect(String str) throws Exception {
        while (true) {
            Thread.sleep(this.reConnectInterval * 1000);
            if (this.tryReconnect.get()) {
                log.warn("WebSocketClient Failed to Connect to Server");
                return;
            }
            int incrementAndGet = this.reConnectTimes.incrementAndGet();
            if (!this.unlimitedReConnect && incrementAndGet > this.maxReConnectCnt) {
                close();
                log.error("WebSocketClient Re-Connect Failed, Exhausted maxReConnectCnt: " + this.maxReConnectCnt + ", error = " + str);
                throw new Exception(str);
            }
            try {
                try {
                    this.tryReconnect.set(true);
                    if (this.queueClient.webSocketClient.isOpen()) {
                        log.warn("Prepare to Re-Connect, Close Existing WebSocket Connection");
                        this.queueClient.webSocketClient.closeConnection(1000, "Re-Connect Stop");
                    }
                    this.queueClient.webSocketClient = null;
                    createWebSocketClient();
                } catch (Exception e) {
                    log.warn("WebSocketClient Re-Connect Error, Error: " + e.getMessage());
                    this.tryReconnect.set(false);
                }
                if (this.queueClient.webSocketClient.connectBlocking()) {
                    this.tryReconnect.set(false);
                    return;
                }
                this.tryReconnect.set(false);
            } catch (Throwable th) {
                this.tryReconnect.set(false);
                throw th;
            }
        }
    }

    private void createWebSocketClient() throws Exception {
        try {
            this.queueClient.lock.lock();
            if (this.queueClient.webSocketClient != null) {
                throw new HttpException(400, "Another watcher is already running");
            }
            this.queueClient.webSocketClient = new WebSocketClient(this.uri, this.headers) { // from class: com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher.1
                public void onOpen(ServerHandshake serverHandshake) {
                    WebSocketWatcher.log.info("WebSocketClient Successfully Connects to Server: " + getRemoteSocketAddress());
                    WebSocketWatcher.this.reConnectTimes.set(0);
                    WebSocketWatcher.this.tryReconnect.set(false);
                }

                public void onMessage(String str) {
                }

                public void onMessage(ByteBuffer byteBuffer) {
                    try {
                        WebSocketWatcher.this.dataQueue.put(new DataFrame().decode(byteBuffer));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                public void onClose(int i, String str, boolean z) {
                    WebSocketWatcher.log.warn(String.format("WebSocketClient is Closed, Code: %d, Reason: %s, Re-Connect times: %d", Integer.valueOf(i), str, Integer.valueOf(WebSocketWatcher.this.reConnectTimes.get())));
                    if (WebSocketWatcher.this.end.get() || WebSocketWatcher.this.tryReconnect.get()) {
                        return;
                    }
                    try {
                        WebSocketWatcher.this.needReconnect(str);
                    } catch (Exception e) {
                        throw new RuntimeException(e.getMessage());
                    }
                }

                public void onError(Exception exc) {
                    try {
                        WebSocketWatcher.this.dataQueue.put(new DataFrame(exc));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        } finally {
            this.queueClient.lock.unlock();
        }
    }

    public void pingServer() {
        Thread thread = new Thread(() -> {
            while (this.needPing.get()) {
                try {
                    this.queueClient.lock.lock();
                    if (this.queueClient.webSocketClient != null && this.queueClient.webSocketClient.isOpen()) {
                        this.queueClient.webSocketClient.sendPing();
                    }
                } catch (Exception e) {
                    log.warn("PingServer Error, error = " + e.getMessage());
                } finally {
                    this.queueClient.lock.unlock();
                }
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
            log.debug("PingServer is Closed");
        });
        thread.setDaemon(true);
        thread.start();
    }

    public DataFrame getDataFrame() throws Exception {
        DataFrame take = this.dataQueue.take();
        if (take.getError() != null) {
            throw take.getError();
        }
        return take;
    }

    public void close() {
        try {
            this.queueClient.lock.lock();
            this.needPing.set(false);
            this.end.set(true);
            if (this.queueClient.webSocketClient != null) {
                this.queueClient.webSocketClient.closeConnection(1000, "Real Stop");
            }
        } finally {
            this.queueClient.lock.unlock();
        }
    }
}
