/*
 * Decompiled with CFR 0.152.
 */
package com.clickhouse.data.stream;

import com.clickhouse.data.ClickHouseByteBuffer;
import com.clickhouse.data.ClickHouseChecker;
import com.clickhouse.data.ClickHouseDataConfig;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseDataUpdater;
import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.ClickHouseUtils;
import com.clickhouse.data.ClickHouseWriter;
import com.clickhouse.data.stream.AdaptiveQueue;
import com.clickhouse.data.stream.CapacityPolicy;
import com.clickhouse.data.stream.NonBlockingInputStream;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
 * Output end of an in-memory pipe that passes data to a reader through an
 * {@link AdaptiveQueue} of {@link ByteBuffer}s.
 *
 * <p>Written bytes are staged in {@link #buffer}. The staging buffer is offered to the queue
 * when it becomes full, on {@link #flush()}, and on {@link #close()}. Offering is done in a
 * spin loop (no sleeping or blocking calls are made by this class); see {@link #timeout} for how
 * long the loop may run.
 *
 * <p>Invariant maintained by every write method: on return, the staging buffer has at least one
 * free byte. {@link #writeByte(byte)} relies on this because it calls {@code put} before checking
 * for remaining space.
 *
 * <p>This class performs no synchronization of its own around {@link #buffer}.
 */
public class NonBlockingPipedOutputStream extends ClickHousePipedOutputStream {
    /** Queue through which filled buffers are handed to the reading side. */
    protected final AdaptiveQueue<ByteBuffer> queue;
    /** Capacity of each staging buffer, as resolved by {@link ClickHouseDataConfig#getBufferSize(int)}. */
    protected final int bufferSize;
    /** Result of the asynchronous writer when one was supplied, otherwise {@code ClickHouseUtils.NULL_FUTURE}. */
    protected final CompletableFuture<Void> future;
    /**
     * Maximum time in milliseconds to keep retrying an offer to the queue. Zero or a negative
     * value disables the limit, i.e. the offer is retried until it succeeds.
     */
    protected final long timeout;
    /** Staging buffer currently being filled; replaced by a fresh one after it has been queued. */
    protected ByteBuffer buffer;

    /**
     * Queues the current staging buffer for reading.
     *
     * @param allocateNewBuffer {@code true} to replace {@link #buffer} with a newly allocated
     *                          buffer of {@link #bufferSize} bytes; {@code false} to leave the
     *                          field pointing at the buffer that was just queued (used on close)
     * @throws IOException when the queue does not accept the buffer within {@link #timeout}
     */
    private void updateBuffer(boolean allocateNewBuffer) throws IOException {
        ByteBuffer filled = this.buffer;
        // Switch from writing to reading: limit = position, position = 0.
        // NOTE(review): the cast to Buffer is presumably kept for Java 8 binary compatibility.
        ((Buffer) filled).flip();
        this.updateBuffer(filled);
        if (allocateNewBuffer) {
            this.buffer = ByteBuffer.allocate(this.bufferSize);
        }
    }

    /**
     * Offers the given buffer to the queue, retrying in a spin loop until it is accepted.
     *
     * @param b buffer to enqueue, already positioned for reading
     * @throws IOException when {@link #timeout} is positive and that many milliseconds have
     *                     elapsed without the queue accepting the buffer
     */
    private void updateBuffer(ByteBuffer b) throws IOException {
        AdaptiveQueue<ByteBuffer> q = this.queue;
        long t = this.timeout;
        // The clock is only consulted when a timeout is actually in effect.
        long startTime = t < 1L ? 0L : System.currentTimeMillis();
        while (!q.offer(b)) {
            if (t > 0L && System.currentTimeMillis() - startTime >= t) {
                throw new IOException(ClickHouseUtils.format("Write timed out after %d ms", t));
            }
        }
    }

    /**
     * Creates a piped output stream without a post-close action.
     *
     * @param bufferSize  requested staging buffer size, passed through
     *                    {@link ClickHouseDataConfig#getBufferSize(int)}
     * @param queueLength accepted for signature compatibility; not used by this implementation
     * @param timeout     offer timeout in milliseconds, zero or negative for no limit
     * @param policy      capacity policy handed to {@link AdaptiveQueue#create}
     */
    public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy) {
        this(bufferSize, queueLength, timeout, policy, (Runnable)null);
    }

    /**
     * Creates a piped output stream that runs a custom action after it is closed.
     *
     * @param bufferSize      requested staging buffer size, passed through
     *                        {@link ClickHouseDataConfig#getBufferSize(int)}
     * @param queueLength     accepted for signature compatibility; not used by this implementation
     * @param timeout         offer timeout in milliseconds, zero or negative for no limit
     * @param policy          capacity policy handed to {@link AdaptiveQueue#create}
     * @param postCloseAction action forwarded to the superclass and run by {@link #close()};
     *                        may be {@code null}
     */
    public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy, Runnable postCloseAction) {
        super(postCloseAction);
        this.queue = AdaptiveQueue.create(policy, new ByteBuffer[0]);
        this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize);
        this.future = ClickHouseUtils.NULL_FUTURE;
        this.timeout = timeout;
        this.buffer = ByteBuffer.allocate(this.bufferSize);
    }

    /**
     * Creates a piped output stream whose content is produced by the given writer via
     * {@code writeAsync}.
     *
     * @param bufferSize  requested staging buffer size, passed through
     *                    {@link ClickHouseDataConfig#getBufferSize(int)}
     * @param queueLength accepted for signature compatibility; not used by this implementation
     * @param timeout     offer timeout in milliseconds, zero or negative for no limit
     * @param policy      capacity policy handed to {@link AdaptiveQueue#create}
     * @param writer      non-null writer that fills this stream
     */
    public NonBlockingPipedOutputStream(int bufferSize, int queueLength, long timeout, CapacityPolicy policy, ClickHouseWriter writer) {
        super(null);
        this.queue = AdaptiveQueue.create(policy, new ByteBuffer[0]);
        this.bufferSize = ClickHouseDataConfig.getBufferSize(bufferSize);
        this.timeout = timeout;
        this.buffer = ByteBuffer.allocate(this.bufferSize);
        // Assigned last: 'this' is handed to writeAsync, so every other field must be set first.
        this.future = NonBlockingPipedOutputStream.writeAsync(ClickHouseChecker.nonNull(writer, "Writer"), this);
    }

    /**
     * Returns an input stream that reads the buffers queued by this output stream.
     *
     * @param postCloseAction action passed on to {@code handleWriteResult} together with
     *                        {@link #future} and {@link #timeout} when the reader's own
     *                        post-close callback runs
     * @return a new {@link NonBlockingInputStream} backed by the same queue
     */
    @Override
    public ClickHouseInputStream getInputStream(Runnable postCloseAction) {
        return new NonBlockingInputStream(this.queue, this.timeout, () -> NonBlockingPipedOutputStream.handleWriteResult(this.future, this.timeout, postCloseAction));
    }

    /**
     * Queues any staged bytes, then marks the stream closed. Calling it again is a no-op.
     *
     * <p>Even when queuing the staged bytes fails, {@code ClickHouseByteBuffer.EMPTY_BUFFER} is
     * still added to the queue (presumably the end-of-stream marker for the reader — confirm
     * against {@link NonBlockingInputStream}), the stream is flagged closed, and the post-close
     * action is run.
     *
     * @throws IOException when the staged bytes cannot be queued within {@link #timeout}
     */
    @Override
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        try {
            if (this.buffer.position() > 0) {
                // No replacement buffer is needed; nothing is written after close.
                this.updateBuffer(false);
            }
        }
        finally {
            this.queue.add(ClickHouseByteBuffer.EMPTY_BUFFER);
            this.closed = true;
            ClickHouseDataStreamFactory.handleCustomAction(this.postCloseAction);
        }
    }

    /**
     * Queues the staged bytes, if any, and starts a fresh staging buffer.
     *
     * @throws IOException when the stream is closed or the buffer cannot be queued in time
     */
    @Override
    public void flush() throws IOException {
        this.ensureOpen();
        if (this.buffer.position() > 0) {
            this.updateBuffer(true);
        }
    }

    /**
     * Queues the given byte range as its own buffer, bypassing the staging buffer. Staged bytes
     * are queued first so that ordering is preserved. The range is copied, so the caller's array
     * is not retained.
     *
     * @param bytes  source array
     * @param offset start index within {@code bytes}
     * @param length number of bytes to transfer
     * @return this stream
     * @throws NullPointerException      when {@code bytes} is {@code null}
     * @throws IndexOutOfBoundsException when the range does not lie within {@code bytes}
     * @throws IOException               when the stream is closed or queuing times out
     */
    @Override
    public ClickHouseOutputStream transferBytes(byte[] bytes, int offset, int length) throws IOException {
        if (bytes == null) {
            throw new NullPointerException();
        }
        if (offset < 0 || length < 0 || length > bytes.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            return this;
        }
        this.ensureOpen();
        if (this.buffer.position() > 0) {
            this.updateBuffer(true);
        }
        byte[] copy = new byte[length];
        System.arraycopy(bytes, offset, copy, 0, length);
        this.updateBuffer(ByteBuffer.wrap(copy));
        return this;
    }

    /**
     * Writes the content of the given buffer; a {@code null} or empty buffer is ignored.
     *
     * @param buffer buffer whose {@code array()}, {@code position()} and {@code length()}
     *               describe the bytes to write
     * @return this stream
     * @throws IOException when the stream is closed or queuing times out
     */
    @Override
    public ClickHouseOutputStream writeBuffer(ClickHouseByteBuffer buffer) throws IOException {
        if (buffer == null || buffer.isEmpty()) {
            return this;
        }
        return this.writeBytes(buffer.array(), buffer.position(), buffer.length());
    }

    /**
     * Stages a single byte and queues the staging buffer if that byte filled it.
     *
     * @param b byte to write
     * @return this stream
     * @throws IOException when the stream is closed or queuing times out
     */
    @Override
    public ClickHouseOutputStream writeByte(byte b) throws IOException {
        this.ensureOpen();
        if (!this.buffer.put(b).hasRemaining()) {
            this.updateBuffer(true);
        }
        return this;
    }

    /**
     * Writes a range of bytes.
     *
     * <p>Bytes that fit with room to spare are staged. When the staging buffer is empty and the
     * remaining data would fill it, the remaining data is copied into a dedicated buffer and
     * queued directly. Otherwise the staging buffer is topped up, queued, and the loop continues
     * with a fresh one.
     *
     * @param bytes  source array
     * @param offset start index within {@code bytes}
     * @param length number of bytes to write
     * @return this stream
     * @throws NullPointerException      when {@code bytes} is {@code null}
     * @throws IndexOutOfBoundsException when the range does not lie within {@code bytes}
     * @throws IOException               when the stream is closed or queuing times out
     */
    @Override
    public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) throws IOException {
        if (bytes == null) {
            throw new NullPointerException();
        }
        if (offset < 0 || length < 0 || length > bytes.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            return this;
        }
        this.ensureOpen();
        ByteBuffer b = this.buffer;
        while (length > 0) {
            int remain = b.remaining();
            if (length < remain) {
                // Fits and leaves at least one free byte: just stage it.
                b.put(bytes, offset, length);
                length = 0;
            } else if (b.position() == 0) {
                // Nothing staged: queue the rest in one dedicated buffer instead of chunking it.
                byte[] copy = new byte[length];
                System.arraycopy(bytes, offset, copy, 0, length);
                this.updateBuffer(ByteBuffer.wrap(copy));
                length = 0;
            } else {
                // Top up the partially filled staging buffer, queue it, and continue.
                b.put(bytes, offset, remain);
                offset += remain;
                length -= remain;
                this.updateBuffer(true);
                b = this.buffer;
            }
        }
        return this;
    }

    /**
     * Lets the given updater write directly into the free part of the staging buffer.
     *
     * <p>A negative return value from the updater is treated as "the offered range was filled and
     * there is more to write": the buffer is queued and the updater is called again with a fresh
     * one. A non-negative value is taken as the number of bytes written into the offered range.
     *
     * <p>If the updater fills the staging buffer exactly, the buffer is queued right away so that
     * the next write never sees a full buffer.
     *
     * @param writer updater invoked with the backing array and the writable index range
     * @return this stream
     * @throws IOException when the stream is closed, the updater fails, or queuing times out
     */
    @Override
    public ClickHouseOutputStream writeCustom(ClickHouseDataUpdater writer) throws IOException {
        this.ensureOpen();
        int written;
        do {
            ByteBuffer b = this.buffer;
            int position = b.position();
            int limit = b.limit();
            if (b.hasArray()) {
                // Translate buffer indices to array indices (offset is 0 for allocate()d buffers).
                int base = b.arrayOffset();
                written = writer.update(b.array(), base + position, base + limit);
                ((Buffer) b).position(written < 0 ? limit : position + written);
            } else {
                // No accessible backing array: let the updater fill a scratch array that is
                // indexed from zero, then copy what it produced back into the buffer.
                byte[] scratch = new byte[limit - position];
                written = writer.update(scratch, 0, scratch.length);
                b.put(scratch, 0, written < 0 ? scratch.length : written);
            }
            if (written < 0 || !b.hasRemaining()) {
                this.updateBuffer(true);
            }
        } while (written < 0);
        return this;
    }
}

