package org.apache.hudi.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.NanoTimerGauge;
import org.apache.flink.runtime.metrics.SettableGauge;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MultiStreamMergeWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.GlobalPartitionMetadataUtils;
import org.apache.hudi.sink.aggregate.GetLatestFileSlice;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction.class */
public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
    /** Buffered data per bucket; keys come from getBucketID (partition path + file id). Created in initBuffer(). */
    private transient Map<String, DataBucket> buckets;
    /** Write call (records, instant time) -> write statuses, selected in initWriteFunction() from the configured operation. */
    private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
    /** Tracks the estimated size of everything buffered across all buckets; created in open(). */
    private transient TotalSizeTracer tracer;
    /** Set in open(): true when changelog mode is disabled and the table type is MERGE_ON_READ. */
    private boolean supportFilter;
    /** Guards the "has delete record" warning in filterDeleteRecord so it is logged only once. */
    private boolean printWarnLog;
    /** Whether DELETE records are dropped before buffering; only read from the config when supportFilter is true. */
    private boolean filterDeleteRecord;
    /** Set in initializeState(); when true, bufferRecord checks subtask ownership and preWrite checks file ids. */
    private transient boolean alignmentIndex;
    /** Only obtained in initializeState() when listStatusOptimized is true. */
    private transient GlobalAggregateManager aggregateManager;
    /** File slices fetched in getLatestFileSlice for the current checkpoint; set back to null after the flush. */
    private transient List<FileSlice> latestFileSlices;
    /** Set in initializeState(): option enabled, operation is UPSERT and table type is MERGE_ON_READ. */
    private transient boolean listStatusOptimized;
    /** Table base path taken from the write client config. */
    private transient String basePath;
    /** Write schema parsed from the write client config in initializeState(). */
    private Schema schema;
    /** Counter "consumedBufferRecords": number of buffered records handed to a flush. */
    protected transient Counter consumedRecords;
    /** Counter "bufferConsumptionTimeNs" backing the timer gauge. */
    protected transient Counter timerCounter;
    /** Gauge "bufferConsumptionTimeNsPerSecond"; started/ended around buffering and flushing. */
    protected transient NanoTimerGauge timer;
    /** Gauge "currentBufferSize": incremented per buffered record, decremented by the flushed record count. */
    protected transient SettableGauge bufferSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hudi.sink.StreamWriteFunction$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction$1.class */
    // Compiler-generated helper (recovered by the decompiler) that maps WriteOperationType
    // ordinals to the case labels used by the switch in initWriteFunction():
    // INSERT -> 1, UPSERT -> 2, INSERT_OVERWRITE -> 3, INSERT_OVERWRITE_TABLE -> 4.
    // Ordinals that are not assigned keep the array default 0 and fall into the switch default.
    // The type is also used as the marker parameter of DataBucket's synthetic constructor.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hudi$common$model$WriteOperationType = new int[WriteOperationType.values().length];

        static {
            // Each constant is resolved in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves its slot unmapped.
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.UPSERT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT_OVERWRITE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$hudi$common$model$WriteOperationType[WriteOperationType.INSERT_OVERWRITE_TABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction$BufferSizeDetector.class */
    /**
     * Estimates the accumulated size of the objects added to one bucket and reports when the
     * estimate exceeds the configured batch size.
     *
     * <p>The object size is measured on the first object after construction/reset and afterwards
     * only when {@link #sampling()} fires; in between, the last measured size is reused.
     */
    public static class BufferSizeDetector {
        private static final int DENOMINATOR = 100;
        private final double batchSizeBytes;
        private final Random random = new Random(47);
        private long lastRecordSize = -1;
        private long totalSize = 0;

        /**
         * @param batchSizeMb batch size threshold; multiplied by 1024 * 1024 to obtain bytes
         */
        BufferSizeDetector(double batchSizeMb) {
            this.batchSizeBytes = batchSizeMb * 1024.0d * 1024.0d;
        }

        /**
         * Adds the (sampled) size of {@code obj} to the running total.
         *
         * @return true when the running total is greater than the batch size in bytes
         */
        boolean detect(Object obj) {
            // Re-measure only when nothing was measured yet or the sampler fires.
            boolean reuseLastSize = this.lastRecordSize != -1 && !sampling();
            if (!reuseLastSize) {
                this.lastRecordSize = ObjectSizeCalculator.getObjectSize(obj);
            }
            this.totalSize += this.lastRecordSize;
            return this.batchSizeBytes < (double) this.totalSize;
        }

        /** Returns true when the next pseudo-random value in [0, DENOMINATOR) equals 1. */
        boolean sampling() {
            int draw = this.random.nextInt(DENOMINATOR);
            return draw == 1;
        }

        /** Clears the running total and forces a fresh measurement on the next detect call. */
        void reset() {
            this.totalSize = 0L;
            this.lastRecordSize = -1L;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction$DataBucket.class */
    public static class DataBucket {
        private final List<DataItem> records;
        private final BufferSizeDetector detector;
        private final String partitionPath;
        private final String fileID;

        private DataBucket(Double d, HoodieRecord<?> hoodieRecord) {
            this.records = new ArrayList();
            this.detector = new BufferSizeDetector(d.doubleValue());
            this.partitionPath = hoodieRecord.getPartitionPath();
            this.fileID = hoodieRecord.getCurrentLocation().getFileId();
        }

        public List<HoodieRecord> writeBuffer() {
            return (List) this.records.stream().map(dataItem -> {
                return dataItem.toHoodieRecord(this.partitionPath);
            }).collect(Collectors.toList());
        }

        public void preWrite(List<HoodieRecord> list, boolean z) {
            if (z) {
                list.stream().forEach(hoodieRecord -> {
                    if (this.fileID.equals(hoodieRecord.getCurrentLocation().getFileId())) {
                        return;
                    }
                    StreamWriteFunction.LOG.error("preWrite finds incorrect data. {} {}", this.fileID, hoodieRecord);
                    throw new HoodieDuplicateKeyException("preWrite finds incorrect data." + this.fileID + hoodieRecord);
                });
            }
            HoodieRecord hoodieRecord2 = list.get(0);
            HoodieAvroRecord hoodieAvroRecord = new HoodieAvroRecord(hoodieRecord2.getKey(), (HoodieRecordPayload) hoodieRecord2.getData(), hoodieRecord2.getOperation());
            hoodieAvroRecord.setCurrentLocation(new HoodieRecordLocation(hoodieRecord2.getCurrentLocation().getInstantTime(), this.fileID));
            list.set(0, hoodieAvroRecord);
        }

        public void reset() {
            this.records.clear();
            this.detector.reset();
        }

        /* synthetic */ DataBucket(Double d, HoodieRecord hoodieRecord, AnonymousClass1 anonymousClass1) {
            this(d, hoodieRecord);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction$DataItem.class */
    /**
     * Compact form of a buffered record: record key, instant, payload, operation and
     * (optionally) file id. The partition path is not stored; it is supplied again when the
     * item is converted back with {@link #toHoodieRecord(String)}.
     */
    public static class DataItem {
        private final String key;
        private final String instant;
        private final HoodieRecordPayload<?> data;
        private final HoodieOperation operation;
        private final String fileId;

        private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation, String fileId) {
            this.key = key;
            this.instant = instant;
            this.data = data;
            this.operation = operation;
            this.fileId = fileId;
        }

        /**
         * Captures the given record as a data item.
         *
         * @param hoodieRecord record to capture; it is cast to {@link HoodieAvroRecord} to read the payload
         * @param z            when true the record's current file id is kept, otherwise the item's file id is null
         */
        public static DataItem fromHoodieRecord(HoodieRecord<?> hoodieRecord, boolean z) {
            String recordKey = hoodieRecord.getRecordKey();
            String instantTime = hoodieRecord.getCurrentLocation().getInstantTime();
            HoodieRecordPayload<?> payload = ((HoodieAvroRecord) hoodieRecord).getData();
            HoodieOperation op = hoodieRecord.getOperation();
            String keptFileId = null;
            if (z) {
                keptFileId = hoodieRecord.getCurrentLocation().getFileId();
            }
            return new DataItem(recordKey, instantTime, payload, op, keptFileId);
        }

        /**
         * Builds a Hoodie record from this item.
         *
         * @param str partition path used for the record key of the rebuilt record
         */
        public HoodieRecord<?> toHoodieRecord(String str) {
            HoodieKey hoodieKey = new HoodieKey(this.key, str);
            HoodieAvroRecord rebuilt = new HoodieAvroRecord(hoodieKey, this.data, this.operation);
            rebuilt.setCurrentLocation(new HoodieRecordLocation(this.instant, this.fileId));
            return rebuilt;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hudi/sink/StreamWriteFunction$TotalSizeTracer.class */
    /**
     * Keeps the running total of the estimated buffered bytes and compares it against the
     * maximum buffer size derived from the configuration.
     */
    public static class TotalSizeTracer {
        private long bufferSize = 0;
        private final double maxBufferSize;

        /**
         * Derives the maximum buffer size as
         * (WRITE_TASK_MAX_SIZE - 100 - WRITE_MERGE_MAX_MEMORY) * 1024 * 1024 and fails the state
         * check when the result is not positive.
         */
        TotalSizeTracer(Configuration configuration) {
            double taskMaxSize = configuration.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE);
            // 100 is the constant merge reader memory mentioned in the validation message below.
            double afterReaderMemory = taskMaxSize - 100;
            int mergeMaxMemory = configuration.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
            this.maxBufferSize = (afterReaderMemory - mergeMaxMemory) * 1024.0d * 1024.0d;
            String violation = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)", FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
            ValidationUtils.checkState(this.maxBufferSize > 0.0d, violation);
        }

        /**
         * Adds {@code j} to the running total.
         *
         * @return true when the total is now greater than the maximum buffer size
         */
        boolean trace(long j) {
            long updated = this.bufferSize + j;
            this.bufferSize = updated;
            return this.maxBufferSize < (double) updated;
        }

        /** Subtracts {@code j} from the running total. */
        void countDown(long j) {
            this.bufferSize = this.bufferSize - j;
        }

        /** Sets the running total back to zero. */
        public void reset() {
            this.bufferSize = 0L;
        }
    }

    /**
     * Creates the function with delete filtering switched off and the one-time delete warning armed.
     *
     * @param configuration configuration handed to the superclass
     */
    public StreamWriteFunction(Configuration configuration) {
        super(configuration);
        this.printWarnLog = true;
        this.filterDeleteRecord = false;
        this.supportFilter = false;
    }

    /**
     * Creates the size tracer, the bucket buffer and the write function, then decides whether
     * delete filtering is supported (changelog disabled and MERGE_ON_READ table) and, if so,
     * reads the delete-filter flag from the configuration.
     */
    public void open(Configuration configuration) throws IOException {
        this.tracer = new TotalSizeTracer(this.config);
        initBuffer();
        initWriteFunction();

        if (this.config.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
            this.supportFilter = false;
        } else {
            String tableType = (String) this.config.get(FlinkOptions.TABLE_TYPE);
            this.supportFilter = tableType.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
        }

        if (!this.supportFilter) {
            return;
        }
        this.filterDeleteRecord = this.config.getBoolean(FlinkOptions.FILTER_DELETE_RECORD);
        LOG.warn("filter delete record is {}", Boolean.valueOf(this.filterDeleteRecord));
    }

    /**
     * Runs the superclass initialization, registers the buffer metrics, parses the write schema
     * and resolves the index-alignment and list-status-optimization switches from the config.
     */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);

        // Metrics: two counters plus two gauges registered on the task's metric group.
        this.consumedRecords = getRuntimeContext().getMetricGroup().counter("consumedBufferRecords");
        this.timerCounter = getRuntimeContext().getMetricGroup().counter("bufferConsumptionTimeNs");
        this.bufferSize = new SettableGauge();
        this.timer = new NanoTimerGauge(this.timerCounter);
        getRuntimeContext().getMetricGroup().gauge("bufferConsumptionTimeNsPerSecond", this.timer);
        getRuntimeContext().getMetricGroup().gauge("currentBufferSize", this.bufferSize);

        this.schema = new Schema.Parser().parse(this.writeClient.getConfig().getWriteSchema());

        // Index alignment needs both the option and the Flink in-memory index.
        boolean aligned = this.config.getBoolean(FlinkOptions.INDEX_ALIGNMENT_ENABLE);
        if (aligned) {
            aligned = OptionsResolver.isFlinkInMemoryIndex(this.config);
        }
        this.alignmentIndex = aligned;

        this.basePath = this.writeClient.getConfig().getBasePath();

        // List-status optimization: option enabled, UPSERT operation, MERGE_ON_READ table.
        boolean optimized = false;
        if (this.config.getBoolean(FlinkOptions.WRITE_LISTSTATUS_OPTIMIZED)) {
            WriteOperationType operation = WriteOperationType.fromValue((String) this.config.get(FlinkOptions.OPERATION));
            if (operation == WriteOperationType.UPSERT) {
                String tableType = (String) this.config.get(FlinkOptions.TABLE_TYPE);
                optimized = tableType.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
            }
        }
        this.listStatusOptimized = optimized;

        if (optimized) {
            this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
        }
    }

    /**
     * Checkpoint hook: fetches the latest file slices for the buffered buckets (only when the
     * list-status optimization is active), flushes everything that is buffered as the last
     * batch of the instant, and then drops the fetched file slices.
     *
     * @param functionSnapshotContext supplies the checkpoint id passed to the file slice lookup
     * @throws IOException if the file slice lookup fails
     */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction
    public void snapshotStateInternal(FunctionSnapshotContext functionSnapshotContext) throws IOException {
        // Must run before the flush: flushRemaining reads latestFileSlices and clears the buckets.
        getLatestFileSlice(functionSnapshotContext.getCheckpointId());
        flushRemaining(false);
        // The slices are only valid for this checkpoint's flush.
        this.latestFileSlices = null;
    }

    /**
     * Requests the latest file slices for all buffered buckets through the global aggregate
     * manager and stores them in {@code latestFileSlices}. Does nothing when the list-status
     * optimization is off or no bucket exists.
     *
     * @param j checkpoint id forwarded with the request
     * @throws IOException if the global aggregate update fails
     */
    private void getLatestFileSlice(long j) throws IOException {
        if (!this.listStatusOptimized) {
            return;
        }
        if (this.buckets.isEmpty()) {
            return;
        }
        // Group the buffered file ids by "basePath + partitionPath".
        // NOTE(review): the two parts are concatenated without a path separator - confirm that
        // GetLatestFileSlice expects exactly this key format.
        Map<String, HashSet<String>> fileIdsByPartition = new HashMap<>();
        for (DataBucket bucket : this.buckets.values()) {
            String partitionKey = this.basePath + bucket.partitionPath;
            fileIdsByPartition.computeIfAbsent(partitionKey, ignored -> new HashSet<>()).add(bucket.fileID);
        }
        if (fileIdsByPartition.isEmpty()) {
            return;
        }
        String aggregateName = GetLatestFileSlice.NAME + getRuntimeContext().getIndexOfThisSubtask();
        Tuple3<Long, Map<String, HashSet<String>>, String> request = new Tuple3<>(Long.valueOf(j), fileIdsByPartition, this.basePath);
        this.latestFileSlices = (List) this.aggregateManager.updateGlobalAggregate(aggregateName, request, new GetLatestFileSlice());
        LOG.info("Obtain {} fileSlices from JobManager.", Integer.valueOf(this.latestFileSlices.size()));
    }

    /**
     * Buffers one incoming element, timing the buffering (including any flush it triggers)
     * with the consumption timer.
     *
     * @param i         incoming element; cast to {@link HoodieRecord}
     * @param context   not used here
     * @param collector not used here; this method emits nothing downstream
     */
    public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
        this.timer.markStart();
        bufferRecord((HoodieRecord) i);
        // Not reached when bufferRecord throws; the exception propagates to the caller.
        this.timer.markEnd();
    }

    /**
     * Releases the metric references and shuts down the write client.
     *
     * <p>The client is closed even when the graceful handle cleanup throws, so a failing cleanup
     * can no longer leak the client; the cleanup exception still propagates unless closing
     * fails too.
     */
    public void close() {
        this.timer = null;
        this.timerCounter = null;
        this.bufferSize = null;
        this.consumedRecords = null;
        if (this.writeClient != null) {
            try {
                this.writeClient.cleanHandlesGracefully();
            } finally {
                // Always release the client, whatever happened during handle cleanup.
                this.writeClient.close();
            }
        }
    }

    /**
     * End-of-input hook: after the superclass handling, flushes everything still buffered as
     * the last batch with the end-input flag set, then cleans the write handles and clears the
     * collected write statuses.
     */
    @Override // org.apache.hudi.sink.common.AbstractStreamWriteFunction, org.apache.hudi.sink.common.AbstractWriteFunction
    public void endInput() {
        super.endInput();
        flushRemaining(true);
        // flushRemaining has already called cleanHandles() once; this is a second call.
        this.writeClient.cleanHandles();
        this.writeStatuses.clear();
    }

    /**
     * Returns a snapshot of the buffered data: for every bucket id, the bucket's items rebuilt
     * as Hoodie records. The returned map and lists are new objects.
     */
    @VisibleForTesting
    public Map<String, List<HoodieRecord>> getDataBuffer() {
        Map<String, List<HoodieRecord>> snapshot = new HashMap<>();
        this.buckets.forEach((bucketId, bucket) -> snapshot.put(bucketId, bucket.writeBuffer()));
        return snapshot;
    }

    /** Creates the empty, insertion-ordered bucket map. */
    private void initBuffer() {
        this.buckets = new LinkedHashMap<>();
    }

    /**
     * Selects the write call matching the configured write operation.
     *
     * <p>Supported operations are INSERT, UPSERT, INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE.
     * The switch works directly on the enum instead of going through a hand-maintained
     * ordinal lookup table.
     *
     * @throws RuntimeException if the configured operation is none of the supported ones
     */
    private void initWriteFunction() {
        final String operation = (String) this.config.get(FlinkOptions.OPERATION);
        switch (WriteOperationType.fromValue(operation)) {
            case INSERT:
                this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
                break;
            case UPSERT:
                this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
                break;
            case INSERT_OVERWRITE:
                this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
                break;
            case INSERT_OVERWRITE_TABLE:
                this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
                break;
            default:
                throw new RuntimeException("Unsupported write operation : " + operation);
        }
    }

    /** Builds the bucket key of a record from its partition path and current file id. */
    private String getBucketID(HoodieRecord<?> hoodieRecord) {
        String partitionPath = hoodieRecord.getPartitionPath();
        String fileId = hoodieRecord.getCurrentLocation().getFileId();
        return StreamerUtil.generateBucketKey(partitionPath, fileId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Decides whether the record must be dropped because it is a DELETE and delete filtering is
     * enabled. As a side effect, the first DELETE seen while filtering is supported logs a
     * one-time warning.
     *
     * @return true when the record is a DELETE and the delete filter is enabled
     */
    public boolean filterDeleteRecord(HoodieRecord<?> hoodieRecord) {
        if (this.supportFilter && this.printWarnLog) {
            if (HoodieOperation.DELETE.equals(hoodieRecord.getOperation())) {
                LOG.warn("MOR {} has delete record!!!", getRuntimeContext().getTaskName());
                this.printWarnLog = false;
            }
        }
        if (!this.filterDeleteRecord) {
            return false;
        }
        return HoodieOperation.DELETE.equals(hoodieRecord.getOperation());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds the record to its bucket and flushes when a size threshold is crossed.
     *
     * <ul>
     *   <li>Records rejected by {@link #filterDeleteRecord(HoodieRecord)} are skipped.</li>
     *   <li>With index alignment on, a record whose file id maps to another subtask raises
     *       {@link HoodieDuplicateKeyException}.</li>
     *   <li>If the record's bucket exceeds the batch size, that bucket is flushed.</li>
     *   <li>Otherwise, if the total buffer exceeds its maximum, the largest bucket is flushed.</li>
     * </ul>
     */
    public void bufferRecord(HoodieRecord<?> hoodieRecord) {
        if (filterDeleteRecord(hoodieRecord)) {
            return;
        }
        final String bucketId = getBucketID(hoodieRecord);

        if (this.alignmentIndex) {
            int owningSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    hoodieRecord.getCurrentLocation().getFileId(),
                    getRuntimeContext().getMaxNumberOfParallelSubtasks(),
                    getRuntimeContext().getNumberOfParallelSubtasks());
            if (owningSubtask != getRuntimeContext().getIndexOfThisSubtask()) {
                LOG.warn("The stream write {}#{}#{} task should not process the {}.", new Object[]{
                        Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()),
                        Integer.valueOf(getRuntimeContext().getNumberOfParallelSubtasks()),
                        Integer.valueOf(getRuntimeContext().getMaxNumberOfParallelSubtasks()),
                        hoodieRecord});
                throw new HoodieDuplicateKeyException(hoodieRecord.toString());
            }
        }

        DataBucket bucket = this.buckets.computeIfAbsent(bucketId,
                ignored -> new DataBucket(Double.valueOf(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)), hoodieRecord, null));
        DataItem item = DataItem.fromHoodieRecord(hoodieRecord, this.alignmentIndex);
        bucket.records.add(item);
        this.bufferSize.setValue(this.bufferSize.getValue().doubleValue() + 1.0d);

        // Both checks always run so that the bucket total and the global total stay in step.
        boolean bucketFull = bucket.detector.detect(item);
        boolean bufferFull = this.tracer.trace(bucket.detector.lastRecordSize);

        if (bucketFull) {
            int bufferedInBucket = bucket.records.size();
            if (flushBucket(bucket)) {
                accountFlushedBucket(bucket, bufferedInBucket);
            }
            return;
        }
        if (!bufferFull) {
            return;
        }

        // Total buffer is over the limit: flush the bucket with the largest estimated size.
        DataBucket largest = this.buckets.values().stream().max(Comparator.comparingLong(dataBucket -> {
            return dataBucket.detector.totalSize;
        })).orElseThrow(RuntimeException::new);
        int bufferedInLargest = largest.records.size();
        if (flushBucket(largest)) {
            accountFlushedBucket(largest, bufferedInLargest);
        } else {
            LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", Double.valueOf(this.tracer.maxBufferSize));
        }
    }

    /** Updates metrics and the size tracer for a bucket that was just flushed, then empties it. */
    private void accountFlushedBucket(DataBucket flushed, int recordCount) {
        this.consumedRecords.inc(recordCount);
        this.bufferSize.setValue(this.bufferSize.getValue().doubleValue() - recordCount);
        this.tracer.countDown(flushed.detector.totalSize);
        flushed.reset();
    }

    /** Returns true when at least one bucket currently holds a buffered record. */
    private boolean hasData() {
        for (DataBucket bucket : this.buckets.values()) {
            if (!bucket.records.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Writes the records of one bucket under the current inflight instant and reports the
     * resulting write statuses to the coordinator as a non-final batch.
     *
     * <p>When pre-combine is enabled the records are deduplicated first. If hidden partitions
     * are enabled, the written partition paths are registered; a failure there is logged
     * (now including the exception) and does not fail the flush.
     *
     * @param dataBucket bucket to flush; must contain at least one record
     * @return false when there is no inflight instant to write to, true after a completed write
     */
    private boolean flushBucket(DataBucket dataBucket) {
        String instantToWrite = instantToWrite(true);
        if (instantToWrite == null) {
            LOG.info("No inflight instant when flushing data, skip.");
            return false;
        }
        List<HoodieRecord> writeBuffer = dataBucket.writeBuffer();
        ValidationUtils.checkState(writeBuffer.size() > 0, "Data bucket to flush has no buffering records");
        if (this.config.getBoolean(FlinkOptions.PRE_COMBINE)) {
            // COW tables with the multi-stream payload merge duplicates; everything else uses the default dedup.
            writeBuffer = (OptionsResolver.isCowTable(this.config) && MultiStreamMergeWithLatestAvroPayload.class.getName().equals(this.config.getString(FlinkOptions.PAYLOAD_CLASS_NAME))) ? FlinkWriteHelper.newInstance().deduplicateRecordsByMerge(writeBuffer, this.schema, this.config.getString(FlinkOptions.MULTI_STREAM_MERGE_GROUP_FIELD)) : FlinkWriteHelper.newInstance().deduplicateRecords(writeBuffer, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema());
        }
        dataBucket.preWrite(writeBuffer, this.alignmentIndex);
        List<WriteStatus> statuses = new ArrayList<>(this.writeFunction.apply(writeBuffer, instantToWrite));
        if (GlobalPartitionMetadataUtils.isHiddenPartitionEnabled(this.basePath, this.writeClient.getEngineContext().getHadoopConf().get())) {
            List<String> partitionPaths = statuses.stream().map(WriteStatus::getPartitionPath).collect(Collectors.toList());
            try {
                GlobalPartitionMetadataUtils.addNewPaths(this.basePath, this.writeClient.getEngineContext().getHadoopConf().get(), partitionPaths);
            } catch (IOException e) {
                // Best effort: keep the flush going, but log the cause so the failure can be diagnosed.
                LOG.error("Error adding paths: " + partitionPaths + " to metadata file /.hoodie/.hoodie_global_partition_metadata", e);
            }
        }
        writeBuffer.clear();
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(instantToWrite).writeStatus(statuses).lastBatch(false).endInput(false).build());
        this.writeStatuses.addAll(statuses);
        return true;
    }

    /**
     * Flushes every non-empty bucket under the current instant and sends the collected write
     * statuses to the coordinator as the last batch of that instant.
     *
     * <p>When the list-status optimization is active and file slices were fetched, each bucket
     * is upserted together with the fetched slice matching its file id (if any); otherwise the
     * configured write function is used. Afterwards the buckets and the size tracer are reset,
     * the write handles are cleaned and the function is marked as confirming. A failure while
     * registering hidden partition paths is logged (now including the exception) and does not
     * fail the flush.
     *
     * @param z whether this flush is triggered by end of input; forwarded in the event
     * @throws HoodieException if there is no inflight instant to write to
     */
    private void flushRemaining(boolean z) {
        this.currentInstant = instantToWrite(hasData());
        if (this.currentInstant == null) {
            throw new HoodieException("No inflight instant when flushing data!");
        }
        final List<WriteStatus> writeStatus;
        if (this.buckets.size() > 0) {
            this.timer.markStart();
            writeStatus = new ArrayList<>();
            for (DataBucket bucket : this.buckets.values()) {
                List<HoodieRecord> writeBuffer = bucket.writeBuffer();
                if (writeBuffer.isEmpty()) {
                    continue;
                }
                // Metrics count the buffered records, i.e. the size before deduplication.
                long bufferedCount = writeBuffer.size();
                if (this.config.getBoolean(FlinkOptions.PRE_COMBINE)) {
                    writeBuffer = (OptionsResolver.isCowTable(this.config) && MultiStreamMergeWithLatestAvroPayload.class.getName().equals(this.config.getString(FlinkOptions.PAYLOAD_CLASS_NAME))) ? FlinkWriteHelper.newInstance().deduplicateRecordsByMerge(writeBuffer, this.schema, this.config.getString(FlinkOptions.MULTI_STREAM_MERGE_GROUP_FIELD)) : FlinkWriteHelper.newInstance().deduplicateRecords(writeBuffer, (HoodieIndex) null, -1, this.writeClient.getConfig().getSchema());
                }
                bucket.preWrite(writeBuffer, this.alignmentIndex);
                if (!this.listStatusOptimized || this.latestFileSlices == null || this.latestFileSlices.isEmpty()) {
                    writeStatus.addAll(this.writeFunction.apply(writeBuffer, this.currentInstant));
                } else {
                    // Look the file id up once instead of once per candidate slice.
                    String fileId = writeBuffer.get(0).getCurrentLocation().getFileId();
                    FileSlice matchingSlice = this.latestFileSlices.stream()
                            .filter(fileSlice -> fileSlice.getFileId().equals(fileId))
                            .findFirst()
                            .orElse(null);
                    writeStatus.addAll(this.writeClient.upsert(writeBuffer, this.currentInstant, Option.ofNullable(matchingSlice)));
                }
                this.consumedRecords.inc(bufferedCount);
                this.bufferSize.setValue(this.bufferSize.getValue().doubleValue() - bufferedCount);
                writeBuffer.clear();
                bucket.reset();
            }
            this.timer.markEnd();
            if (GlobalPartitionMetadataUtils.isHiddenPartitionEnabled(this.basePath, this.writeClient.getEngineContext().getHadoopConf().get())) {
                List<String> partitionPaths = writeStatus.stream().map(WriteStatus::getPartitionPath).collect(Collectors.toList());
                try {
                    GlobalPartitionMetadataUtils.addNewPaths(this.basePath, this.writeClient.getEngineContext().getHadoopConf().get(), partitionPaths);
                } catch (IOException e) {
                    // Best effort: keep the flush going, but log the cause so the failure can be diagnosed.
                    LOG.error("Error adding paths: " + partitionPaths + " to metadata file /.hoodie/.hoodie_global_partition_metadata", e);
                }
            }
        } else {
            LOG.info("No data to write in subtask [{}] for instant [{}]", Integer.valueOf(this.taskID), this.currentInstant);
            writeStatus = Collections.emptyList();
        }
        this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.builder().taskID(this.taskID).instantTime(this.currentInstant).writeStatus(writeStatus).lastBatch(true).endInput(z).build());
        this.buckets.clear();
        this.tracer.reset();
        this.writeClient.cleanHandles();
        this.writeStatuses.addAll(writeStatus);
        this.confirming = true;
    }
}
