package com.huawei.middleware.dtm.client.client.connect;

import com.huawei.fusionstage.middleware.dtm.common.configuration.DTMClientConfiguration;
import com.huawei.fusionstage.middleware.dtm.common.logger.DTMLoggerFactory;
import com.huawei.fusionstage.middleware.dtm.common.protocol.MessageBuilder;
import com.huawei.fusionstage.middleware.dtm.common.protocol.message.response.Response;
import com.huawei.fusionstage.middleware.dtm.common.util.StackTraceUtil;
import com.huawei.fusionstage.middleware.dtm.common.util.StringUtils;
import com.huawei.fusionstage.middleware.dtm.rpc.InvokerProxy;
import com.huawei.fusionstage.middleware.dtm.rpc.api.IInvoker;
import com.huawei.middleware.dtm.client.DTMClientData;
import com.huawei.middleware.dtm.client.callback.entity.ReTryGlobalEndEventEntity;
import com.huawei.middleware.dtm.client.client.DtmClientProxyHandler;
import com.huawei.middleware.dtm.client.client.sender.DtmChannel;
import com.huawei.middleware.dtm.client.client.sender.api.IMessageSender;
import io.netty.channel.Channel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/huawei/middleware/dtm/client/client/connect/ReconnectServerRunner.class */
/**
 * Periodic maintenance task for the DTM client's server connections.
 *
 * <p>Each {@link #run()} performs, in order:
 * <ol>
 *   <li>re-sending queued "global end" events while at least one channel is running,</li>
 *   <li>refreshing the locally known identifiers,</li>
 *   <li>registering to every active server that has no running channel yet,</li>
 *   <li>pushing the current identifiers to every channel that needs a refresh.</li>
 * </ol>
 * Steps 3 and 4 fan their work out to a shared executor and block until every
 * submitted task has finished.
 */
public class ReconnectServerRunner implements Runnable {
    private static final Logger LOGGER = DTMLoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    // NOTE(review): with corePoolSize 0 and an unbounded LinkedBlockingQueue, ThreadPoolExecutor
    // queues new tasks instead of growing towards maximumPoolSize, so this pool most likely never
    // runs more than one worker at a time. Left unchanged to preserve existing concurrency
    // behaviour -- confirm whether 3 parallel workers were actually intended.
    private static final ExecutorService CONNECTION_SHARED_EXECUTOR = new ThreadPoolExecutor(
            0, 3, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new DefaultThreadFactory("dtm-Connection-Pool-Handler"));

    private static final IInvoker INVOKER_PROXY = InvokerProxy.getSingleInstance();

    /** Sender used to re-deliver queued global end events. */
    private final IMessageSender sender;

    /**
     * Creates a runner that re-sends pending global end events through the given sender.
     *
     * @param iMessageSender sender used by the re-send step
     */
    public ReconnectServerRunner(IMessageSender iMessageSender) {
        this.sender = iMessageSender;
    }

    /**
     * Runs {@code action} for every item on the shared executor and blocks until all of them
     * have completed.
     *
     * <p>The latch is released in a {@code finally} block so that a task failing with an
     * unexpected exception cannot leave the caller blocked forever in {@code await()}.
     *
     * @param items  work items; nothing is submitted when empty
     * @param action work to perform for each item on a pool thread
     * @param <T>    item type
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static <T> void runOnSharedPoolAndWait(Collection<T> items, Consumer<? super T> action)
            throws InterruptedException {
        if (items.isEmpty()) {
            return;
        }
        CountDownLatch pending = new CountDownLatch(items.size());
        for (T item : items) {
            CONNECTION_SHARED_EXECUTOR.execute(() -> {
                try {
                    action.accept(item);
                } finally {
                    pending.countDown();
                }
            });
        }
        pending.await();
    }

    /**
     * Registers to every configured active server whose address (plain or EIP) is not already
     * among the running channels, and records each successfully opened channel.
     *
     * @throws InterruptedException if interrupted while waiting for the registration tasks
     */
    private void connectToServer() throws InterruptedException {
        Set<String> connectedAddresses = DTMClientData.getSingleton().getAllRunningChannelsAddress();
        runOnSharedPoolAndWait(
                DTMClientConfiguration.getSingleInstance().getAllActiveServers().stream()
                        .filter(server -> !connectedAddresses.contains(server.constructServerAddress())
                                && !connectedAddresses.contains(server.constructEipServerAddress()))
                        .collect(Collectors.toList()),
                server -> {
                    DtmClientProxyHandler handler = DtmClientProxyHandler.getSingleInstance();
                    String address = server.constructServerAddress();
                    Channel channel;
                    try {
                        channel = (Channel) handler.registerToServer(address);
                        // Fall back to the EIP address when the primary one yields no channel.
                        if (channel == null && !StringUtils.isBlank(server.constructEipServerAddress())) {
                            address = server.constructEipServerAddress();
                            channel = (Channel) handler.registerToServer(address);
                        }
                    } catch (Throwable th) {
                        LOGGER.error("Register to dtm server: {} failed, error message:{}", server, th.getMessage());
                        channel = null;
                    }
                    if (channel != null) {
                        LOGGER.info("Register to dtm server: {} success.", channel);
                        DTMClientData.getSingleton().addToRunningChannels(address, server, channel);
                    }
                });
    }

