package com.huawei.middleware.dtm.rpc.consumer;

import com.huawei.middleware.dtm.common.exception.StackTraceUtil;
import com.huawei.middleware.dtm.common.logger.DTMLoggerFactory;
import com.huawei.middleware.dtm.rpc.InvokerProxy;
import io.netty.channel.Channel;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;

/* loaded from: input_file:com/huawei/middleware/dtm/rpc/consumer/MessageSendRunner.class */
/**
 * Consumer loop that sends queued RPC messages in batches.
 *
 * <p>Each iteration blocks on {@code InvokerProxy.RPC_MESSAGE_BUFFER} for one message, drains up to
 * {@code InvokerProxy.MAX_MESSAGE_COUNT} further messages, writes every message to its own channel
 * and then flushes each distinct channel once.
 *
 * <p>The batch state held in the instance fields is not synchronized, so one instance must be run
 * by a single thread only.
 */
public class MessageSendRunner implements Runnable {
    private static final Logger logger = DTMLoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Messages of the batch currently being sent; emptied after every iteration. */
    private final List<RpcMessageEntity> messageEntities = new ArrayList<>();

    /** Distinct channels written to during the current batch; each is flushed once per batch. */
    private final Set<Channel> channels = new HashSet<>();

    /**
     * Runs the send loop until the executing thread is interrupted.
     *
     * <p>Failures while sending are logged and ignored so that the consumer keeps running. An
     * interrupt restores the thread's interrupt flag and ends the loop after the pending batch
     * has been flushed.
     */
    @Override // java.lang.Runnable
    public void run() {
        logger.info("Start up message sending consumer : {}", Thread.currentThread().getName());
        while (true) {
            try {
                this.messageEntities.add(InvokerProxy.RPC_MESSAGE_BUFFER.take());
                InvokerProxy.RPC_MESSAGE_BUFFER.drainTo(this.messageEntities, InvokerProxy.MAX_MESSAGE_COUNT);
                for (RpcMessageEntity rpcMessageEntity : this.messageEntities) {
                    writeMessage(rpcMessageEntity);
                }
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    // Swallowing the interrupt would make this thread impossible to stop:
                    // restore the flag for the owner of the thread and leave the loop.
                    Thread.currentThread().interrupt();
                    logger.info("Message sending consumer interrupted, exit : {}", Thread.currentThread().getName());
                    return;
                }
                logger.info("Send rpc exception,ignore it: {}", StackTraceUtil.stackTrace(th));
            } finally {
                flushAndReset();
            }
        }
    }

    /**
     * Writes one message to its channel and remembers the channel for the batch flush.
     *
     * <p>A failure is confined to this message so the rest of the batch is still sent.
     */
    private void writeMessage(RpcMessageEntity rpcMessageEntity) {
        try {
            Channel channel = rpcMessageEntity.getChannel();
            if (channel == null) {
                // Never record a null channel: flushing it later would throw and end the send loop.
                logger.info("Skip rpc message without channel.");
                return;
            }
            this.channels.add(channel);
            channel.write(rpcMessageEntity.getMessageWrapper());
        } catch (RuntimeException ex) {
            logger.info("Send rpc exception,ignore it: {}", StackTraceUtil.stackTrace(ex));
        }
    }

    /** Flushes every channel touched by the current batch, then resets the batch state. */
    private void flushAndReset() {
        try {
            for (Channel channel : this.channels) {
                try {
                    channel.flush();
                } catch (RuntimeException ex) {
                    // One failing channel must not prevent the remaining channels from being flushed.
                    logger.info("Send rpc exception,ignore it: {}", StackTraceUtil.stackTrace(ex));
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Message send runner Send {} messages finish.", this.messageEntities.size());
            }
        } finally {
            // Always start the next iteration with an empty batch, even if flushing failed.
            this.messageEntities.clear();
            this.channels.clear();
        }
    }
}
