/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirEncryptionZoneOp;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class ReencryptionHandler
implements Runnable {
    public static final Logger LOG = LoggerFactory.getLogger(ReencryptionHandler.class);
    private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;
    private final EncryptionZoneManager ezManager;
    private final FSDirectory dir;
    private final long interval;
    private final int reencryptBatchSize;
    private double throttleLimitHandlerRatio;
    private final int reencryptThreadPoolSize;
    private final StopWatch throttleTimerAll = new StopWatch();
    private final StopWatch throttleTimerLocked = new StopWatch();
    private ExecutorCompletionService<ReencryptionUpdater.ReencryptionTask> batchService;
    private BlockingQueue<Runnable> taskQueue;
    private final Map<Long, ReencryptionUpdater.ZoneSubmissionTracker> submissions = new HashMap<Long, ReencryptionUpdater.ZoneSubmissionTracker>();
    private ReencryptionBatch currentBatch;
    private final ReencryptionPendingInodeIdCollector traverser;
    private final ReencryptionUpdater reencryptionUpdater;
    private ExecutorService updaterExecutor;
    private volatile boolean shouldPauseForTesting = false;
    private volatile int pauseAfterNthSubmission = 0;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stopThreads() {
        assert (this.dir.hasWriteLock());
        ReencryptionHandler reencryptionHandler = this;
        synchronized (reencryptionHandler) {
            for (ReencryptionUpdater.ZoneSubmissionTracker zst : this.submissions.values()) {
                zst.cancelAllTasks();
            }
        }
        if (this.updaterExecutor != null) {
            this.updaterExecutor.shutdownNow();
        }
    }

    void startUpdaterThread() {
        this.updaterExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("reencryptionUpdaterThread #%d").build());
        this.updaterExecutor.execute(this.reencryptionUpdater);
    }

    @VisibleForTesting
    synchronized void pauseForTesting() {
        this.shouldPauseForTesting = true;
        LOG.info("Pausing re-encrypt handler for testing.");
        this.notify();
    }

    @VisibleForTesting
    synchronized void resumeForTesting() {
        this.shouldPauseForTesting = false;
        LOG.info("Resuming re-encrypt handler for testing.");
        this.notify();
    }

    @VisibleForTesting
    void pauseForTestingAfterNthSubmission(int count) {
        assert (this.pauseAfterNthSubmission == 0);
        this.pauseAfterNthSubmission = count;
    }

    @VisibleForTesting
    void pauseUpdaterForTesting() {
        this.reencryptionUpdater.pauseForTesting();
    }

    @VisibleForTesting
    void resumeUpdaterForTesting() {
        this.reencryptionUpdater.resumeForTesting();
    }

    @VisibleForTesting
    void pauseForTestingAfterNthCheckpoint(long zoneId, int count) {
        this.reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
    }

    ReencryptionHandler(EncryptionZoneManager ezMgr, Configuration conf) {
        this.ezManager = ezMgr;
        Preconditions.checkNotNull((Object)this.ezManager.getProvider(), (Object)"No provider set, cannot re-encrypt");
        this.dir = ezMgr.getFSDirectory();
        this.interval = conf.getTimeDuration("dfs.namenode.reencrypt.sleep.interval", "1m", TimeUnit.MILLISECONDS);
        Preconditions.checkArgument((this.interval > 0L ? 1 : 0) != 0, (Object)"dfs.namenode.reencrypt.sleep.interval is not positive.");
        this.reencryptBatchSize = conf.getInt("dfs.namenode.reencrypt.batch.size", 1000);
        Preconditions.checkArgument((this.reencryptBatchSize > 0 ? 1 : 0) != 0, (Object)"dfs.namenode.reencrypt.batch.size is not positive.");
        if (this.reencryptBatchSize > 2000) {
            LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer to be full and trigger a logSync within the writelock, greatly impacting namenode throughput.", (Object)this.reencryptBatchSize);
        }
        this.throttleLimitHandlerRatio = conf.getDouble("dfs.namenode.reencrypt.throttle.limit.handler.ratio", 1.0);
        LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption", (Object)this.throttleLimitHandlerRatio);
        Preconditions.checkArgument((this.throttleLimitHandlerRatio > 0.0 ? 1 : 0) != 0, (Object)"dfs.namenode.reencrypt.throttle.limit.handler.ratio is not positive.");
        this.reencryptThreadPoolSize = conf.getInt("dfs.namenode.reencrypt.edek.threads", 10);
        this.taskQueue = new LinkedBlockingQueue<Runnable>();
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(this.reencryptThreadPoolSize, this.reencryptThreadPoolSize, 60L, TimeUnit.SECONDS, this.taskQueue, (ThreadFactory)new Daemon.DaemonFactory(){
            private final AtomicInteger ind = new AtomicInteger(0);

            public Thread newThread(Runnable r) {
                Thread t = super.newThread(r);
                t.setName("reencryption edek Thread-" + this.ind.getAndIncrement());
                return t;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy(){

            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
                LOG.info("Execution rejected, executing in current thread");
                super.rejectedExecution(runnable, e);
            }
        });
        threadPool.allowCoreThreadTimeOut(true);
        this.batchService = new ExecutorCompletionService(threadPool);
        this.reencryptionUpdater = new ReencryptionUpdater(this.dir, this.batchService, this, conf);
        this.currentBatch = new ReencryptionBatch(this.reencryptBatchSize);
        this.traverser = new ReencryptionPendingInodeIdCollector(this.dir, this, conf);
    }

    ReencryptionStatus getReencryptionStatus() {
        return this.ezManager.getReencryptionStatus();
    }

    void cancelZone(long zoneId, String zoneName) throws IOException {
        assert (this.dir.hasWriteLock());
        ZoneReencryptionStatus zs = this.getReencryptionStatus().getZoneStatus(Long.valueOf(zoneId));
        if (zs == null || zs.getState() == ZoneReencryptionStatus.State.Completed) {
            throw new IOException("Zone " + zoneName + " is not under re-encryption");
        }
        zs.cancel();
        this.removeZoneTrackerStopTasks(zoneId);
    }

    void removeZone(long zoneId) {
        assert (this.dir.hasWriteLock());
        LOG.info("Removing zone {} from re-encryption.", (Object)zoneId);
        this.removeZoneTrackerStopTasks(zoneId);
        this.getReencryptionStatus().removeZone(Long.valueOf(zoneId));
    }

    private synchronized void removeZoneTrackerStopTasks(long zoneId) {
        ReencryptionUpdater.ZoneSubmissionTracker zst = this.submissions.get(zoneId);
        if (zst != null) {
            zst.cancelAllTasks();
            this.submissions.remove(zoneId);
        }
    }

    ReencryptionUpdater.ZoneSubmissionTracker getTracker(long zoneId) {
        assert (this.dir.hasReadLock());
        return this.unprotectedGetTracker(zoneId);
    }

    synchronized ReencryptionUpdater.ZoneSubmissionTracker unprotectedGetTracker(long zoneId) {
        return this.submissions.get(zoneId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void addDummyTracker(long zoneId, ReencryptionUpdater.ZoneSubmissionTracker zst) {
        assert (this.dir.hasReadLock());
        if (zst == null) {
            zst = new ReencryptionUpdater.ZoneSubmissionTracker();
        }
        zst.setSubmissionDone();
        Future<ReencryptionUpdater.ReencryptionTask> future = this.batchService.submit(new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
        zst.addTask(future);
        ReencryptionHandler reencryptionHandler = this;
        synchronized (reencryptionHandler) {
            this.submissions.put(zoneId, zst);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOG.info("Starting up re-encrypt thread with interval={} millisecond.", (Object)this.interval);
        while (true) {
            Long zoneId;
            try {
                ReencryptionHandler reencryptionHandler = this;
                synchronized (reencryptionHandler) {
                    this.wait(this.interval);
                }
                this.traverser.checkPauseForTesting();
            }
            catch (InterruptedException ie) {
                LOG.info("Re-encrypt handler interrupted. Exiting");
                Thread.currentThread().interrupt();
                return;
            }
            this.dir.getFSNamesystem().readLock();
            try {
                zoneId = this.getReencryptionStatus().getNextUnprocessedZone();
                if (zoneId == null) continue;
                LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}", (Object)zoneId, (Object)this.getReencryptionStatus());
                this.getReencryptionStatus().markZoneStarted(zoneId);
                this.resetSubmissionTracker(zoneId);
            }
            finally {
                this.dir.getFSNamesystem().readUnlock();
                continue;
            }
            try {
                this.reencryptEncryptionZone(zoneId);
                continue;
            }
            catch (SafeModeException | RetriableException re) {
                LOG.info("Re-encryption caught exception, will retry", re);
                this.getReencryptionStatus().markZoneForRetry(zoneId);
                continue;
            }
            catch (IOException ioe) {
                LOG.warn("IOException caught when re-encrypting zone {}", (Object)zoneId, (Object)ioe);
                continue;
            }
            catch (InterruptedException ie) {
                LOG.info("Re-encrypt handler interrupted. Exiting.");
                Thread.currentThread().interrupt();
                return;
            }
            catch (Throwable t) {
                LOG.error("Re-encrypt handler thread exiting. Exception caught when re-encrypting zone {}.", (Object)zoneId, (Object)t);
                return;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void reencryptEncryptionZone(long zoneId) throws IOException, InterruptedException {
        this.throttleTimerAll.reset().start();
        this.throttleTimerLocked.reset();
        this.traverser.readLock();
        try {
            INode zoneNode = this.dir.getInode(zoneId);
            if (zoneNode == null) {
                LOG.info("Directory with id {} removed during re-encrypt, skipping", (Object)zoneId);
                return;
            }
            if (!zoneNode.isDirectory()) {
                LOG.info("Cannot re-encrypt directory with id {} because it's not a directory.", (Object)zoneId);
                return;
            }
            ZoneReencryptionStatus zs = this.getReencryptionStatus().getZoneStatus(Long.valueOf(zoneId));
            assert (zs != null);
            LOG.info("Re-encrypting zone {}(id={})", (Object)zoneNode.getFullPathName(), (Object)zoneId);
            if (zs.getLastCheckpointFile() == null) {
                this.traverser.traverseDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME, new ZoneTraverseInfo(zs.getEzKeyVersionName()));
            } else {
                this.restoreFromLastProcessedFile(zoneId, zs);
            }
            this.traverser.submitCurrentBatch(zoneId);
            LOG.info("Submission completed of zone {} for re-encryption.", (Object)zoneId);
            this.reencryptionUpdater.markZoneSubmissionDone(zoneId);
        }
        finally {
            this.traverser.readUnlock();
        }
    }

    private synchronized void resetSubmissionTracker(long zoneId) {
        ReencryptionUpdater.ZoneSubmissionTracker zst = this.submissions.get(zoneId);
        if (zst == null) {
            zst = new ReencryptionUpdater.ZoneSubmissionTracker();
            this.submissions.put(zoneId, zst);
        } else {
            zst.reset();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<XAttr> completeReencryption(INode zoneNode) throws IOException {
        assert (this.dir.hasWriteLock());
        assert (this.dir.getFSNamesystem().hasWriteLock());
        Long zoneId = zoneNode.getId();
        ZoneReencryptionStatus zs = this.getReencryptionStatus().getZoneStatus(zoneId);
        assert (zs != null);
        LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files, failures encountered: {}.", new Object[]{zoneNode.getFullPathName(), zs.getFilesReencrypted(), zs.getNumReencryptionFailures()});
        ReencryptionHandler reencryptionHandler = this;
        synchronized (reencryptionHandler) {
            this.submissions.remove(zoneId);
        }
        return FSDirEncryptionZoneOp.updateReencryptionFinish(this.dir, INodesInPath.fromINode(zoneNode), zs);
    }

    private void restoreFromLastProcessedFile(long zoneId, ZoneReencryptionStatus zs) throws IOException, InterruptedException {
        INodesInPath lpfIIP = this.dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
        INodeDirectory parent = lpfIIP.getLastINode().getParent();
        byte[] startAfter = lpfIIP.getLastINode().getLocalNameBytes();
        this.traverser.traverseDir(parent, zoneId, startAfter, new ZoneTraverseInfo(zs.getEzKeyVersionName()));
    }

    synchronized void notifyNewSubmission() {
        LOG.debug("Notifying handler for new re-encryption command.");
        this.notify();
    }

    public ReencryptionPendingInodeIdCollector getTraverser() {
        return this.traverser;
    }

    private class ZoneTraverseInfo
    extends FSTreeTraverser.TraverseInfo {
        private String ezKeyVerName;

        ZoneTraverseInfo(String ezKeyVerName) {
            this.ezKeyVerName = ezKeyVerName;
        }

        public String getEzKeyVerName() {
            return this.ezKeyVerName;
        }
    }

    class ReencryptionPendingInodeIdCollector
    extends FSTreeTraverser {
        private final ReencryptionHandler reencryptionHandler;

        ReencryptionPendingInodeIdCollector(FSDirectory dir, ReencryptionHandler rHandler, Configuration conf) {
            super(dir, conf);
            this.reencryptionHandler = rHandler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void checkPauseForTesting() throws InterruptedException {
            assert (!ReencryptionHandler.this.dir.hasReadLock());
            assert (!ReencryptionHandler.this.dir.getFSNamesystem().hasReadLock());
            while (ReencryptionHandler.this.shouldPauseForTesting) {
                LOG.info("Sleeping in the re-encrypt handler for unit test.");
                ReencryptionHandler reencryptionHandler = this.reencryptionHandler;
                synchronized (reencryptionHandler) {
                    if (ReencryptionHandler.this.shouldPauseForTesting) {
                        this.reencryptionHandler.wait(30000L);
                    }
                }
                LOG.info("Continuing re-encrypt handler after pausing.");
            }
        }

        @Override
        public boolean processFileInode(INode inode, FSTreeTraverser.TraverseInfo traverseInfo) throws IOException, InterruptedException {
            assert (ReencryptionHandler.this.dir.hasReadLock());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing {} for re-encryption", (Object)inode.getFullPathName());
            }
            if (!inode.isFile()) {
                return false;
            }
            FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(ReencryptionHandler.this.dir, INodesInPath.fromINode(inode));
            if (feInfo == null) {
                LOG.warn("File {} skipped re-encryption because it is not encrypted! This is very likely a bug.", (Object)inode.getId());
                return false;
            }
            if (traverseInfo instanceof ZoneTraverseInfo && ((ZoneTraverseInfo)traverseInfo).getEzKeyVerName().equals(feInfo.getEzKeyVersionName())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("File {} skipped re-encryption because edek's key version name is not changed.", (Object)inode.getFullPathName());
                }
                return false;
            }
            ReencryptionHandler.this.currentBatch.add(inode.asFile());
            return true;
        }

        @Override
        protected void checkINodeReady(long zoneId) throws IOException {
            ZoneReencryptionStatus zs = ReencryptionHandler.this.getReencryptionStatus().getZoneStatus(Long.valueOf(zoneId));
            if (zs == null) {
                throw new IOException("Zone " + zoneId + " status cannot be found.");
            }
            if (zs.isCanceled()) {
                throw new IOException("Re-encryption is canceled for zone " + zoneId);
            }
            ReencryptionHandler.this.dir.getFSNamesystem().checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
            ReencryptionHandler.this.dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void submitCurrentBatch(long zoneId) throws IOException, InterruptedException {
            if (ReencryptionHandler.this.currentBatch.isEmpty()) {
                return;
            }
            ReencryptionHandler reencryptionHandler = ReencryptionHandler.this;
            synchronized (reencryptionHandler) {
                ReencryptionUpdater.ZoneSubmissionTracker zst = (ReencryptionUpdater.ZoneSubmissionTracker)ReencryptionHandler.this.submissions.get(zoneId);
                if (zst == null) {
                    zst = new ReencryptionUpdater.ZoneSubmissionTracker();
                    ReencryptionHandler.this.submissions.put(zoneId, zst);
                }
                Future<ReencryptionUpdater.ReencryptionTask> future = ReencryptionHandler.this.batchService.submit(new EDEKReencryptCallable(zoneId, ReencryptionHandler.this.currentBatch, this.reencryptionHandler));
                zst.addTask(future);
            }
            LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.", new Object[]{ReencryptionHandler.this.currentBatch.getFirstFilePath(), ReencryptionHandler.this.currentBatch.size(), zoneId});
            ReencryptionHandler.this.currentBatch = new ReencryptionBatch(ReencryptionHandler.this.reencryptBatchSize);
            if (ReencryptionHandler.this.pauseAfterNthSubmission > 0 && --ReencryptionHandler.this.pauseAfterNthSubmission == 0) {
                ReencryptionHandler.this.shouldPauseForTesting = true;
            }
        }

        @Override
        @VisibleForTesting
        protected void throttle() throws InterruptedException {
            assert (!ReencryptionHandler.this.dir.hasReadLock());
            assert (!ReencryptionHandler.this.dir.getFSNamesystem().hasReadLock());
            int numCores = Runtime.getRuntime().availableProcessors();
            if (ReencryptionHandler.this.taskQueue.size() >= numCores) {
                LOG.debug("Re-encryption handler throttling because queue size {} islarger than number of cores {}", (Object)ReencryptionHandler.this.taskQueue.size(), (Object)numCores);
                while (ReencryptionHandler.this.taskQueue.size() >= numCores) {
                    Thread.sleep(100L);
                }
            }
            int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
            int numTasks = this.numTasksSubmitted();
            if (numTasks >= maxTasksPiled) {
                LOG.debug("Re-encryption handler throttling because total tasks pending re-encryption updater is {}", (Object)numTasks);
                while (numTasks >= maxTasksPiled) {
                    Thread.sleep(500L);
                    numTasks = this.numTasksSubmitted();
                }
            }
            if (ReencryptionHandler.this.throttleLimitHandlerRatio >= 1.0) {
                return;
            }
            long expect = (long)((double)ReencryptionHandler.this.throttleTimerAll.now(TimeUnit.MILLISECONDS) * ReencryptionHandler.this.throttleLimitHandlerRatio);
            long actual = ReencryptionHandler.this.throttleTimerLocked.now(TimeUnit.MILLISECONDS);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Re-encryption handler throttling expect: {}, actual: {}, throttleTimerAll:{}", new Object[]{expect, actual, ReencryptionHandler.this.throttleTimerAll.now(TimeUnit.MILLISECONDS)});
            }
            if (expect - actual < 0L) {
                long sleepMs = (long)((double)actual / ReencryptionHandler.this.throttleLimitHandlerRatio) - ReencryptionHandler.this.throttleTimerAll.now(TimeUnit.MILLISECONDS);
                LOG.debug("Throttling re-encryption, sleeping for {} ms", (Object)sleepMs);
                Thread.sleep(sleepMs);
            }
            ReencryptionHandler.this.throttleTimerAll.reset().start();
            ReencryptionHandler.this.throttleTimerLocked.reset();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private int numTasksSubmitted() {
            int ret = 0;
            ReencryptionHandler reencryptionHandler = ReencryptionHandler.this;
            synchronized (reencryptionHandler) {
                for (ReencryptionUpdater.ZoneSubmissionTracker zst : ReencryptionHandler.this.submissions.values()) {
                    ret += zst.getTasks().size();
                }
            }
            return ret;
        }

        @Override
        public boolean shouldSubmitCurrentBatch() {
            return ReencryptionHandler.this.currentBatch.size() >= ReencryptionHandler.this.reencryptBatchSize;
        }

        @Override
        public boolean canTraverseDir(INode inode) throws IOException {
            if (ReencryptionHandler.this.ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
                LOG.info("{}({}) is a nested EZ, skipping for re-encryption", (Object)inode.getFullPathName(), (Object)inode.getId());
                return false;
            }
            return true;
        }

        @Override
        protected void readLock() {
            super.readLock();
            ReencryptionHandler.this.throttleTimerLocked.start();
        }

        @Override
        protected void readUnlock() {
            super.readUnlock();
            ReencryptionHandler.this.throttleTimerLocked.stop();
        }
    }

    private static class EDEKReencryptCallable
    implements Callable<ReencryptionUpdater.ReencryptionTask> {
        private final long zoneNodeId;
        private final ReencryptionBatch batch;
        private final ReencryptionHandler handler;

        EDEKReencryptCallable(long zoneId, ReencryptionBatch currentBatch, ReencryptionHandler rh) {
            this.zoneNodeId = zoneId;
            this.batch = currentBatch;
            this.handler = rh;
        }

        @Override
        public ReencryptionUpdater.ReencryptionTask call() {
            LOG.info("Processing batched re-encryption for zone {}, batch size {}, start:{}", new Object[]{this.zoneNodeId, this.batch.size(), this.batch.getFirstFilePath()});
            if (this.batch.isEmpty()) {
                return new ReencryptionUpdater.ReencryptionTask(this.zoneNodeId, 0, this.batch);
            }
            StopWatch kmsSW = new StopWatch().start();
            int numFailures = 0;
            String result = "Completed";
            if (!this.reencryptEdeks()) {
                numFailures += this.batch.size();
                result = "Failed to";
            }
            LOG.info("{} re-encrypting one batch of {} edeks from KMS, time consumed: {}, start: {}.", new Object[]{result, this.batch.size(), kmsSW.stop(), this.batch.getFirstFilePath()});
            return new ReencryptionUpdater.ReencryptionTask(this.zoneNodeId, numFailures, this.batch);
        }

        private boolean reencryptEdeks() {
            ArrayList<KeyProviderCryptoExtension.EncryptedKeyVersion> edeks = new ArrayList<KeyProviderCryptoExtension.EncryptedKeyVersion>(this.batch.size());
            for (ReencryptionUpdater.FileEdekInfo entry : this.batch.getBatch()) {
                edeks.add(entry.getExistingEdek());
            }
            try {
                this.handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
                EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
            }
            catch (IOException | GeneralSecurityException ex) {
                LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}", new Object[]{this.batch.size(), this.batch.getFirstFilePath(), ex});
                return false;
            }
            int i = 0;
            for (ReencryptionUpdater.FileEdekInfo entry : this.batch.getBatch()) {
                assert (i < edeks.size());
                entry.setEdek((KeyProviderCryptoExtension.EncryptedKeyVersion)edeks.get(i++));
            }
            return true;
        }
    }

    final class ReencryptionBatch {
        private String firstFilePath;
        private final List<ReencryptionUpdater.FileEdekInfo> batch;

        ReencryptionBatch() {
            this(this$0.reencryptBatchSize);
        }

        ReencryptionBatch(int initialCapacity) {
            this.batch = new ArrayList<ReencryptionUpdater.FileEdekInfo>(initialCapacity);
        }

        void add(INodeFile inode) throws IOException {
            assert (ReencryptionHandler.this.dir.hasReadLock());
            Preconditions.checkNotNull((Object)inode, (Object)"INodeFile is null");
            if (this.batch.isEmpty()) {
                this.firstFilePath = inode.getFullPathName();
            }
            this.batch.add(new ReencryptionUpdater.FileEdekInfo(ReencryptionHandler.this.dir, inode));
        }

        String getFirstFilePath() {
            return this.firstFilePath;
        }

        boolean isEmpty() {
            return this.batch.isEmpty();
        }

        int size() {
            return this.batch.size();
        }

        void clear() {
            this.batch.clear();
        }

        List<ReencryptionUpdater.FileEdekInfo> getBatch() {
            return this.batch;
        }
    }
}