    /**
     * Sends the current register data to every channel reported as needing an identifier
     * refresh; on a 200 response the channel's identifier set is updated locally.
     *
     * @throws InterruptedException if interrupted while waiting for the update tasks
     */
    private void updateIdentifiersToServer() throws InterruptedException {
        Set<DtmChannel> staleChannels = DTMClientData.getSingleton().fetchDtmChannelNeedToRefreshIdentifiers();
        runOnSharedPoolAndWait(staleChannels, dtmChannel -> {
            try {
                // Event type 2 carries the register data (meaning of the constant is defined
                // by MessageBuilder -- not visible here).
                Response response = Response.parseFrom(
                        INVOKER_PROXY.syncCall(
                                MessageBuilder.eventMsgWrapperBuild(
                                        (byte) 2, DtmClientProxyHandler.getSingleInstance().buildRegisterData()),
                                dtmChannel.getChannel())
                                .getMessageBytes());
                if (response.getStatusCode() == 200) {
                    dtmChannel.refreshAllRegisterIdentifier(DTMClientData.getSingleton().getAllRegisterIdentifier());
                    LOGGER.info("update identifiers: {} to server {} success",
                            dtmChannel.getAllRegisterIdentifier(), dtmChannel.getActiveAddress());
                } else {
                    LOGGER.error("update identifiers: {} to server {} failed, error message: {}",
                            DTMClientData.getSingleton().getAllRegisterIdentifier(),
                            dtmChannel.getActiveAddress(),
                            response.getMessage());
                }
            } catch (Throwable th) {
                LOGGER.error("update identifiers: {} to server {} failed, error message: {}",
                        DTMClientData.getSingleton().getAllRegisterIdentifier(),
                        dtmChannel.getActiveAddress(),
                        th.getMessage());
            }
        });
    }

    /**
     * Executes one maintenance cycle. Failures are logged and never propagated, so a
     * scheduler invoking this task keeps running.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            reSendGlobalEndEventForNeverTimeoutTx();
            DTMClientData.getSingleton().refreshAllIdentifies();
            connectToServer();
            updateIdentifiersToServer();
        } catch (InterruptedException ie) {
            // Restore the flag so the owning thread/executor can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("Run reconnect to server thread failed,error message:{}", ie.getMessage());
        } catch (Throwable th) {
            LOGGER.error("Run reconnect to server thread failed,error message:{}", th.getMessage());
        }
    }

    /**
     * Asynchronously drains the queue of pending global end events and re-sends each one.
     * Events whose re-send throws are put back on the queue after the drain so that a later
     * cycle retries them. Does nothing when the queue is empty or no channel is running.
     */
    private void reSendGlobalEndEventForNeverTimeoutTx() {
        Queue<ReTryGlobalEndEventEntity> pendingEvents =
                DTMClientData.getSingleton().getGlobalEndedEventsForNeverTimeout();
        if (pendingEvents.isEmpty() || !DTMClientData.getSingleton().hasRunningChannels()) {
            return;
        }
        CONNECTION_SHARED_EXECUTOR.submit(() -> {
            // Failed events are collected separately and re-queued only after the drain,
            // otherwise the loop would poll them again immediately.
            List<ReTryGlobalEndEventEntity> failedEvents = new ArrayList<>();
            ReTryGlobalEndEventEntity event;
            while ((event = pendingEvents.poll()) != null) {
                try {
                    LOGGER.info("Re send global end event:{} ", event);
                    // NOTE(review): the parsed response is discarded, so only exceptions
                    // (not error status codes) trigger a retry -- confirm this is intended.
                    Response.parseFrom(
                            this.sender.sendGlobalEndMessage(
                                    event.getTransactionGlobalEndedEvent().getTransactionGlobalId(),
                                    event.getChannelKey(),
                                    event.getBackupChannelKey(),
                                    MessageBuilder.eventMsgWrapperBuild(
                                            (byte) 4, event.getTransactionGlobalEndedEvent().toByteArray()),
                                    event.getTransactionGlobalEndedEvent().getTransactionStatus())
                                    .getMessageBytes());
                } catch (Throwable th) {
                    LOGGER.error("Re send global end event:{} failed, error message:{}",
                            event, StackTraceUtil.traceStackMessage(th));
                    failedEvents.add(event);
                }
            }
            pendingEvents.addAll(failedEvents);
        });
    }
}
