package org.apache.falcon.catalog;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/falcon/catalog/CatalogPartitionHandler.class */
/**
 * Workflow listener that keeps catalog partitions in sync with the output
 * feed instances of a successfully completed workflow.
 *
 * <p>Only feeds that are NOT table based and that name a catalog table in
 * their {@value #CATALOG_TABLE} property are processed. Depending on the
 * workflow operation, the partitions derived from the feed instance path are
 * either dropped (DELETE) or registered/updated (GENERATE, REPLICATE).
 * All other lifecycle callbacks are no-ops.
 */
public class CatalogPartitionHandler implements WorkflowExecutionListener {
    /** Name of the feed property that holds the catalog table URI. */
    public static final String CATALOG_TABLE = "catalog.table";
    private ExpressionHelper evaluator = ExpressionHelper.get();
    private static final Logger LOG = LoggerFactory.getLogger(CatalogPartitionHandler.class);
    public static final ConfigurationStore STORE = ConfigurationStore.get();
    private static CatalogPartitionHandler catalogInstance = new CatalogPartitionHandler();
    private static final boolean IS_CATALOG_ENABLED = CatalogServiceFactory.isEnabled();
    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    /**
     * Accepts only paths that are not plain files and whose name starts with
     * neither "_" nor ".". I/O failures while resolving the file system or
     * probing the path are rethrown unchecked because {@link PathFilter}
     * cannot declare checked exceptions.
     */
    private static final PathFilter PATH_FILTER = new PathFilter() {
        public boolean accept(Path path) {
            try {
                // Resolve the file system first, exactly as before, so an I/O
                // failure surfaces even for names that would be rejected anyway.
                FileSystem fileSystem = path.getFileSystem(new Configuration());
                String name = path.getName();
                if (name.startsWith("_") || name.startsWith(".")) {
                    return false;
                }
                return !fileSystem.isFile(path);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    };

    /**
     * Returns the shared handler instance.
     *
     * @return the singleton created at class-initialisation time
     */
    public static final CatalogPartitionHandler get() {
        return catalogInstance;
    }

    /**
     * Registers, updates or drops catalog partitions for every output feed
     * instance reported by the finished workflow.
     *
     * <p>Nothing happens when the catalog service is disabled, when the
     * cluster has no registry endpoint, or when the context reports no output
     * feeds. Individual feeds are skipped (with an info log) when they are
     * table based, when no usable catalog table is configured for them, or
     * when the instance path does not match the feed's data location template.
     *
     * @param workflowExecutionContext context of the workflow that succeeded
     * @throws FalconException if a catalog/file-system call fails or the
     *                         workflow operation is not DELETE, GENERATE or REPLICATE
     */
    @Override
    public void onSuccess(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        if (!IS_CATALOG_ENABLED) {
            return;
        }

        String[] outputFeedNames = workflowExecutionContext.getOutputFeedNamesList();
        String[] outputFeedInstancePaths = workflowExecutionContext.getOutputFeedInstancePathsList();
        Cluster cluster = (Cluster) STORE.get(EntityType.CLUSTER, workflowExecutionContext.getClusterName());
        Configuration configuration = ClusterHelper.getConfiguration(cluster);
        if (StringUtils.isEmpty(ClusterHelper.getRegistryEndPoint(cluster))) {
            LOG.info("Catalog endpoint not defined for cluster {}. Skipping partition registration", cluster.getName());
            return;
        }

        // NOTE(review): the context may report "no output feeds" as null rather
        // than an empty array - confirm against WorkflowExecutionContext. Either
        // way there is nothing to register, so avoid a NullPointerException.
        if (outputFeedNames == null) {
            LOG.debug("No output feeds reported by the workflow. Skipping partition registration");
            return;
        }

        for (int i = 0; i < outputFeedNames.length; i++) {
            LOG.info("Partition handling for feed {} for path {}", outputFeedNames[i], outputFeedInstancePaths[i]);
            Feed feed = (Feed) STORE.get(EntityType.FEED, outputFeedNames[i]);
            Storage storage = FeedHelper.createStorage(cluster, feed);
            if (storage.getType() == Storage.TYPE.TABLE) {
                LOG.info("Feed {} is already table based. Skipping partition registration", feed.getName());
                continue;
            }

            CatalogStorage catalogStorage = getCatalogStorageFromFeedProperties(feed, cluster, configuration);
            if (catalogStorage == null) {
                LOG.info("Feed {} doesn't have table defined in its properties/table doesn't exist. Skipping partition registration", feed.getName());
                continue;
            }

            // Compare scheme-less paths: both the instance path and the
            // template are reduced to their URI path component.
            Path instancePath = new Path(new Path(outputFeedInstancePaths[i]).toUri().getPath());
            String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
            LOG.debug("Template {} catalogInstance path {}", templatePath, instancePath);
            Date referenceDate = FeedHelper.getDate(templatePath, instancePath, UTC);
            if (referenceDate == null) {
                LOG.info("Feed {} catalogInstance path {} doesn't match the template {}. Skipping partition registration", new Object[]{feed.getName(), instancePath, templatePath});
                continue;
            }

            LOG.debug("Reference date from path {} is {}", instancePath, SchemaHelper.formatDateUTC(referenceDate));
            // The partition expressions below are evaluated relative to this date.
            ExpressionHelper.setReferenceDate(referenceDate);
            List<String> staticPartitions = new ArrayList<String>();
            for (Map.Entry<String, String> entry : catalogStorage.getPartitions().entrySet()) {
                LOG.debug("Evaluating partition {}", entry.getValue());
                staticPartitions.add(this.evaluator.evaluateFullExpression(entry.getValue(), String.class));
            }
            LOG.debug("Static partition - {}", staticPartitions);

            WorkflowExecutionContext.EntityOperations operation = workflowExecutionContext.getOperation();
            switch (operation) {
                case DELETE:
                    dropPartitions(configuration, catalogStorage, staticPartitions);
                    break;
                case GENERATE:
                case REPLICATE:
                    registerPartitions(configuration, catalogStorage, instancePath, staticPartitions);
                    break;
                default:
                    throw new FalconException("Unhandled operation " + operation);
            }
        }
    }

    /**
     * Reconciles the catalog with the directories found under {@code path}.
     *
     * <p>If the table has more partition columns than {@code list} supplies,
     * one "*" glob level per missing column is appended to {@code path} and
     * the matching directories provide the remaining partition values.
     * Partitions listed in the catalog but not found on the file system are
     * dropped, newly found ones are added, and those present in both are
     * updated with the current location. Does nothing if {@code path} does
     * not exist.
     *
     * @param configuration  cluster configuration used for file-system and catalog access
     * @param catalogStorage catalog table the partitions belong to
     * @param path           feed instance path
     * @param list           partition values already known from the feed definition
     * @throws FalconException if a catalog call fails or file-system access raises an I/O error
     */
    // commons-collections 3.x CollectionUtils returns raw Collections; the
    // elements are the List<String> keys/values passed in, so the conversion is safe.
    @SuppressWarnings("unchecked")
    private void registerPartitions(Configuration configuration, CatalogStorage catalogStorage, Path path, List<String> list) throws FalconException {
        try {
            FileSystem fileSystem = HadoopClientFactory.get().createProxiedFileSystem(configuration);
            if (!fileSystem.exists(path)) {
                return;
            }

            int dynamicDepth = getPartitionColumns(configuration, catalogStorage).size() - list.size();
            Path searchPath = path;
            if (dynamicDepth > 0) {
                // One wildcard directory level per partition column not covered by 'list'.
                searchPath = new Path(path, StringUtils.repeat("*", "/", dynamicDepth));
            }

            FileStatus[] matches = fileSystem.globStatus(searchPath, PATH_FILTER);
            Map<List<String>, String> locationByPartition = new HashMap<List<String>, String>();
            for (FileStatus match : matches) {
                List<String> partition = new ArrayList<String>(list);
                partition.addAll(getDynamicPartitions(match.getPath(), path));
                LOG.debug("Final partition - {}", partition);
                locationByPartition.put(partition, match.getPath().toString());
            }

            List<List<String>> registered = listPartitions(configuration, catalogStorage, list);
            Set<List<String>> discovered = locationByPartition.keySet();
            Collection<List<String>> stale = CollectionUtils.subtract(registered, discovered);
            Collection<List<String>> added = CollectionUtils.subtract(discovered, registered);
            Collection<List<String>> retained = CollectionUtils.intersection(registered, discovered);

            for (List<String> partition : stale) {
                dropPartitions(configuration, catalogStorage, partition);
            }
            for (List<String> partition : added) {
                addPartition(configuration, catalogStorage, partition, locationByPartition.get(partition));
            }
            for (List<String> partition : retained) {
                updatePartition(configuration, catalogStorage, partition, locationByPartition.get(partition));
            }
        } catch (IOException e) {
            throw new FalconException(e);
        }
    }

    /**
     * Delegates to the catalog service to update one partition.
     *
     * @param configuration  cluster configuration
     * @param catalogStorage table the partition belongs to
     * @param list           partition values identifying the partition
     * @param str            location passed to the catalog service
     * @throws FalconException if the catalog service call fails
     */
    private void updatePartition(Configuration configuration, CatalogStorage catalogStorage, List<String> list, String str) throws FalconException {
        CatalogServiceFactory.getCatalogService().updatePartition(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable(), list, str);
    }

    /**
     * Delegates to the catalog service to add one partition.
     *
     * @param configuration  cluster configuration
     * @param catalogStorage table the partition belongs to
     * @param list           partition values identifying the partition
     * @param str            location passed to the catalog service
     * @throws FalconException if the catalog service call fails
     */
    private void addPartition(Configuration configuration, CatalogStorage catalogStorage, List<String> list, String str) throws FalconException {
        CatalogServiceFactory.getCatalogService().addPartition(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable(), list, str);
    }

    /**
     * Lists the value tuples of the partitions the catalog service returns for
     * the given table and partition values.
     *
     * @param configuration  cluster configuration
     * @param catalogStorage table to query
     * @param list           partition values passed to the catalog service
     * @return one value list per partition returned by the catalog service
     * @throws FalconException if the catalog service call fails
     */
    private List<List<String>> listPartitions(Configuration configuration, CatalogStorage catalogStorage, List<String> list) throws FalconException {
        List<CatalogPartition> partitions = CatalogServiceFactory.getCatalogService().listPartitions(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable(), list);
        List<List<String>> values = new ArrayList<List<String>>();
        for (CatalogPartition partition : partitions) {
            values.add(partition.getValues());
        }
        return values;
    }

    /**
     * Extracts the path components of {@code path} that follow {@code path2}.
     *
     * <p>Both arguments are reduced to their URI path component before the
     * prefix is cut off, so a scheme or authority on either side does not
     * skew the offset.
     *
     * @param path  full path of a partition directory
     * @param path2 prefix of {@code path} that is covered by the static partitions
     * @return the remaining components split on "/", or an empty list if none remain
     */
    protected List<String> getDynamicPartitions(Path path, Path path2) {
        int prefixLength = path2.toUri().getPath().length();
        String remainder = path.toUri().getPath().substring(prefixLength);
        remainder = StringUtils.removeStart(remainder, "/");
        remainder = StringUtils.removeEnd(remainder, "/");
        if (StringUtils.isEmpty(remainder)) {
            return new ArrayList<String>();
        }
        return Arrays.asList(remainder.split("/"));
    }

    /**
     * Fetches the partition columns of the table from the catalog service.
     *
     * @param configuration  cluster configuration
     * @param catalogStorage table to inspect
     * @return the partition columns as reported by the catalog service
     * @throws FalconException if the catalog service call fails
     */
    private List<String> getPartitionColumns(Configuration configuration, CatalogStorage catalogStorage) throws FalconException {
        return CatalogServiceFactory.getCatalogService().getPartitionColumns(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable());
    }

    /**
     * Delegates to the catalog service to drop the partitions matching the
     * given values; the trailing {@code false} flag is passed through unchanged.
     *
     * @param configuration  cluster configuration
     * @param catalogStorage table the partitions belong to
     * @param list           partition values identifying what to drop
     * @throws FalconException if the catalog service call fails
     */
    private void dropPartitions(Configuration configuration, CatalogStorage catalogStorage, List<String> list) throws FalconException {
        CatalogServiceFactory.getCatalogService().dropPartitions(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable(), list, false);
    }

    /**
     * Builds the catalog storage described by the feed's
     * {@value #CATALOG_TABLE} property.
     *
     * @param feed          feed whose properties are inspected
     * @param cluster       cluster the storage is resolved against
     * @param configuration cluster configuration used for the existence check
     * @return the storage, or {@code null} if the property is absent or the
     *         catalog service reports that the table does not exist
     * @throws FalconException if the table URI is malformed or the catalog call fails
     */
    protected CatalogStorage getCatalogStorageFromFeedProperties(Feed feed, Cluster cluster, Configuration configuration) throws FalconException {
        String tableUri = FeedHelper.getFeedProperties(feed).getProperty(CATALOG_TABLE);
        if (tableUri == null) {
            return null;
        }

        CatalogTable catalogTable = new CatalogTable();
        // Every "{" in the property value is rewritten to "${" before the URI is parsed.
        catalogTable.setUri(tableUri.replace("{", "${"));
        try {
            CatalogStorage catalogStorage = new CatalogStorage(cluster, catalogTable);
            boolean tableExists = CatalogServiceFactory.getCatalogService().tableExists(configuration, catalogStorage.getCatalogUrl(), catalogStorage.getDatabase(), catalogStorage.getTable());
            return tableExists ? catalogStorage : null;
        } catch (URISyntaxException e) {
            throw new FalconException(e);
        }
    }

    /** No-op: partitions are only handled for successful workflows. */
    @Override
    public void onFailure(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: partitions are only handled for successful workflows. */
    @Override
    public void onStart(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: partitions are only handled for successful workflows. */
    @Override
    public void onSuspend(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: partitions are only handled for successful workflows. */
    @Override
    public void onWait(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }
}
