package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobClient.class */
public final class BlobClient implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BlobClient.class);
    private final Socket socket;
    private String securityCookie;
    private boolean securityEnable;

    public BlobClient(InetSocketAddress inetSocketAddress, Configuration configuration) throws IOException {
        Socket socket;
        this.securityCookie = "";
        this.securityEnable = false;
        if (configuration != null) {
            try {
                this.securityEnable = configuration.getBoolean(ConfigConstants.SECURITY_ENABLE, false);
                if (this.securityEnable) {
                    this.securityCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, "");
                    if (this.securityCookie.equals("")) {
                        throw new IllegalConfigurationException("security.cookie must be configured");
                    }
                }
            } catch (Exception e) {
                BlobUtils.closeSilently(null, LOG);
                throw new IOException("Could not connect to BlobServer at address " + inetSocketAddress, e);
            }
        }
        if (SSLUtils.isInternalSSLEnabled(configuration) && configuration.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            LOG.info("Using ssl connection to the blob server");
            socket = SSLUtils.createSSLClientSocketFactory(configuration).createSocket();
        } else {
            socket = new Socket();
        }
        socket.connect(inetSocketAddress, configuration.getInteger(BlobServerOptions.CONNECT_TIMEOUT));
        socket.setSoTimeout(configuration.getInteger(BlobServerOptions.SO_TIMEOUT));
        this.socket = socket;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void downloadFromBlobServer(@Nullable JobID jobID, BlobKey blobKey, File file, InetSocketAddress inetSocketAddress, Configuration configuration, int i) throws IOException {
        byte[] bArr = new byte[65536];
        LOG.info("Downloading {}/{} from {}", jobID, blobKey, inetSocketAddress);
        int i2 = 0;
        while (true) {
            try {
                BlobClient blobClient = new BlobClient(inetSocketAddress, configuration);
                Throwable th = null;
                try {
                    InputStream internal = blobClient.getInternal(jobID, blobKey);
                    Throwable th2 = null;
                    try {
                        FileOutputStream fileOutputStream = new FileOutputStream(file);
                        Throwable th3 = null;
                        while (true) {
                            try {
                                try {
                                    int read = internal.read(bArr);
                                    if (read < 0) {
                                        break;
                                    } else {
                                        fileOutputStream.write(bArr, 0, read);
                                    }
                                } catch (Throwable th4) {
                                    th3 = th4;
                                    throw th4;
                                }
                            } catch (Throwable th5) {
                                if (fileOutputStream != null) {
                                    if (th3 != null) {
                                        try {
                                            fileOutputStream.close();
                                        } catch (Throwable th6) {
                                            th3.addSuppressed(th6);
                                        }
                                    } else {
                                        fileOutputStream.close();
                                    }
                                }
                                throw th5;
                            }
                        }
                        if (fileOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    fileOutputStream.close();
                                } catch (Throwable th7) {
                                    th3.addSuppressed(th7);
                                }
                            } else {
                                fileOutputStream.close();
                            }
                        }
                        if (blobClient != null) {
                            if (th != null) {
                                try {
                                    return;
                                } catch (Throwable th8) {
                                    return;
                                }
                            }
                            return;
                        }
                        return;
                    } finally {
                        if (internal != null) {
                            if (0 != 0) {
                                try {
                                    internal.close();
                                } catch (Throwable th9) {
                                    th2.addSuppressed(th9);
                                }
                            } else {
                                internal.close();
                            }
                        }
                    }
                } finally {
                    if (blobClient != null) {
                        if (0 == 0) {
                            blobClient.close();
                            break;
                        }
                        try {
                            blobClient.close();
                            break;
                        } catch (Throwable th82) {
                            th.addSuppressed(th82);
                        }
                    } else {
                        break;
                    }
                }
            } catch (Throwable th10) {
                String str = "Failed to fetch BLOB " + jobID + "/" + blobKey + " from " + inetSocketAddress + " and store it under " + file.getAbsolutePath();
                if (i2 >= i) {
                    LOG.error(str + " No retries left.", th10);
                    throw new IOException(str, th10);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.error(str + " Retrying...", th10);
                } else {
                    LOG.error(str + " Retrying...");
                }
                i2++;
                LOG.info("Downloading {}/{} from {} (retry {})", jobID, blobKey, inetSocketAddress, Integer.valueOf(i2));
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.socket.close();
    }

    public boolean isClosed() {
        return this.socket.isClosed();
    }

    InputStream getInternal(@Nullable JobID jobID, BlobKey blobKey) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("GET BLOB {}/{} from {}.", jobID, blobKey, this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream outputStream = this.socket.getOutputStream();
            InputStream inputStream = this.socket.getInputStream();
            sendGetHeader(outputStream, jobID, blobKey);
            receiveAndCheckGetResponse(inputStream);
            return new BlobInputStream(inputStream, blobKey, outputStream);
        } catch (Throwable th) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("GET operation failed: " + th.getMessage(), th);
        }
    }

    private void sendGetHeader(OutputStream outputStream, @Nullable JobID jobID, BlobKey blobKey) throws IOException {
        Preconditions.checkNotNull(blobKey);
        Preconditions.checkArgument(jobID != null || (blobKey instanceof TransientBlobKey), "permanent BLOBs must be job-related");
        if (this.securityEnable) {
            sendSecurityCookie(outputStream, this.securityCookie);
        }
        outputStream.write(1);
        if (jobID == null) {
            outputStream.write(0);
        } else {
            outputStream.write(2);
            outputStream.write(jobID.getBytes());
        }
        blobKey.writeToOutputStream(outputStream);
    }

    public static void sendSecurityCookie(OutputStream outputStream, String str) throws IOException {
        outputStream.write(30);
        byte[] bytes = str.getBytes(ConfigConstants.DEFAULT_CHARSET);
        BlobUtils.writeLength(bytes.length, outputStream);
        outputStream.write(bytes);
    }

    private static void receiveAndCheckGetResponse(InputStream inputStream) throws IOException {
        int read = inputStream.read();
        if (read < 0) {
            throw new EOFException("Premature end of response");
        }
        if (read == 1) {
            Throwable readExceptionFromStream = BlobUtils.readExceptionFromStream(inputStream);
            throw new IOException("Server side error: " + readExceptionFromStream.getMessage(), readExceptionFromStream);
        }
        if (read != 0) {
            throw new IOException("Unrecognized response");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobKey putBuffer(@Nullable JobID jobID, byte[] bArr, int i, int i2, BlobKey.BlobType blobType) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        Preconditions.checkNotNull(bArr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("PUT BLOB buffer (" + i2 + " bytes) to " + this.socket.getLocalSocketAddress() + ".");
        }
        try {
            BlobOutputStream blobOutputStream = new BlobOutputStream(jobID, blobType, this.socket, this.securityEnable, this.securityCookie);
            Throwable th = null;
            try {
                blobOutputStream.write(bArr, i, i2);
                BlobKey finish = blobOutputStream.finish();
                if (blobOutputStream != null) {
                    if (0 != 0) {
                        try {
                            blobOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        blobOutputStream.close();
                    }
                }
                return finish;
            } finally {
            }
        } catch (Throwable th3) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + th3.getMessage(), th3);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobKey putInputStream(@Nullable JobID jobID, InputStream inputStream, BlobKey.BlobType blobType) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        Preconditions.checkNotNull(inputStream);
        if (LOG.isDebugEnabled()) {
            LOG.debug("PUT BLOB stream to {}.", this.socket.getLocalSocketAddress());
        }
        try {
            BlobOutputStream blobOutputStream = new BlobOutputStream(jobID, blobType, this.socket, this.securityEnable, this.securityCookie);
            Throwable th = null;
            try {
                IOUtils.copyBytes(inputStream, blobOutputStream, 65536, false);
                BlobKey finish = blobOutputStream.finish();
                if (blobOutputStream != null) {
                    if (0 != 0) {
                        try {
                            blobOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        blobOutputStream.close();
                    }
                }
                return finish;
            } finally {
            }
        } catch (Throwable th3) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + th3.getMessage(), th3);
        }
    }

    public static List<PermanentBlobKey> uploadFiles(InetSocketAddress inetSocketAddress, Configuration configuration, JobID jobID, List<Path> list) throws IOException {
        Preconditions.checkNotNull(jobID);
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        BlobClient blobClient = new BlobClient(inetSocketAddress, configuration);
        Throwable th = null;
        try {
            try {
                Iterator<Path> it = list.iterator();
                while (it.hasNext()) {
                    arrayList.add(blobClient.uploadFile(jobID, it.next()));
                }
                if (blobClient != null) {
                    if (0 != 0) {
                        try {
                            blobClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        blobClient.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } catch (Throwable th3) {
            if (blobClient != null) {
                if (th != null) {
                    try {
                        blobClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    blobClient.close();
                }
            }
            throw th3;
        }
    }

    public PermanentBlobKey uploadFile(JobID jobID, Path path) throws IOException {
        FSDataInputStream open = path.getFileSystem().open(path);
        Throwable th = null;
        try {
            try {
                PermanentBlobKey permanentBlobKey = (PermanentBlobKey) putInputStream(jobID, open, BlobKey.BlobType.PERMANENT_BLOB);
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        open.close();
                    }
                }
                return permanentBlobKey;
            } finally {
            }
        } catch (Throwable th3) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    open.close();
                }
            }
            throw th3;
        }
    }
}
