package org.apache.iotdb.db.protocol.mqtt;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.control.clientsession.MqttClientSession;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.class */
public class MPPPublishHandler extends AbstractInterceptHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MPPPublishHandler.class);
    public static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("IoTDB_AUDIT_LOGGER");
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    private final PayloadFormatter payloadFormat;
    private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
    private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = new ConcurrentHashMap<>();
    private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
    private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();

    public MPPPublishHandler(IoTDBConfig ioTDBConfig) {
        this.payloadFormat = PayloadFormatManager.getPayloadFormat(ioTDBConfig.getMqttPayloadFormatter());
    }

    public String getID() {
        return "iotdb-mqtt-broker-listener";
    }

    public void onConnect(InterceptConnectMessage interceptConnectMessage) {
        if (this.clientIdToSessionMap.containsKey(interceptConnectMessage.getClientID())) {
            return;
        }
        try {
            MqttClientSession mqttClientSession = new MqttClientSession(interceptConnectMessage.getClientID());
            this.SESSION_MANAGER.login(mqttClientSession, interceptConnectMessage.getUsername(), new String(interceptConnectMessage.getPassword()), ZoneId.systemDefault().toString(), TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, IoTDBConstant.ClientVersion.V_1_0);
            this.clientIdToSessionMap.put(interceptConnectMessage.getClientID(), mqttClientSession);
        } catch (TException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void onDisconnect(InterceptDisconnectMessage interceptDisconnectMessage) {
        MqttClientSession remove = this.clientIdToSessionMap.remove(interceptDisconnectMessage.getClientID());
        if (null != remove) {
            SessionManager sessionManager = this.SESSION_MANAGER;
            Coordinator coordinator = Coordinator.getInstance();
            Objects.requireNonNull(coordinator);
            sessionManager.closeSession(remove, coordinator::cleanupQueryExecution);
        }
    }

    public void onPublish(InterceptPublishMessage interceptPublishMessage) {
        String clientID = interceptPublishMessage.getClientID();
        if (this.clientIdToSessionMap.containsKey(clientID)) {
            MqttClientSession mqttClientSession = this.clientIdToSessionMap.get(interceptPublishMessage.getClientID());
            ByteBuf payload = interceptPublishMessage.getPayload();
            LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", new Object[]{clientID, interceptPublishMessage.getUsername(), interceptPublishMessage.getQos(), interceptPublishMessage.getTopicName(), payload});
            List<Message> format = this.payloadFormat.format(payload);
            if (format == null) {
                return;
            }
            for (Message message : format) {
                if (message != null) {
                    TSStatus tSStatus = null;
                    try {
                        InsertRowStatement insertRowStatement = new InsertRowStatement();
                        insertRowStatement.setDevicePath(new PartialPath(message.getDevice()));
                        insertRowStatement.setTime(message.getTimestamp().longValue());
                        insertRowStatement.setMeasurements((String[]) message.getMeasurements().toArray(new String[0]));
                        if (message.getDataTypes() == null) {
                            insertRowStatement.setDataTypes(new TSDataType[message.getMeasurements().size()]);
                            insertRowStatement.setValues(message.getValues().toArray(new Object[0]));
                            insertRowStatement.setNeedInferType(true);
                        } else {
                            List<TSDataType> dataTypes = message.getDataTypes();
                            List<String> values = message.getValues();
                            Object[] objArr = new Object[values.size()];
                            for (int i = 0; i < values.size(); i++) {
                                objArr[i] = CommonUtils.parseValue(dataTypes.get(i), values.get(i));
                            }
                            insertRowStatement.setDataTypes((TSDataType[]) dataTypes.toArray(new TSDataType[0]));
                            insertRowStatement.setValues(objArr);
                        }
                        insertRowStatement.setAligned(false);
                        tSStatus = AuthorityChecker.checkAuthority(insertRowStatement, mqttClientSession);
                        if (tSStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            LOG.warn(tSStatus.message);
                        } else {
                            tSStatus = Coordinator.getInstance().execute(insertRowStatement, this.SESSION_MANAGER.requestQueryId(), this.SESSION_MANAGER.getSessionInfo(mqttClientSession), "", this.partitionFetcher, this.schemaFetcher, config.getQueryTimeoutThreshold()).status;
                        }
                    } catch (Exception e) {
                        LOG.warn("meet error when inserting device {}, measurements {}, at time {}, because ", new Object[]{message.getDevice(), message.getMeasurements(), message.getTimestamp(), e});
                    }
                    LOG.debug("event process result: {}", tSStatus);
                }
            }
        }
    }

    public void onSessionLoopError(Throwable th) {
    }
}
