/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.File;
import java.io.IOException;
import java.util.function.BiFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingResultPartition;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.io.network.partition.PipelinedApproximateSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartition;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ProcessorArchitecture;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultPartitionFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
    private final ResultPartitionManager partitionManager;
    private final FileChannelManager channelManager;
    private final BufferPoolFactory bufferPoolFactory;
    private final BoundedBlockingSubpartitionType blockingSubpartitionType;
    private final int networkBuffersPerChannel;
    private final int floatingNetworkBuffersPerGate;
    private final int networkBufferSize;
    private final boolean blockingShuffleCompressionEnabled;
    private final String compressionCodec;
    private final int maxBuffersPerChannel;
    private final int sortShuffleMinBuffers;
    private final int sortShuffleMinParallelism;
    private final boolean sslEnabled;

    public ResultPartitionFactory(ResultPartitionManager partitionManager, FileChannelManager channelManager, BufferPoolFactory bufferPoolFactory, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, int networkBufferSize, boolean blockingShuffleCompressionEnabled, String compressionCodec, int maxBuffersPerChannel, int sortShuffleMinBuffers, int sortShuffleMinParallelism, boolean sslEnabled) {
        this.partitionManager = partitionManager;
        this.channelManager = channelManager;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
        this.bufferPoolFactory = bufferPoolFactory;
        this.blockingSubpartitionType = blockingSubpartitionType;
        this.networkBufferSize = networkBufferSize;
        this.blockingShuffleCompressionEnabled = blockingShuffleCompressionEnabled;
        this.compressionCodec = compressionCodec;
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        this.sortShuffleMinBuffers = sortShuffleMinBuffers;
        this.sortShuffleMinParallelism = sortShuffleMinParallelism;
        this.sslEnabled = sslEnabled;
    }

    public ResultPartition create(String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionDeploymentDescriptor desc) {
        return this.create(taskNameWithSubtaskAndId, partitionIndex, desc.getShuffleDescriptor().getResultPartitionID(), desc.getPartitionType(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), this.createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()));
    }

    @VisibleForTesting
    public ResultPartition create(String taskNameWithSubtaskAndId, int partitionIndex, ResultPartitionID id, ResultPartitionType type, int numberOfSubpartitions, int maxParallelism, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        ResultPartition partition;
        BufferCompressor bufferCompressor = null;
        if (type.isBlocking() && this.blockingShuffleCompressionEnabled) {
            bufferCompressor = new BufferCompressor(this.networkBufferSize, this.compressionCodec);
        }
        ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
        if (type == ResultPartitionType.PIPELINED || type == ResultPartitionType.PIPELINED_BOUNDED || type == ResultPartitionType.PIPELINED_APPROXIMATE) {
            PipelinedResultPartition pipelinedPartition = new PipelinedResultPartition(taskNameWithSubtaskAndId, partitionIndex, id, type, subpartitions, maxParallelism, this.partitionManager, bufferCompressor, bufferPoolFactory);
            BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory = type == ResultPartitionType.PIPELINED_APPROXIMATE ? PipelinedApproximateSubpartition::new : PipelinedSubpartition::new;
            for (int i = 0; i < subpartitions.length; ++i) {
                subpartitions[i] = factory.apply(i, pipelinedPartition);
            }
            partition = pipelinedPartition;
        } else if (type == ResultPartitionType.BLOCKING || type == ResultPartitionType.BLOCKING_PERSISTENT) {
            if (numberOfSubpartitions >= this.sortShuffleMinParallelism) {
                partition = new SortMergeResultPartition(taskNameWithSubtaskAndId, partitionIndex, id, type, subpartitions.length, maxParallelism, this.networkBufferSize, this.partitionManager, this.channelManager.createChannel().getPath(), bufferCompressor, bufferPoolFactory);
            } else {
                BoundedBlockingResultPartition blockingPartition = new BoundedBlockingResultPartition(taskNameWithSubtaskAndId, partitionIndex, id, type, subpartitions, maxParallelism, this.partitionManager, bufferCompressor, bufferPoolFactory);
                ResultPartitionFactory.initializeBoundedBlockingPartitions(subpartitions, blockingPartition, this.blockingSubpartitionType, this.networkBufferSize, this.channelManager, this.sslEnabled);
                partition = blockingPartition;
            }
        } else {
            throw new IllegalArgumentException("Unrecognized ResultPartitionType: " + (Object)((Object)type));
        }
        LOG.debug("{}: Initialized {}", (Object)taskNameWithSubtaskAndId, (Object)this);
        return partition;
    }

    private static void initializeBoundedBlockingPartitions(ResultSubpartition[] subpartitions, BoundedBlockingResultPartition parent, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBufferSize, FileChannelManager channelManager, boolean sslEnabled) {
        int i = 0;
        try {
            for (i = 0; i < subpartitions.length; ++i) {
                File spillFile = channelManager.createChannel().getPathFile();
                subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize, sslEnabled);
            }
        }
        catch (IOException e) {
            ResultPartitionFactory.releasePartitionsQuietly(subpartitions, i);
            throw new FlinkRuntimeException((Throwable)e);
        }
    }

    private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
        for (int i = 0; i < until; ++i) {
            ResultSubpartition subpartition = partitions[i];
            ExceptionUtils.suppressExceptions(subpartition::release);
        }
    }

    @VisibleForTesting
    SupplierWithException<BufferPool, IOException> createBufferPoolFactory(int numberOfSubpartitions, ResultPartitionType type) {
        return () -> {
            int maxNumberOfMemorySegments = type.isBounded() ? numberOfSubpartitions * this.networkBuffersPerChannel + this.floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
            int numRequiredBuffers = !type.isPipelined() && numberOfSubpartitions >= this.sortShuffleMinParallelism ? this.sortShuffleMinBuffers : numberOfSubpartitions + 1;
            return this.bufferPoolFactory.createBufferPool(numRequiredBuffers, maxNumberOfMemorySegments, numberOfSubpartitions, this.maxBuffersPerChannel);
        };
    }

    static BoundedBlockingSubpartitionType getBoundedBlockingType() {
        switch (ProcessorArchitecture.getMemoryAddressSize()) {
            case _64_BIT: {
                return BoundedBlockingSubpartitionType.FILE_MMAP;
            }
            case _32_BIT: {
                return BoundedBlockingSubpartitionType.FILE;
            }
        }
        LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle.");
        return BoundedBlockingSubpartitionType.FILE;
    }
}

