/*
 * Decompiled with CFR 0.152.
 */
package com.databend.jdbc;

import com.databend.client.DatabendClient;
import com.databend.client.QueryResults;
import com.databend.client.QueryRowField;
import com.databend.jdbc.AbstractDatabendResultSet;
import com.databend.jdbc.QueryLiveness;
import com.github.zafarkhaja.semver.Version;
import com.google.shaded.common.annotations.VisibleForTesting;
import com.google.shaded.common.base.Throwables;
import com.google.shaded.common.collect.AbstractIterator;
import com.google.shaded.common.collect.Streams;
import com.google.shaded.common.util.concurrent.ThreadFactoryBuilder;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.concurrent.GuardedBy;

/**
 * Result set backed by a {@link DatabendClient}.
 *
 * <p>Result pages are pulled from the client by a {@link ResultsPageIterator}, flattened into
 * individual rows (optionally capped at {@code maxRows}) and prefetched on a background worker
 * by an {@link AsyncIterator}.
 *
 * <p>The {@code closed} and {@code closeStatementOnClose} flags are guarded by the instance
 * monitor; every read and write of them in this class happens while holding that monitor.
 */
public class DatabendResultSet
extends AbstractDatabendResultSet {
    private final Statement statement;
    private final DatabendClient client;
    @GuardedBy(value="this")
    private boolean closed;
    @GuardedBy(value="this")
    private boolean closeStatementOnClose;
    private final QueryLiveness liveness;

    /**
     * Wires the paging iterator, the row flattening and the asynchronous prefetcher together and
     * hands them to the superclass.
     *
     * @throws NullPointerException if {@code statement} is null
     */
    private DatabendResultSet(Statement statement, DatabendClient client, List<QueryRowField> schema, long maxRows, QueryLiveness liveness) throws SQLException {
        super(Optional.of(Objects.requireNonNull(statement, "statement is null")), schema, new AsyncIterator<List<Object>>(DatabendResultSet.flatten(new ResultsPageIterator(client, liveness), maxRows), client), client.getResults().getQueryId());
        this.statement = statement;
        this.client = client;
        this.liveness = liveness;
    }

    /**
     * Creates a result set for the query currently held by {@code client}.
     *
     * @param statement the owning statement; must not be null
     * @param client the client to read result pages from; must not be null
     * @param maxRows maximum number of rows to expose; values {@code <= 0} mean "no limit"
     * @return a new result set
     * @throws NullPointerException if {@code client} or {@code statement} is null
     * @throws SQLException if the underlying constructors fail
     */
    static DatabendResultSet create(Statement statement, DatabendClient client, long maxRows) throws SQLException {
        Objects.requireNonNull(client, "client is null");
        // Take a single snapshot of the current results so schema, query id and timeout agree.
        QueryResults results = client.getResults();
        List<QueryRowField> schema = results.getSchema();
        AtomicLong lastRequestTime = new AtomicLong(System.currentTimeMillis());
        Version serverVersion = null;
        String rawServerVersion = client.getServerVersion();
        if (rawServerVersion != null) {
            try {
                serverVersion = Version.valueOf(rawServerVersion);
            }
            catch (Exception ignored) {
                // Best effort: a version string that cannot be parsed is treated as "unknown"
                // (null) instead of failing result-set creation.
            }
        }
        QueryLiveness liveness = new QueryLiveness(results.getQueryId(), client.getNodeID(), lastRequestTime, results.getResultTimeoutSecs(), serverVersion);
        return new DatabendResultSet(statement, client, schema, maxRows, liveness);
    }

    /**
     * Flattens an iterator of pages into an iterator of rows.
     *
     * @param iterator2 source of pages
     * @param maxRows row cap; applied only when strictly positive
     * @return a lazy iterator over the rows of all pages, in order
     */
    private static <T> Iterator<T> flatten(Iterator<Iterable<T>> iterator2, long maxRows) {
        Stream<T> stream = Streams.stream(iterator2).flatMap(Streams::stream);
        if (maxRows > 0L) {
            stream = stream.limit(maxRows);
        }
        return stream.iterator();
    }

    /**
     * Returns the liveness tracker of this result set.
     *
     * @return the tracker, or {@code null} once this result set has been closed
     */
    public synchronized QueryLiveness getLiveness() {
        // Synchronized because 'closed' is guarded by the instance monitor.
        if (this.closed) {
            return null;
        }
        return this.liveness;
    }

    /**
     * Requests that the owning statement be closed when this result set is closed. If this
     * result set is already closed, the statement is closed immediately.
     *
     * @throws SQLException if closing the statement fails
     */
    void setCloseStatementOnClose() throws SQLException {
        boolean alreadyClosed;
        synchronized (this) {
            alreadyClosed = this.closed;
            if (!alreadyClosed) {
                this.closeStatementOnClose = true;
            }
        }
        // Close outside the monitor so no lock is held while calling into the statement.
        if (alreadyClosed) {
            this.statement.close();
        }
    }

    /**
     * Closes this result set: marks liveness as stopped, cancels the background prefetcher,
     * closes the client and, if requested via {@link #setCloseStatementOnClose()}, closes the
     * owning statement. Calls after the first one are no-ops.
     *
     * @throws SQLException if closing the statement fails
     */
    @Override
    public void close() throws SQLException {
        boolean closeStatement;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.liveness.stopped = true;
            this.closed = true;
            closeStatement = this.closeStatementOnClose;
        }
        ((AsyncIterator<?>) this.results).cancel();
        // NOTE(review): cancel() above already closes the client; this second close is kept from
        // the original and presumably relies on DatabendClient.close() being idempotent — confirm.
        this.client.close();
        if (closeStatement) {
            this.statement.close();
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    @Override
    public synchronized boolean isClosed() throws SQLException {
        // Synchronized because 'closed' is guarded by the instance monitor.
        return this.closed;
    }

    /**
     * Iterator that drains a source iterator on a background worker into a bounded queue and
     * serves elements from that queue.
     *
     * <p>The producer releases one semaphore permit per enqueued element plus one final permit
     * when it terminates; the consumer acquires one permit per {@link #computeNext()} call, so an
     * empty queue after a successful acquire means the producer has terminated.
     */
    static class AsyncIterator<T>
    extends AbstractIterator<T> {
        private static final int MAX_QUEUED_ROWS = 50000;
        private static final ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Databend JDBC worker-%s").setDaemon(true).build());
        private final DatabendClient client;
        private final BlockingQueue<T> rowQueue;
        private final Semaphore semaphore = new Semaphore(0);
        private final Future<?> future;
        private volatile boolean cancelled;
        private volatile boolean finished;

        /**
         * Creates an iterator using a default queue bounded at {@code MAX_QUEUED_ROWS} elements.
         */
        public AsyncIterator(Iterator<T> dataIterator, DatabendClient client) {
            this(dataIterator, client, Optional.empty());
        }

        /**
         * Creates an iterator and immediately submits the producer task.
         *
         * @param dataIterator source of elements; must not be null
         * @param client client that is closed on interruption or cancellation
         * @param queue queue to use instead of the default bounded one, if present
         */
        @VisibleForTesting
        AsyncIterator(Iterator<T> dataIterator, DatabendClient client, Optional<BlockingQueue<T>> queue) {
            Objects.requireNonNull(dataIterator, "dataIterator is null");
            this.client = client;
            this.rowQueue = queue.orElseGet(() -> new ArrayBlockingQueue<>(MAX_QUEUED_ROWS));
            this.cancelled = false;
            this.finished = false;
            this.future = executorService.submit(() -> {
                try {
                    // Stop fetching once cancel() has been called, even if the source iterator
                    // did not react to the interrupt delivered by Future.cancel(true).
                    while (!this.cancelled && dataIterator.hasNext()) {
                        this.rowQueue.put(dataIterator.next());
                        this.semaphore.release();
                    }
                }
                catch (InterruptedException e) {
                    client.close();
                    this.rowQueue.clear();
                    throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
                }
                finally {
                    // Final permit: wakes a consumer blocked in computeNext() so it can observe
                    // the empty queue and the outcome of the task.
                    this.semaphore.release();
                    this.finished = true;
                }
            });
        }

        /**
         * Cancels the producer task (interrupting it if running), closes the client and discards
         * any rows still queued.
         */
        public void cancel() {
            this.cancelled = true;
            this.future.cancel(true);
            this.client.close();
            this.rowQueue.clear();
        }

        @VisibleForTesting
        Future<?> getFuture() {
            return this.future;
        }

        @VisibleForTesting
        boolean isBackgroundThreadFinished() {
            return this.finished;
        }

        /**
         * Returns the next queued element, or signals end of data once the producer has
         * terminated and the queue is empty.
         *
         * @throws RuntimeException if the calling thread is interrupted (wrapping an
         *         {@link SQLException}) or if the producer task failed (its cause is rethrown
         *         directly when unchecked, wrapped otherwise)
         */
        @Override
        protected T computeNext() {
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                this.handleInterrupt(e);
            }
            if (this.rowQueue.isEmpty()) {
                // A permit with nothing queued: wait for the task so its failure, if any,
                // is propagated to the caller.
                try {
                    this.future.get();
                }
                catch (InterruptedException e) {
                    this.handleInterrupt(e);
                }
                catch (ExecutionException e) {
                    Throwables.throwIfUnchecked(e.getCause());
                    throw new RuntimeException(e.getCause());
                }
                return this.endOfData();
            }
            return this.rowQueue.poll();
        }

        /**
         * Cancels this iterator, restores the caller's interrupt flag and always throws.
         */
        private void handleInterrupt(InterruptedException e) {
            this.cancel();
            Thread.currentThread().interrupt();
            throw new RuntimeException(new SQLException("Interrupted", e));
        }
    }

    /**
     * Iterator over result pages: each element is the row data of one page returned by the
     * client. Pages whose data is {@code null} are skipped.
     */
    private static class ResultsPageIterator
    extends AbstractIterator<Iterable<List<Object>>> {
        private final DatabendClient client;
        private final QueryLiveness liveness;

        private ResultsPageIterator(DatabendClient client, QueryLiveness liveness) {
            this.client = client;
            this.liveness = liveness;
        }

        /**
         * Returns the rows of the current page and advances the client, or signals end of data
         * when the client has no further pages.
         *
         * @throws RuntimeException if advancing the client fails, or if the final results carry
         *         an error (wrapping the exception built by
         *         {@code AbstractDatabendResultSet.resultsException})
         */
        @Override
        protected Iterable<List<Object>> computeNext() {
            while (this.client.hasNext()) {
                QueryResults page = this.client.getResults();
                // Capture the rows before advancing; advance() moves the client to the next page.
                List<List<Object>> rows = page.getData();
                try {
                    this.client.advance();
                    this.liveness.lastRequestTime.set(System.currentTimeMillis());
                }
                catch (RuntimeException e) {
                    // NOTE(review): the extra wrapping is preserved from the original because
                    // callers may inspect the cause chain — confirm before simplifying.
                    throw new RuntimeException(e);
                }
                if (rows != null) {
                    return rows;
                }
            }
            this.liveness.stopped = true;
            QueryResults finalResults = this.client.getResults();
            if (finalResults.getError() != null) {
                throw new RuntimeException(AbstractDatabendResultSet.resultsException(finalResults, this.client.getQuery()));
            }
            return this.endOfData();
        }
    }
}

