package org.apache.iotdb.db.sync.externalpipe;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.sync.datasource.PipeOpManager;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/externalpipe/ExtPipePluginManager.class */
/**
 * Manages the {@link ExtPipePlugin} instances attached to one {@link TsFilePipe}.
 *
 * <p>Plugins are kept in a map keyed by pipe type name. A single monitor thread repeatedly pulls
 * {@link PipeData} from the {@link TsFilePipe}, appends it to the {@link PipeOpManager} as op
 * blocks, and then calls {@link #checkCommitIndex()} to commit what the plugins have consumed.
 *
 * <p>NOTE(review): {@code extPipePluginMap} is a plain {@link HashMap} that is read by the monitor
 * thread and modified by callers of {@link #startExtPipe} / {@link #dropExtPipe}; confirm whether
 * callers serialize these operations externally.
 */
public class ExtPipePluginManager {
    private static final Logger logger = LoggerFactory.getLogger(ExtPipePluginManager.class);

    /** Upper bound, in milliseconds, of one wait for a commit trigger inside the monitor loop. */
    private static final long COMMIT_TRIGGER_WAIT_MS = 2000L;

    /** Seconds granted to the monitor thread to finish before the pool is shut down forcibly. */
    private static final long MONITOR_SHUTDOWN_TIMEOUT_SECONDS = 2L;

    private TsFilePipe tsFilePipe;
    private PipeOpManager pipeOpManager;

    /** Plugins keyed by pipe type name. */
    private final Map<String, ExtPipePlugin> extPipePluginMap = new HashMap<>();

    /** Single-thread pool that runs {@link #monitorPipeData()}. */
    private final ExecutorService monitorService =
            IoTDBThreadPoolFactory.newFixedThreadPool(1, "ExtPipePluginManager-monitor");

    /**
     * Run flag of the monitor loop. Volatile because it is written by the thread that starts/stops
     * pipes and polled by the monitor thread.
     */
    volatile boolean alive = false;

    /** Serial number of the last PipeData appended to the op manager; only used by the monitor thread. */
    private long lastPipeDataSerialNumber = Long.MIN_VALUE;

    private Map<String, Map<String, AtomicInteger>> writerInvocationFailures;

    /** Monitor object guarding {@link #commitTriggerCounter} and used for wait/notify. */
    private final byte[] commitTriggerLocker = new byte[0];

    /** Number of wake-up requests received since the monitor thread last consumed them. */
    private long commitTriggerCounter = 0L;

    /**
     * Creates a manager bound to the given pipe, with a new {@link PipeOpManager} whose new-data
     * events are forwarded to every registered plugin.
     *
     * @param tsFilePipe pipe the monitor thread pulls data from
     */
    public ExtPipePluginManager(TsFilePipe tsFilePipe) {
        this.tsFilePipe = tsFilePipe;
        this.pipeOpManager = new PipeOpManager(tsFilePipe);
        this.pipeOpManager.setNewDataEventHandler(this::newDataEventHandler);
    }

    /**
     * Creates a manager without a pipe.
     *
     * <p>NOTE(review): all four arguments, including {@code tsFilePipe}, are ignored and the
     * single-argument constructor is invoked with {@code null}; confirm this is intended (the pipe
     * can be supplied later through {@link #setTsFilePipe}).
     */
    public ExtPipePluginManager(String str, IExternalPipeSinkWriterFactory iExternalPipeSinkWriterFactory, ExtPipePluginConfiguration extPipePluginConfiguration, TsFilePipe tsFilePipe) {
        this(null);
    }

    /** Replaces the op manager used by this instance. */
    public void setPipeOpManager(PipeOpManager pipeOpManager) {
        this.pipeOpManager = pipeOpManager;
    }

    /**
     * Replaces the pipe the monitor thread pulls from.
     *
     * @return this manager, for chaining
     */
    public ExtPipePluginManager setTsFilePipe(TsFilePipe tsFilePipe) {
        this.tsFilePipe = tsFilePipe;
        return this;
    }

