package com.huawei.fusionstage.middleware.dtm.rpc.consumer;

import com.huawei.fusionstage.middleware.dtm.common.logger.DTMLoggerFactory;
import com.huawei.fusionstage.middleware.dtm.common.util.StackTraceUtil;
import com.huawei.fusionstage.middleware.dtm.rpc.InvokerProxy;
import com.huawei.fusionstage.middleware.dtm.rpc.invoker.InvokerFuture;
import io.netty.channel.Channel;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;

/* loaded from: input_file:com/huawei/fusionstage/middleware/dtm/rpc/consumer/MessageSendRunner.class */
/**
 * Background task that drains pending {@link InvokerFuture}s from the RPC message buffer exposed by
 * {@link InvokerProxy}, writes each request message to its target channel, and then flushes every
 * channel touched by the batch once.
 *
 * <p>The loop runs until the executing thread is interrupted.
 *
 * <p>NOTE(review): the batch state ({@code invokerFutures}, {@code channels}) is not synchronized, so
 * an instance is presumably meant to be executed by a single thread only — confirm against callers.
 */
public class MessageSendRunner implements Runnable {
    private static final Logger LOGGER = DTMLoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Upper bound on the number of extra messages drained from the buffer per batch. */
    private static final int MAX_DRAIN_BATCH = 1024;

    /** Messages collected for the batch currently being sent; cleared after every batch. */
    private final List<InvokerFuture> invokerFutures = new LinkedList<>();

    /** Distinct channels written to during the current batch; cleared after every batch. */
    private final Set<Channel> channels = new HashSet<>();

    /**
     * Writes every buffered request message to its target channel, then flushes each distinct
     * channel once. A failure on one message or channel is logged and does not stop the others.
     */
    private void sendToAllChannels() {
        this.invokerFutures.forEach(invokerFuture -> {
            Channel targetChannel = invokerFuture.getTargetChannel();
            // Registered before the write so the channel is still flushed if this write fails.
            this.channels.add(targetChannel);
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("write message invoker:{}, channel:{}",
                        invokerFuture.getRequestMessage().getInvokeId(), targetChannel);
                }
                targetChannel.write(invokerFuture.getRequestMessage());
            } catch (Throwable th) {
                LOGGER.warn("write message to:{} failed, error message:{}", targetChannel, th.getMessage());
            }
        });
        this.channels.forEach(channel -> {
            try {
                channel.flush();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("flush finished for channel:{}", channel);
                }
            } catch (Throwable th) {
                LOGGER.warn("flush message to:{} failed, error message:{}", channel, th.getMessage());
            }
        });
    }

    /**
     * Repeatedly takes one message from the RPC message buffer, drains up to
     * {@link #MAX_DRAIN_BATCH} more, and sends the batch. Failures while sending are logged and
     * ignored so the loop keeps running; an interruption stops the loop.
     */
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        LOGGER.info("Start up message sending consumer : {}", currentThread.getName());
        while (!currentThread.isInterrupted()) {
            try {
                this.invokerFutures.add(InvokerProxy.getSingleInstance().getRpcMessageBuffer().take());
                InvokerProxy.getSingleInstance().getRpcMessageBuffer().drainTo(this.invokerFutures, MAX_DRAIN_BATCH);
                sendToAllChannels();
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    // An InterruptedException clears the thread's interrupt status when thrown.
                    // Restore it so the loop condition observes the interruption and the runner
                    // actually stops instead of silently swallowing the shutdown request.
                    currentThread.interrupt();
                    LOGGER.info("Message sending consumer interrupted, stopping : {}", currentThread.getName());
                } else {
                    LOGGER.warn("Send rpc exception,ignore it: {}", StackTraceUtil.traceStackMessage(th));
                }
            } finally {
                // Always reset the batch state, whether the send succeeded or failed.
                LOGGER.trace("Message send runner Send {} messages finish.", this.invokerFutures.size());
                this.invokerFutures.clear();
                this.channels.clear();
            }
        }
    }
}
