/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.BaseRestorableTaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class ThreadingTaskRunner
extends BaseRestorableTaskRunner<ThreadingTaskRunnerWorkItem>
implements TaskLogStreamer,
QuerySegmentWalker {
    // Class-wide logger; also used to emit alerts through makeAlert(...).
    private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
    // Builds the TaskToolbox that is handed to each task when it runs.
    private final TaskToolboxFactory toolboxFactory;
    // Receives a task's report file after the task has run, if that file exists.
    private final TaskLogPusher taskLogPusher;
    // Host and ports of this node; used to build the TaskLocation announced for every task.
    private final DruidNode node;
    // Answers queries for this runner and is asked to drop a task's appenderators during cleanup.
    private final AppenderatorsManager appenderatorsManager;
    // Tracks one report file per task id: registered before the task runs, removed during cleanup.
    private final MultipleFileTaskReportFileWriter taskReportFileWriter;
    // Pool that executes the tasks themselves; sized to workerConfig.getCapacity().
    private final ListeningExecutorService taskExecutor;
    // Separate pool, also sized to the worker capacity, that runs the per-task shutdown callables.
    private final ListeningExecutorService controlThreadExecutor;
    // Source of the capacity and category used for pool sizing and slot-count reporting.
    private final WorkerConfig workerConfig;
    // Set to true by stop(); while set, task cleanup skips saveRunningTasks() and task-directory deletion.
    private volatile boolean stopping = false;

    /**
     * Creates a runner that executes tasks on thread pools owned by this instance.
     *
     * <p>Two pools are created, each with {@code workerConfig.getCapacity()} threads: one that runs
     * the tasks and one that runs the per-task shutdown callables.
     *
     * @param toolboxFactory       factory for the toolbox passed to each task
     * @param taskConfig           task configuration, forwarded to the superclass
     * @param workerConfig         supplies the capacity used to size both pools
     * @param taskLogPusher        destination for task report files
     * @param jsonMapper           JSON mapper, forwarded to the superclass
     * @param appenderatorsManager manager used for queries and per-task appenderator cleanup
     * @param taskReportFileWriter must be a {@link MultipleFileTaskReportFileWriter}; it is downcast
     *                             here and a {@link ClassCastException} is thrown otherwise
     * @param node                 this node, used for task locations
     */
    @Inject
    public ThreadingTaskRunner(
        TaskToolboxFactory toolboxFactory,
        TaskConfig taskConfig,
        WorkerConfig workerConfig,
        TaskLogPusher taskLogPusher,
        ObjectMapper jsonMapper,
        AppenderatorsManager appenderatorsManager,
        TaskReportFileWriter taskReportFileWriter,
        @Self DruidNode node
    ) {
        super(jsonMapper, taskConfig);

        // Plain collaborators.
        this.toolboxFactory = toolboxFactory;
        this.taskLogPusher = taskLogPusher;
        this.node = node;
        this.appenderatorsManager = appenderatorsManager;
        this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
        this.workerConfig = workerConfig;

        // Thread pools, both sized to the configured worker capacity.
        this.taskExecutor = MoreExecutors.listeningDecorator(
            Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
        );
        this.controlThreadExecutor = MoreExecutors.listeningDecorator(
            Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-control-%d")
        );
    }

    /**
     * This runner does not provide task logs.
     *
     * @param taskid id of the task whose log is requested (unused)
     * @param offset requested offset into the log (unused)
     * @return always {@link Optional#absent()}
     */
    @Override
    public Optional<InputStream> streamTaskLog(String taskid, long offset) {
        final Optional<InputStream> noLog = Optional.absent();
        return noLog;
    }

    /**
     * Does nothing; this implementation performs no work on start.
     */
    @Override
    public void start() {
        // Intentionally empty.
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ListenableFuture<TaskStatus> run(final Task task) {
        ConcurrentHashMap concurrentHashMap = this.tasks;
        synchronized (concurrentHashMap) {
            this.tasks.computeIfAbsent(task.getId(), k -> new ThreadingTaskRunnerWorkItem(task, this.taskExecutor.submit((Callable)new Callable<TaskStatus>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Loose catch block
                 */
                @Override
                public TaskStatus call() {
                    TaskStatus taskStatus;
                    ThreadingTaskRunnerWorkItem taskWorkItem;
                    String attemptUUID = UUID.randomUUID().toString();
                    File taskDir = ThreadingTaskRunner.this.taskConfig.getTaskDir(task.getId());
                    File attemptDir = new File(taskDir, attemptUUID);
                    TaskLocation taskLocation = TaskLocation.create((String)ThreadingTaskRunner.this.node.getHost(), (int)ThreadingTaskRunner.this.node.getPlaintextPort(), (int)ThreadingTaskRunner.this.node.getTlsPort());
                    FileUtils.mkdirp((File)attemptDir);
                    File taskFile = new File(taskDir, "task.json");
                    File reportsFile = new File(attemptDir, "report.json");
                    ThreadingTaskRunner.this.taskReportFileWriter.add(task.getId(), reportsFile);
                    ConcurrentHashMap concurrentHashMap = ThreadingTaskRunner.this.tasks;
                    synchronized (concurrentHashMap) {
                        taskWorkItem = (ThreadingTaskRunnerWorkItem)ThreadingTaskRunner.this.tasks.get(task.getId());
                        if (taskWorkItem == null) {
                            LOGGER.makeAlert("TaskInfo disappeared", new Object[0]).addData("task", (Object)task.getId()).emit();
                            throw new ISE("TaskInfo disappeared for task[%s]!", new Object[]{task.getId()});
                        }
                        if (taskWorkItem.shutdown) {
                            throw new IllegalStateException("Task has been shut down!");
                        }
                    }
                    if (!taskFile.exists()) {
                        ThreadingTaskRunner.this.jsonMapper.writeValue(taskFile, (Object)task);
                    }
                    String priorThreadName = Thread.currentThread().getName();
                    Thread.currentThread().setName(StringUtils.format((String)"[%s]-%s", (Object[])new Object[]{task.getId(), priorThreadName}));
                    TaskToolbox toolbox = ThreadingTaskRunner.this.toolboxFactory.build(task);
                    TaskRunnerUtils.notifyLocationChanged(ThreadingTaskRunner.this.listeners, task.getId(), taskLocation);
                    TaskRunnerUtils.notifyStatusChanged(ThreadingTaskRunner.this.listeners, task.getId(), TaskStatus.running((String)task.getId()));
                    taskWorkItem.setState(RunnerTaskState.RUNNING);
                    try {
                        taskStatus = task.run(toolbox);
                    }
                    catch (Throwable t) {
                        LOGGER.error(t, "Exception caught while running the task.", new Object[0]);
                        taskStatus = TaskStatus.failure((String)task.getId(), (String)"Failed with an exception. See indexer logs for more details.");
                    }
                    finally {
                        taskWorkItem.setState(RunnerTaskState.NONE);
                        Thread.currentThread().setName(priorThreadName);
                        if (reportsFile.exists()) {
                            ThreadingTaskRunner.this.taskLogPusher.pushTaskReports(task.getId(), reportsFile);
                        }
                    }
                    TaskRunnerUtils.notifyStatusChanged(ThreadingTaskRunner.this.listeners, task.getId(), taskStatus);
                    TaskStatus taskStatus2 = taskStatus;
                    try {
                        ThreadingTaskRunner.this.taskReportFileWriter.delete(task.getId());
                        ThreadingTaskRunner.this.appenderatorsManager.removeAppenderatorsForTask(task.getId(), task.getDataSource());
                        ConcurrentHashMap concurrentHashMap2 = ThreadingTaskRunner.this.tasks;
                        synchronized (concurrentHashMap2) {
                            ThreadingTaskRunner.this.tasks.remove(task.getId());
                            if (!ThreadingTaskRunner.this.stopping) {
                                ThreadingTaskRunner.this.saveRunningTasks();
                            }
                        }
                        try {
                            if (!ThreadingTaskRunner.this.stopping && taskDir.exists()) {
                                FileUtils.deleteDirectory((File)taskDir);
                                LOGGER.info("Removed task directory: %s", new Object[]{taskDir});
                            }
                        }
                        catch (Exception e) {
                            LOGGER.makeAlert((Throwable)e, "Failed to delete task directory", new Object[0]).addData("taskDir", (Object)taskDir.toString()).addData("task", (Object)task.getId()).emit();
                        }
                    }
                    catch (Exception e) {
                        LOGGER.error((Throwable)e, "Suppressing exception caught while cleaning up task", new Object[0]);
                    }
                    return taskStatus2;
                    catch (Throwable t) {
                        try {
                            LOGGER.error(t, "Exception caught during execution", new Object[0]);
                            throw new RuntimeException(t);
                        }
                        catch (Throwable throwable) {
                            try {
                                ThreadingTaskRunner.this.taskReportFileWriter.delete(task.getId());
                                ThreadingTaskRunner.this.appenderatorsManager.removeAppenderatorsForTask(task.getId(), task.getDataSource());
                                ConcurrentHashMap concurrentHashMap3 = ThreadingTaskRunner.this.tasks;
                                synchronized (concurrentHashMap3) {
                                    ThreadingTaskRunner.this.tasks.remove(task.getId());
                                    if (!ThreadingTaskRunner.this.stopping) {
                                        ThreadingTaskRunner.this.saveRunningTasks();
                                    }
                                }
                                try {
                                    if (!ThreadingTaskRunner.this.stopping && taskDir.exists()) {
                                        FileUtils.deleteDirectory((File)taskDir);
                                        LOGGER.info("Removed task directory: %s", new Object[]{taskDir});
                                    }
                                }
                                catch (Exception e) {
                                    LOGGER.makeAlert((Throwable)e, "Failed to delete task directory", new Object[0]).addData("taskDir", (Object)taskDir.toString()).addData("task", (Object)task.getId()).emit();
                                }
                            }
                            catch (Exception e) {
                                LOGGER.error((Throwable)e, "Suppressing exception caught while cleaning up task", new Object[0]);
                            }
                            throw throwable;
                        }
                    }
                }
            })));
            this.saveRunningTasks();
            return ((ThreadingTaskRunnerWorkItem)this.tasks.get(task.getId())).getResult();
        }
    }

    /**
     * Requests shutdown of the task with the given id.
     *
     * <p>Unknown task ids and tasks that are already marked as shutting down are only logged. For
     * any other task the work item is marked as shut down and its shutdown is scheduled.
     *
     * @param taskid id of the task to shut down
     * @param reason reason for the shutdown; used in log messages only
     */
    @Override
    public void shutdown(String taskid, String reason) {
        LOGGER.info("Shutdown [%s] because: [%s]", taskid, reason);

        synchronized (tasks) {
            final ThreadingTaskRunnerWorkItem workItem = tasks.get(taskid);
            if (workItem == null) {
                LOGGER.info("Ignoring request to cancel unknown task: %s", taskid);
                return;
            }

            if (!workItem.shutdown) {
                workItem.shutdown = true;
                scheduleTaskShutdown(workItem);
                return;
            }

            LOGGER.info(
                "Task [%s] is already shutting down, ignoring duplicate shutdown request with reason [%s]",
                taskid,
                reason
            );
        }
    }

    /**
     * Schedules the shutdown of one task on the control executor, at most once per work item.
     *
     * <p>The scheduled job asks the task to stop gracefully and then waits for its result for up to
     * the configured graceful-shutdown timeout. On timeout the task's result future is cancelled
     * with interruption; on any other exception the failure is logged and the shutdown future
     * itself is cancelled.
     *
     * @param taskInfo work item of the task to stop
     * @return the work item's shutdown future; the existing one if a shutdown was already scheduled
     */
    private ListenableFuture scheduleTaskShutdown(final ThreadingTaskRunnerWorkItem taskInfo) {
        synchronized (tasks) {
            if (taskInfo.shutdownFuture == null) {
                final Callable<Void> stopper = () -> {
                    LOGGER.info("Stopping thread for task: %s", taskInfo.getTaskId());
                    taskInfo.getTask().stopGracefully(taskConfig);

                    try {
                        final ListenableFuture<TaskStatus> result = taskInfo.getResult();
                        final long timeoutMillis =
                            taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis();
                        result.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        // The task did not finish in time: cancel it, interrupting its thread.
                        taskInfo.getResult().cancel(true);
                    } catch (Exception e) {
                        LOGGER.info(e, "Encountered exception while waiting for task [%s] shutdown", taskInfo.getTaskId());
                        if (taskInfo.shutdownFuture != null) {
                            taskInfo.shutdownFuture.cancel(true);
                        }
                    }
                    return null;
                };
                taskInfo.shutdownFuture = controlThreadExecutor.submit(stopper);
            }
            return taskInfo.shutdownFuture;
        }
    }

    /**
     * Stops the runner.
     *
     * <p>Marks the runner as stopping, shuts down the task executor, schedules a shutdown for every
     * known task, shuts down the control executor and waits for all scheduled shutdowns to finish.
     * If the configured graceful-shutdown timeout is positive, it then waits up to that long for the
     * control executor to terminate and raises an alert listing the remaining tasks if it does not.
     * The appenderators manager is shut down last, on every path.
     */
    @Override
    public void stop() {
        stopping = true;
        taskExecutor.shutdown();

        final Collection<ListenableFuture<Void>> pendingStops = new ArrayList<>();
        synchronized (tasks) {
            for (ThreadingTaskRunnerWorkItem workItem : tasks.values()) {
                pendingStops.add(scheduleTaskShutdown(workItem));
            }
        }
        controlThreadExecutor.shutdown();

        try {
            Futures.successfulAsList(pendingStops).get();
        } catch (Exception e) {
            LOGGER.error(e, "Encountered exception when stopping all tasks.");
        }

        final DateTime start = DateTimes.nowUtc();
        final long gracefulShutdownMillis =
            taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis();
        LOGGER.info("Waiting up to %,dms for shutdown.", gracefulShutdownMillis);

        if (gracefulShutdownMillis <= 0L) {
            LOGGER.warn("Ran out of time, not waiting for executor to finish!");
        } else {
            try {
                final boolean terminated =
                    controlThreadExecutor.awaitTermination(gracefulShutdownMillis, TimeUnit.MILLISECONDS);
                final long elapsed = System.currentTimeMillis() - start.getMillis();

                if (terminated) {
                    LOGGER.info("Finished stopping in %,dms.", elapsed);
                } else {
                    // Snapshot the ids under the lock so the alert and the log line agree.
                    final ImmutableSet<String> stillRunning;
                    synchronized (tasks) {
                        stillRunning = ImmutableSet.copyOf(tasks.keySet());
                    }
                    LOGGER.makeAlert("Failed to stop task threads")
                          .addData("stillRunning", stillRunning)
                          .addData("elapsed", elapsed)
                          .emit();
                    LOGGER.warn(
                        "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
                        elapsed,
                        Joiner.on("; ").join(stillRunning)
                    );
                }
            } catch (InterruptedException e) {
                LOGGER.warn(e, "Interrupted while waiting for executor to finish.");
                Thread.currentThread().interrupt();
            }
        }

        appenderatorsManager.shutdown();
    }

    /**
     * @return the work items whose state is currently {@link RunnerTaskState#RUNNING}
     */
    @Override
    public Collection<TaskRunnerWorkItem> getRunningTasks() {
        final Collection<TaskRunnerWorkItem> running = getTasks(RunnerTaskState.RUNNING);
        return running;
    }

    /**
     * @return the work items whose state is currently {@link RunnerTaskState#PENDING}
     */
    @Override
    public Collection<TaskRunnerWorkItem> getPendingTasks() {
        final Collection<TaskRunnerWorkItem> pending = getTasks(RunnerTaskState.PENDING);
        return pending;
    }

    /**
     * Looks up the runner-side state of a task.
     *
     * @param taskId id of the task
     * @return the state recorded on the task's work item, or {@code null} if the id is unknown
     */
    @Override
    @Nullable
    public RunnerTaskState getRunnerTaskState(String taskId) {
        final ThreadingTaskRunnerWorkItem item = tasks.get(taskId);
        if (item == null) {
            return null;
        }
        return item.getState();
    }

    /**
     * Collects the work items that are currently in the given state.
     *
     * @param state state to filter by
     * @return a new list with the matching work items; empty if none match
     */
    private Collection<TaskRunnerWorkItem> getTasks(RunnerTaskState state) {
        final Collection<TaskRunnerWorkItem> matching = new ArrayList<>();
        synchronized (tasks) {
            for (ThreadingTaskRunnerWorkItem candidate : tasks.values()) {
                if (candidate.getState() == state) {
                    matching.add(candidate);
                }
            }
        }
        return matching;
    }

    /**
     * This runner reports no scaling statistics.
     *
     * @return always {@link Optional#absent()}
     */
    @Override
    public Optional<ScalingStats> getScalingStats() {
        final Optional<ScalingStats> none = Optional.absent();
        return none;
    }

    /**
     * Reports the total number of task slots, keyed by the worker category.
     *
     * <p>The count is taken from {@link #getTotalTaskSlotCountLong()} so that the map value really
     * is a {@link Long}, as the declared return type promises.
     *
     * @return single-entry map from the worker category to the configured capacity
     */
    @Override
    public Map<String, Long> getTotalTaskSlotCount() {
        return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong());
    }

    /**
     * @return the configured worker capacity, as a {@code long}
     */
    public long getTotalTaskSlotCountLong() {
        final long totalSlots = workerConfig.getCapacity();
        return totalSlots;
    }

    /**
     * Reports the number of idle task slots, keyed by the worker category.
     *
     * @return single-entry map from the worker category to total minus used slots, never below zero
     */
    @Override
    public Map<String, Long> getIdleTaskSlotCount() {
        final String category = workerConfig.getCategory();
        final long unused = getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong();
        // Clamp at zero in case more tasks are running than the configured capacity.
        return ImmutableMap.of(category, Math.max(unused, 0L));
    }

    /**
     * Reports the number of task slots in use, keyed by the worker category.
     *
     * <p>The count is taken from {@link #getUsedTaskSlotCountLong()} so that the map value really is
     * a {@link Long}; boxing the {@code int} size directly would put an {@link Integer} into a map
     * declared to hold {@code Long} values.
     *
     * @return single-entry map from the worker category to the number of running tasks
     */
    @Override
    public Map<String, Long> getUsedTaskSlotCount() {
        return ImmutableMap.of(workerConfig.getCategory(), getUsedTaskSlotCountLong());
    }

    /**
     * @return the number of work items currently in the RUNNING state, as a {@code long}
     */
    public long getUsedTaskSlotCountLong() {
        final long usedSlots = getRunningTasks().size();
        return usedSlots;
    }

    /**
     * This runner reports no lazy task slots.
     *
     * @return single-entry map from the worker category to zero
     */
    @Override
    public Map<String, Long> getLazyTaskSlotCount() {
        final long noSlots = 0L;
        return ImmutableMap.of(workerConfig.getCategory(), noSlots);
    }

    /**
     * This runner reports no blacklisted task slots.
     *
     * @return single-entry map from the worker category to zero
     */
    @Override
    public Map<String, Long> getBlacklistedTaskSlotCount() {
        final long noSlots = 0L;
        return ImmutableMap.of(workerConfig.getCategory(), noSlots);
    }

    /**
     * Delegates to the appenderators manager to obtain a query runner for the given intervals.
     *
     * @param query     the query to run
     * @param intervals the intervals to query
     * @return the runner supplied by the appenderators manager
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        final QueryRunner<T> runner = appenderatorsManager.getQueryRunnerForIntervals(query, intervals);
        return runner;
    }

    /**
     * Delegates to the appenderators manager to obtain a query runner for the given segments.
     *
     * @param query the query to run
     * @param specs descriptors of the segments to query
     * @return the runner supplied by the appenderators manager
     */
    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        final QueryRunner<T> runner = appenderatorsManager.getQueryRunnerForSegments(query, specs);
        return runner;
    }

    /**
     * Work item for a task handled by the enclosing runner.
     *
     * <p>On top of what {@link TaskRunnerWorkItem} holds, it keeps the task itself, a flag and a
     * future describing a requested shutdown, and the runner-side state, which starts as
     * {@link RunnerTaskState#PENDING}.
     */
    protected static class ThreadingTaskRunnerWorkItem extends TaskRunnerWorkItem {
        /** The task this item stands for. */
        private final Task task;
        /** Becomes true once a shutdown has been requested for the task. */
        private volatile boolean shutdown;
        /** Future of the scheduled shutdown job; null until one has been scheduled. */
        private volatile ListenableFuture shutdownFuture;
        /** Runner-side state of the task. */
        private volatile RunnerTaskState state = RunnerTaskState.PENDING;

        /**
         * @param task         the task this item represents
         * @param statusFuture future that yields the task's final status
         */
        private ThreadingTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture) {
            super(task.getId(), statusFuture);
            this.task = task;
        }

        /** @return the task this item represents */
        public Task getTask() {
            return task;
        }

        /** @return always {@code null}; this work item does not carry a location */
        @Override
        public TaskLocation getLocation() {
            return null;
        }

        /** @return the type reported by the underlying task */
        @Override
        public String getTaskType() {
            return task.getType();
        }

        /** @return the data source reported by the underlying task */
        @Override
        public String getDataSource() {
            return task.getDataSource();
        }

        /** @return the current runner-side state */
        public RunnerTaskState getState() {
            return state;
        }

        /** @param state the new runner-side state */
        public void setState(RunnerTaskState state) {
            this.state = state;
        }
    }
}