    /**
     * Starts the plugin registered under the given pipe type name, creating it on first use, and
     * makes sure the monitor task is running.
     *
     * @param str pipe type name used as the plugin key
     * @param map sink parameters handed to a newly created plugin
     * @throws IOException if the plugin is already alive
     */
    public void startExtPipe(String str, Map<String, String> map) throws IOException {
        logger.debug("Enter startExtPipe(), pipeTypeName={}, sinkParams={}.", str, map);
        ExtPipePlugin plugin = this.extPipePluginMap.computeIfAbsent(
                str, key -> new ExtPipePlugin(key, map, this, this.pipeOpManager));
        if (plugin.isAlive()) {
            String message = "startExtPipe(), External Pipe " + str + " has been alive, can not be started again.";
            logger.error(message);
            throw new IOException(message);
        }
        plugin.start();
        this.alive = true;

        // Submit the monitor task only when none is running or queued already.
        // NOTE(review): once dropExtPipe() has removed the last plugin, monitorService is shut down
        // and a later submit here would be rejected -- confirm managers are not reused after that.
        ThreadPoolExecutor executor = (ThreadPoolExecutor) this.monitorService;
        boolean monitorIdle = executor.getActiveCount() <= 0 && executor.getQueue().isEmpty();
        if (monitorIdle) {
            this.monitorService.submit(this::monitorPipeData);
        }
        logger.info("startExtPipe() finish. pipeTypeName={} ", str);
    }

    /**
     * Wakes the monitor thread so that it re-evaluates commit indexes without waiting for its
     * timeout. Both arguments are currently unused.
     *
     * @param str unused
     * @param j unused
     * @throws IOException never thrown by this implementation; kept for interface compatibility
     */
    public void triggerCommit(String str, long j) throws IOException {
        wakeMonitor();
    }

    /** Records one wake-up request and notifies the monitor thread waiting on the trigger lock. */
    private void wakeMonitor() {
        synchronized (this.commitTriggerLocker) {
            this.commitTriggerCounter++;
            this.commitTriggerLocker.notifyAll();
        }
    }

    /**
     * For every entry of {@code pipeOpManager.getSgSet()}, finds the smallest non-negative commit
     * index reported by the registered plugins and, when one exists, commits it to the op manager.
     *
     * @return the value of {@code pipeOpManager.getInUseOpBlockNum()} after committing
     */
    public int checkCommitIndex() {
        for (String sgName : this.pipeOpManager.getSgSet()) {
            long minCommitIndex = Long.MAX_VALUE;
            for (ExtPipePlugin plugin : this.extPipePluginMap.values()) {
                long commitIndex = plugin.getDataCommitIndex(sgName);
                // Negative values are ignored when looking for the minimum.
                if (commitIndex >= 0 && commitIndex < minCommitIndex) {
                    minCommitIndex = commitIndex;
                }
            }
            if (minCommitIndex < WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX && minCommitIndex >= 0) {
                this.pipeOpManager.commitData(sgName, minCommitIndex);
            }
        }
        return this.pipeOpManager.getInUseOpBlockNum();
    }

    /** Forwards a new-data event from the op manager to every registered plugin. */
    private void newDataEventHandler(String str, long j, long j2) {
        for (ExtPipePlugin plugin : this.extPipePluginMap.values()) {
            plugin.notifyNewDataArrive(str, j, j2);
        }
    }

