package com.huawei.cdc.notification;

import com.huawei.cdc.common.conf.CommonConfiguration;
import com.huaweicloud.dis.DIS;
import com.huaweicloud.dis.DISClientBuilder;
import com.huaweicloud.dis.core.util.StringUtils;
import com.huaweicloud.dis.exception.DISClientException;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;
import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huawei/cdc/notification/DisNotification.class */
public class DisNotification implements INotification {
    private static final Logger LOG = LoggerFactory.getLogger(DisNotification.class);
    private DIS dis;

    public DisNotification() throws IOException {
        init();
    }

    @Override // com.huawei.cdc.notification.INotification
    public void init() throws IOException {
        this.dis = DISClientBuilder.standard().withEndpoint(CommonConfiguration.DIS_ENDPOINT).withAk(CommonConfiguration.USER_AK).withSk(CommonConfiguration.USER_SK).withProjectId(CommonConfiguration.USER_PROJECT_ID).withRegion(CommonConfiguration.REGION).build();
    }

    @Override // com.huawei.cdc.notification.INotification
    public void sendData(String str, String str2) {
        LOG.info("DISNotification sendData");
        send(str, str2);
    }

    @Override // com.huawei.cdc.notification.INotification
    public void close() {
        LOG.info("DISNotification close");
    }

    private void send(String str, String str2) {
        PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
        putRecordsRequest.setStreamName(str);
        ArrayList arrayList = new ArrayList();
        ByteBuffer wrap = ByteBuffer.wrap(str2.getBytes());
        PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
        putRecordsRequestEntry.setData(wrap);
        putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
        arrayList.add(putRecordsRequestEntry);
        putRecordsRequest.setRecords(arrayList);
        PutRecordsResult putRecordsResult = null;
        try {
            putRecordsResult = this.dis.putRecords(putRecordsRequest);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } catch (DISClientException e2) {
            LOG.error("Failed to get a normal response, please check params and retry. Error message [{}]", e2.getMessage(), e2);
        }
        if (putRecordsResult != null) {
            LOG.info("Put {} records[{} successful / {} failed].", new Object[]{Integer.valueOf(putRecordsResult.getRecords().size()), Integer.valueOf(putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get()), putRecordsResult.getFailedRecordCount()});
            for (int i = 0; i < putRecordsResult.getRecords().size(); i++) {
                PutRecordsResultEntry putRecordsResultEntry = (PutRecordsResultEntry) putRecordsResult.getRecords().get(i);
                if (StringUtils.isNullOrEmpty(putRecordsResultEntry.getErrorCode())) {
                    LOG.info("[{}] put success, partitionId [{}], partitionKey [{}], sequenceNumber [{}]", new Object[]{new String(((PutRecordsRequestEntry) arrayList.get(i)).getData().array()), putRecordsResultEntry.getPartitionId(), ((PutRecordsRequestEntry) arrayList.get(i)).getPartitionKey(), putRecordsResultEntry.getSequenceNumber()});
                } else {
                    LOG.error("[{}] put failed, errorCode [{}], errorMessage [{}]", new Object[]{new String(((PutRecordsRequestEntry) arrayList.get(i)).getData().array()), putRecordsResultEntry.getErrorCode(), putRecordsResultEntry.getErrorMessage()});
                }
            }
        }
    }
}
