/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log.block;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlockVersion;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;

/**
 * A {@link HoodieDataBlock} whose records are Avro {@link IndexedRecord}s serialized with Avro's
 * binary encoding.
 *
 * <p>Byte layout produced by {@link #serializeRecords(List)}:
 * <ol>
 *   <li>{@code int} - the value of {@code HoodieLogBlock.version}</li>
 *   <li>{@code int} - number of records</li>
 *   <li>for every record: {@code int} length followed by that many bytes of Avro binary data</li>
 * </ol>
 *
 * <p>The writer schema is taken from the {@code SCHEMA} entry of the log block header.
 *
 * <p>The deprecated members ({@link #HoodieAvroDataBlock(List, Schema)},
 * {@link #getBlock(byte[], Schema, InternalSchema)} and {@link #getBytes(Schema)}) use a different
 * layout in which the deflate-compressed writer schema is embedded at the start of the content.
 */
public class HoodieAvroDataBlock
extends HoodieDataBlock {
    /** Caches the Avro encoder of the calling thread so it can be reused across records while serializing. */
    private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();

    /**
     * Creates a block that is read from a log file, carrying an {@link InternalSchema}.
     *
     * @param inputStream stream of the log file the block belongs to
     * @param content block content, if already available
     * @param readBlockLazily forwarded unchanged to the parent constructor
     * @param logBlockContentLocation location of the block content; wrapped with {@code Option.of}
     * @param readerSchema schema requested by the reader
     * @param header block header metadata
     * @param footer block footer metadata
     * @param keyField name of the record key field
     * @param internalSchema internal schema forwarded to the parent constructor
     */
    public HoodieAvroDataBlock(FSDataInputStream inputStream, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLocation, Option<Schema> readerSchema, Map<HoodieLogBlock.HeaderMetadataType, String> header, Map<HoodieLogBlock.HeaderMetadataType, String> footer, String keyField, InternalSchema internalSchema) {
        super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false, internalSchema);
    }

    /**
     * Creates a block that is read from a log file, without an {@link InternalSchema}.
     *
     * @param inputStream stream of the log file the block belongs to
     * @param content block content, if already available
     * @param readBlockLazily forwarded unchanged to the parent constructor
     * @param logBlockContentLocation location of the block content; wrapped with {@code Option.of}
     * @param readerSchema schema requested by the reader
     * @param header block header metadata
     * @param footer block footer metadata
     * @param keyField name of the record key field
     */
    public HoodieAvroDataBlock(FSDataInputStream inputStream, Option<byte[]> content, boolean readBlockLazily, HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLocation, Option<Schema> readerSchema, Map<HoodieLogBlock.HeaderMetadataType, String> header, Map<HoodieLogBlock.HeaderMetadataType, String> footer, String keyField) {
        super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
    }

    /**
     * Creates a block from in-memory records; the footer starts out as an empty map.
     *
     * @param records records held by the block
     * @param header block header metadata; {@link #serializeRecords(List)} reads the writer schema
     *               from its {@code SCHEMA} entry
     * @param keyField name of the record key field
     */
    public HoodieAvroDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HoodieLogBlock.HeaderMetadataType, String> header, @Nonnull String keyField) {
        super(records, header, new HashMap<>(), keyField);
    }

    /**
     * @return always {@code AVRO_DATA_BLOCK}
     */
    @Override
    public HoodieLogBlock.HoodieLogBlockType getBlockType() {
        return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
    }

    /**
     * Serializes the records into the layout described in the class documentation.
     *
     * @param records records to serialize
     * @return the serialized block content
     * @throws IOException if writing the version or the record count fails
     * @throws HoodieIOException if encoding or writing an individual record fails
     */
    @Override
    protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
        Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
        GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream output = new DataOutputStream(baos);
        output.writeInt(HoodieLogBlock.version);
        output.writeInt(records.size());
        try {
            for (IndexedRecord record : records) {
                ByteArrayOutputStream temp = new ByteArrayOutputStream();
                // Hand the cached encoder back to the factory so it is re-targeted at the new buffer.
                BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, this.encoderCache.get());
                this.encoderCache.set(encoder);
                try {
                    writer.write(record, encoder);
                    encoder.flush();
                    // size()/writeTo() avoid the two array copies that calling toByteArray() twice would make.
                    output.writeInt(temp.size());
                    temp.writeTo(output);
                }
                catch (IOException e) {
                    throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
                }
            }
        }
        finally {
            // Drop the thread-local entry on failure as well, so no encoder stays attached to the thread.
            this.encoderCache.remove();
        }
        output.close();
        return baos.toByteArray();
    }

    /**
     * Returns an iterator that decodes the records of {@code content} one at a time.
     *
     * @param content serialized block content in the layout written by {@link #serializeRecords(List)}
     * @return iterator over the decoded records; the caller is expected to close it
     * @throws IOException if the leading version/record-count fields cannot be read
     * @throws IllegalStateException presumably, when the reader schema is {@code null}
     *         (depends on {@code ValidationUtils.checkState})
     */
    @Override
    protected ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) throws IOException {
        ValidationUtils.checkState(this.readerSchema != null, "Reader's schema has to be non-null");
        return RecordIterator.getInstance(this, content, this.internalSchema);
    }

    /**
     * Creates a block whose header holds only the given schema and whose key field is
     * {@code _hoodie_record_key}.
     *
     * @param records records held by the block
     * @param schema schema stored (as its string form) under the {@code SCHEMA} header
     * @deprecated belongs to the legacy layout handled by {@link #getBlock(byte[], Schema, InternalSchema)}
     *             and {@link #getBytes(Schema)}
     */
    @Deprecated
    public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
        super(records, Collections.singletonMap(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), "_hoodie_record_key");
    }

    /**
     * Decodes a legacy-layout block using an empty {@link InternalSchema}.
     *
     * @param content legacy block content
     * @param readerSchema schema to read with; the embedded writer schema is used when {@code null}
     * @return block holding the decoded records
     * @throws IOException if the content cannot be read
     */
    public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
        return HoodieAvroDataBlock.getBlock(content, readerSchema, InternalSchema.getEmptyInternalSchema());
    }

    /**
     * Decodes a legacy-layout block: {@code int} schema length, deflate-compressed writer schema,
     * {@code int} record count, then for every record an {@code int} length followed by its Avro
     * binary data.
     *
     * @param content legacy block content
     * @param readerSchema schema to read with; the embedded writer schema is used when this is
     *                     {@code null} or when {@code internalSchema} is not empty
     * @param internalSchema internal schema; a non-empty one forces reading with the writer schema
     * @return block holding the decoded records and the schema that was used to read them
     * @throws IOException if the content cannot be read
     * @deprecated legacy layout; see {@link #deserializeRecords(byte[])} for the current one
     */
    @Deprecated
    public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema, InternalSchema internalSchema) throws IOException {
        SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
        try {
            int schemaLength = dis.readInt();
            byte[] compressedSchema = new byte[schemaLength];
            dis.readFully(compressedSchema, 0, schemaLength);
            Schema writerSchema = new Schema.Parser().parse(HoodieAvroDataBlock.decompress(compressedSchema));
            if (readerSchema == null || !internalSchema.isEmptySchema()) {
                readerSchema = writerSchema;
            }
            GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
            int totalRecords = dis.readInt();
            List<IndexedRecord> records = new ArrayList<>(totalRecords);
            for (int i = 0; i < totalRecords; ++i) {
                int recordLength = dis.readInt();
                // Decode straight from the backing array at the stream's current offset ...
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(content, dis.getNumberOfBytesRead().intValue(), recordLength, null);
                records.add(reader.read(null, decoder));
                // ... then advance the stream past the record that was just decoded.
                dis.skipBytes(recordLength);
            }
            return new HoodieAvroDataBlock(records, readerSchema);
        }
        finally {
            dis.close();
        }
    }

    /**
     * Deflate-compresses the UTF-8 bytes of {@code text}.
     *
     * @param text text to compress
     * @return compressed bytes
     * @throws HoodieIOException if compression fails
     */
    private static byte[] compress(String text) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby finishes) the deflater stream before baos is read,
        // and also closes it when write() fails.
        try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException while compressing text " + text, e);
        }
        return baos.toByteArray();
    }

    /**
     * Inflates {@code bytes} and decodes the result as UTF-8 text.
     *
     * @param bytes deflate-compressed bytes
     * @return decompressed text
     * @throws HoodieIOException if decompression fails
     */
    private static String decompress(byte[] bytes) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the stream releases its Inflater promptly instead of leaving it to the garbage collector.
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes))) {
            int len;
            byte[] buffer = new byte[8192];
            while ((len = in.read(buffer)) > 0) {
                baos.write(buffer, 0, len);
            }
            return new String(baos.toByteArray(), StandardCharsets.UTF_8);
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException while decompressing text", e);
        }
    }

    /**
     * Serializes this block in the legacy layout read by {@link #getBlock(byte[], Schema, InternalSchema)}.
     *
     * @param schema schema used to encode the records; also embedded (compressed) in the output
     * @return the serialized block content
     * @throws IOException if writing the schema or the record count fails, or the record iterator
     *                     cannot be obtained or closed
     * @throws HoodieIOException if encoding or writing an individual record fails
     * @deprecated legacy layout; see {@link #serializeRecords(List)} for the current one
     */
    @Deprecated
    public byte[] getBytes(Schema schema) throws IOException {
        GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream output = new DataOutputStream(baos);
        byte[] schemaContent = HoodieAvroDataBlock.compress(schema.toString());
        output.writeInt(schemaContent.length);
        output.write(schemaContent);
        // The record count precedes the records, so they have to be collected first.
        List<IndexedRecord> records = new ArrayList<>();
        try (ClosableIterator<IndexedRecord> recordItr = this.getRecordIterator()) {
            recordItr.forEachRemaining(records::add);
        }
        output.writeInt(records.size());
        for (int i = 0; i < records.size(); ++i) {
            IndexedRecord record = records.get(i);
            ByteArrayOutputStream temp = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, null);
            try {
                writer.write(record, encoder);
                encoder.flush();
                output.writeInt(temp.size());
                temp.writeTo(output);
                // Release the reference once written. Clearing the slot is O(1), unlike removing
                // the head of an ArrayList, which shifts every remaining element.
                records.set(i, null);
            }
            catch (IOException e) {
                throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
            }
        }
        output.close();
        return baos.toByteArray();
    }

    /**
     * Iterator that decodes records one by one from block content written by
     * {@link HoodieAvroDataBlock#serializeRecords(List)}.
     */
    private static class RecordIterator
    implements ClosableIterator<IndexedRecord> {
        /** Block content being decoded; set to {@code null} by {@link #close()}. */
        private byte[] content;
        /** Stream over {@link #content} that tracks how many bytes were consumed so far. */
        private final SizeAwareDataInputStream dis;
        private final GenericDatumReader<IndexedRecord> reader;
        /** Caches the Avro decoder of the calling thread so it can be reused across records. */
        private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
        /** Record count read from the content; stays 0 when the block version carries no count. */
        private int totalRecords = 0;
        private int readRecords = 0;

        /**
         * Reads the version (and, if the version has one, the record count) from {@code content}.
         *
         * @param readerSchema schema to read with, unless {@code internalSchema} is non-empty
         * @param writerSchema schema the records were written with
         * @param content serialized block content
         * @param internalSchema a non-empty internal schema forces reading with {@code writerSchema}
         * @throws IOException if the leading fields cannot be read
         */
        private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content, InternalSchema internalSchema) throws IOException {
            this.content = content;
            this.dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(this.content)));
            int version = this.dis.readInt();
            HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version);
            Schema finalReadSchema = internalSchema.isEmptySchema() ? readerSchema : writerSchema;
            this.reader = new GenericDatumReader<>(writerSchema, finalReadSchema);
            if (logBlockVersion.hasRecordCount()) {
                this.totalRecords = this.dis.readInt();
            }
        }

        /**
         * Builds an iterator for {@code dataBlock}, parsing the writer schema from the block's
         * {@code SCHEMA} header and taking the reader schema from the block.
         *
         * @param dataBlock block the content belongs to
         * @param content serialized block content
         * @param internalSchema internal schema passed on to the iterator
         * @return a new iterator positioned at the first record
         * @throws IOException if the leading fields of the content cannot be read
         */
        public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content, InternalSchema internalSchema) throws IOException {
            Schema writerSchema = new Schema.Parser().parse(dataBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA));
            return new RecordIterator(dataBlock.readerSchema, writerSchema, content, internalSchema);
        }

        /**
         * Closes the underlying stream, clears the cached decoder and drops the content reference.
         * A failure to close the stream is ignored; the remaining cleanup runs regardless.
         */
        @Override
        public void close() {
            try {
                this.dis.close();
            }
            catch (IOException ignored) {
                // Best-effort close: the stream only wraps an in-memory byte array.
            }
            finally {
                this.decoderCache.remove();
                this.content = null;
            }
        }

        /**
         * @return {@code true} while fewer records were returned than the content declares
         */
        @Override
        public boolean hasNext() {
            return this.readRecords < this.totalRecords;
        }

        /**
         * Decodes and returns the next record.
         *
         * @return the next record
         * @throws HoodieIOException if the record cannot be read or decoded
         */
        @Override
        public IndexedRecord next() {
            try {
                int recordLength = this.dis.readInt();
                // Decode straight from the backing array at the stream's current offset ...
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(this.content, this.dis.getNumberOfBytesRead().intValue(), recordLength, this.decoderCache.get());
                this.decoderCache.set(decoder);
                IndexedRecord record = this.reader.read(null, decoder);
                // ... then advance the stream past the record that was just decoded.
                this.dis.skipBytes(recordLength);
                ++this.readRecords;
                return record;
            }
            catch (IOException e) {
                throw new HoodieIOException("Unable to convert bytes to record.", e);
            }
        }
    }
}