    /**
     * Body of the monitor task. While {@link #alive} is set it pulls PipeData, appends entries newer
     * than {@link #lastPipeDataSerialNumber} to the op manager, waits for a commit trigger (or the
     * timeout) and then runs {@link #checkCommitIndex()}. Exits early when the thread is interrupted.
     */
    private void monitorPipeData() {
        Thread.currentThread().setName("ExternalPipe-monitorPipeData-" + Thread.currentThread().getId());
        logger.info("monitorPipeData start. Thread={}", Thread.currentThread().getName());
        if (this.tsFilePipe == null) {
            logger.info("monitorPipeData(), Error! tsFilePipe is null. Thread exit, {}.", Thread.currentThread().getName());
            return;
        }
        while (this.alive) {
            try {
                List<PipeData> pipeDataList = this.tsFilePipe.pull(WALNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
                // The list is only scanned when its last entry is newer than what was already appended.
                boolean hasNewData = pipeDataList != null
                        && !pipeDataList.isEmpty()
                        && pipeDataList.get(pipeDataList.size() - 1).getSerialNumber() > this.lastPipeDataSerialNumber;
                if (hasNewData) {
                    for (PipeData pipeData : pipeDataList) {
                        long serialNumber = pipeData.getSerialNumber();
                        if (serialNumber <= this.lastPipeDataSerialNumber) {
                            continue;
                        }
                        if (pipeData instanceof TsFilePipeData) {
                            TsFilePipeData tsFilePipeData = (TsFilePipeData) pipeData;
                            String storageGroupName = tsFilePipeData.getStorageGroupName();
                            String tsFilePath = tsFilePipeData.getTsFilePath();
                            try {
                                this.pipeOpManager.appendTsFileOpBlock(storageGroupName, tsFilePath, tsFilePipeData.getModsFilePath(), serialNumber);
                                this.lastPipeDataSerialNumber = serialNumber;
                            } catch (IOException e) {
                                logger.error("monitorPipeData(), Can not append TsFile: {}", tsFilePath, e);
                            }
                        } else if (pipeData instanceof DeletionPipeData) {
                            if (!this.pipeOpManager.isEmpty()) {
                                // The deletion is only appended once the op manager is empty. Stop here so
                                // that a later entry cannot advance lastPipeDataSerialNumber past this
                                // deletion, which would make the serial-number check above skip it forever.
                                break;
                            }
                            DeletionPipeData deletionPipeData = (DeletionPipeData) pipeData;
                            this.pipeOpManager.appendDeletionOpBlock(deletionPipeData.getStorageGroup(), deletionPipeData.getDeletion(), serialNumber);
                            this.lastPipeDataSerialNumber = serialNumber;
                        }
                    }
                }
                if (!awaitCommitTrigger()) {
                    // Interrupted: the interrupt flag has been restored, stop monitoring.
                    break;
                }
                checkCommitIndex();
            } catch (Throwable th) {
                // Keep the monitor alive on unexpected failures; the next iteration retries.
                logger.error("monitorPipeData() Exception: ", th);
            }
        }
        logger.info("monitorPipeData() exits. Thread={}", Thread.currentThread().getName());
    }

    /**
     * Blocks until a wake-up request is pending or {@link #COMMIT_TRIGGER_WAIT_MS} elapses, then
     * clears the pending requests.
     *
     * @return {@code false} if the thread was interrupted while waiting (the interrupt status is
     *     restored), {@code true} otherwise
     */
    private boolean awaitCommitTrigger() {
        synchronized (this.commitTriggerLocker) {
            if (this.commitTriggerCounter <= 0) {
                try {
                    this.commitTriggerLocker.wait(COMMIT_TRIGGER_WAIT_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            this.commitTriggerCounter = 0L;
            return true;
        }
    }

    /**
     * Stops the plugin registered under the given pipe type name; logs an error if none exists.
     *
     * @param str pipe type name
     */
    public void stopExtPipe(String str) {
        logger.info("ExtPipePluginManager stop({}).", str);
        ExtPipePlugin plugin = this.extPipePluginMap.get(str);
        if (plugin == null) {
            logger.error("ExtPipePluginManager stop(), invalid extPipeTypeName={}", str);
            return;
        }
        plugin.stop();
    }

    /**
     * Clears the run flag, shuts the monitor pool down and waits briefly for it to terminate,
     * forcing the shutdown when it does not terminate in time or the wait is interrupted.
     */
    private void stopAllThreadPool() {
        this.alive = false;
        // Wake the monitor so it observes the cleared flag instead of sitting out its wait timeout.
        wakeMonitor();
        this.monitorService.shutdown();
        boolean terminated = false;
        try {
            terminated = this.monitorService.awaitTermination(MONITOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("stopAllThreadPool(), Interrupted when terminating monitorService, ", e);
            // Preserve the caller's interrupt status.
            Thread.currentThread().interrupt();
        } finally {
            if (!terminated) {
                logger.warn("stopAllThreadPool(), for monitorService. Graceful shutdown timed out, so force shutdown.");
                this.monitorService.shutdownNow();
            }
        }
    }

    /**
     * Stops (if alive) and removes the plugin registered under the given pipe type name. When no
     * plugin remains, the monitor pool is shut down and the op manager is closed and cleared.
     *
     * @param str pipe type name
     */
    public void dropExtPipe(String str) {
        logger.info("ExtPipePluginManager drop {}.", str);
        ExtPipePlugin plugin = this.extPipePluginMap.get(str);
        if (plugin == null) {
            logger.error("ExtPipePluginManager dropExtPipe(), invalid pipeTypeName={}", str);
            return;
        }
        if (plugin.isAlive()) {
            plugin.stop();
        }
        this.extPipePluginMap.remove(str);
        if (!this.extPipePluginMap.isEmpty()) {
            return;
        }
        stopAllThreadPool();
        if (this.pipeOpManager != null) {
            this.pipeOpManager.close();
            this.pipeOpManager = null;
        }
    }

    /** Returns the op manager in use, or {@code null} after the last plugin has been dropped. */
    public PipeOpManager getPipeOpManager() {
        return this.pipeOpManager;
    }

    /**
     * Returns the status of the plugin registered under the given pipe type name.
     *
     * @param str pipe type name
     * @return the plugin's status, or {@code null} when no such plugin is registered
     */
    public ExternalPipeStatus getExternalPipeStatus(String str) {
        ExtPipePlugin plugin = this.extPipePluginMap.get(str);
        return plugin == null ? null : plugin.getStatus();
    }
}
