package org.apache.flink.runtime.blob;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.security.MessageDigest;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobOutputStream.class */
final class BlobOutputStream extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlobOutputStream.class);
    private final BlobKey.BlobType blobType;
    private final OutputStream socketStream;
    private final Socket socket;
    private final MessageDigest md;
    private final String securityCookie;
    private final boolean securityEnable;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobOutputStream(JobID jobID, BlobKey.BlobType blobType, Socket socket, boolean z, String str) throws IOException {
        this.blobType = blobType;
        if (socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        this.socket = socket;
        this.socketStream = socket.getOutputStream();
        this.md = BlobUtils.createMessageDigest();
        this.securityEnable = z;
        this.securityCookie = str;
        sendPutHeader(this.socketStream, jobID, blobType, z, str);
    }

    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        BlobUtils.writeLength(1, this.socketStream);
        this.socketStream.write(i);
        this.md.update((byte) i);
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        int i3 = i2;
        while (i3 > 0) {
            int min = Math.min(65536, i3);
            BlobUtils.writeLength(min, this.socketStream);
            this.socketStream.write(bArr, i, min);
            this.md.update(bArr, i, min);
            i3 -= min;
            i += min;
        }
    }

    public BlobKey finish() throws IOException {
        BlobUtils.writeLength(-1, this.socketStream);
        return receiveAndCheckPutResponse(this.socket.getInputStream(), this.md, this.blobType);
    }

    private static void sendPutHeader(OutputStream outputStream, @Nullable JobID jobID, BlobKey.BlobType blobType, boolean z, @Nullable String str) throws IOException {
        if (z) {
            BlobClient.sendSecurityCookie(outputStream, str);
        }
        outputStream.write(0);
        if (jobID == null) {
            outputStream.write(0);
        } else {
            outputStream.write(2);
            outputStream.write(jobID.getBytes());
        }
        outputStream.write(blobType.ordinal());
    }

    private static BlobKey receiveAndCheckPutResponse(InputStream inputStream, MessageDigest messageDigest, BlobKey.BlobType blobType) throws IOException {
        int read = inputStream.read();
        if (read < 0) {
            throw new EOFException("Premature end of response");
        }
        if (read != 0) {
            if (read != 1) {
                throw new IOException("Unrecognized response: " + read + '.');
            }
            Throwable readExceptionFromStream = BlobUtils.readExceptionFromStream(inputStream);
            throw new IOException("Server side error: " + readExceptionFromStream.getMessage(), readExceptionFromStream);
        }
        BlobKey readFromInputStream = BlobKey.readFromInputStream(inputStream);
        byte[] digest = messageDigest.digest();
        if (blobType != readFromInputStream.getType()) {
            throw new IOException("Detected data corruption during transfer");
        }
        if (Arrays.equals(digest, readFromInputStream.getHash())) {
            return readFromInputStream;
        }
        throw new IOException("Detected data corruption during transfer");
    }
}
