/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import redis.clients.jedis.Jedis;

/**
 * Single-split source reader that looks up every Redis key matching the configured pattern,
 * reads the values stored under each key and emits them as {@link SeaTunnelRow}s.
 *
 * <p>When no {@link DeserializationSchema} is supplied, each raw value is emitted as a
 * one-column row. Otherwise values are handed to the schema as UTF-8 encoded bytes.
 */
public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private final RedisParameters redisParameters;
    private final SingleSplitReaderContext context;
    /** May be {@code null}, in which case raw values are emitted without deserialization. */
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    /** Created in {@link #open()}; {@code null} until then. */
    private Jedis jedis;

    /**
     * @param redisParameters connection settings, key pattern, data type and hash parse mode
     * @param context reader context used to signal that the read has finished
     * @param deserializationSchema schema used to turn values into rows; may be {@code null}
     */
    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.redisParameters = redisParameters;
        this.context = context;
        this.deserializationSchema = deserializationSchema;
    }

    /**
     * Builds the Jedis client from the configured parameters.
     *
     * @throws Exception if the client cannot be built
     */
    public void open() throws Exception {
        this.jedis = this.redisParameters.buildJedis();
    }

    /**
     * Closes the Jedis client if {@link #open()} created one.
     *
     * @throws IOException declared by the reader contract
     */
    public void close() throws IOException {
        if (Objects.nonNull(this.jedis)) {
            this.jedis.close();
        }
    }

    /**
     * Reads all keys matching the configured pattern, emits the rows derived from their values
     * and then signals that no more elements will be produced.
     *
     * @param output collector receiving the produced rows
     * @throws Exception if reading from Redis or deserializing a value fails
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        Set<String> keys = this.jedis.keys(this.redisParameters.getKeysPattern());
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        // Both settings are fixed for the duration of the read, so evaluate them once.
        boolean expandHashEntries = redisDataType == RedisDataType.HASH
                && this.redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV;
        for (String key : keys) {
            List<String> values = redisDataType.get(this.jedis, key);
            for (String value : values) {
                if (this.deserializationSchema == null) {
                    // No schema configured: emit the raw string as a single-column row.
                    output.collect(new SeaTunnelRow(new Object[]{value}));
                } else if (expandHashEntries) {
                    this.collectHashEntries(value, output);
                } else {
                    // Encode explicitly as UTF-8 so the result does not depend on the JVM default charset.
                    this.deserializationSchema.deserialize(value.getBytes(StandardCharsets.UTF_8), output);
                }
            }
        }
        this.context.signalNoMoreElement();
    }

    /**
     * Emits one record per entry of a hash rendered as a JSON object: the entry value is parsed
     * as a JSON object, the entry key is stored in it under the name of the first field of the
     * produced row type, and the result is deserialized into {@code output}.
     *
     * @param hashJson JSON object whose entries map a hash field to a JSON-object string
     * @param output collector receiving the produced rows
     * @throws Exception if a record cannot be deserialized
     */
    private void collectHashEntries(String hashJson, Collector<SeaTunnelRow> output) throws Exception {
        SeaTunnelDataType<SeaTunnelRow> producedType = this.deserializationSchema.getProducedType();
        // The name of the first row field is the same for every entry, so look it up once.
        String keyFieldName = ((SeaTunnelRowType) producedType).getFieldName(0);
        Map<String, String> recordsMap = JsonUtils.toMap(hashJson);
        for (Map.Entry<String, String> entry : recordsMap.entrySet()) {
            Map<String, String> valuesMap = JsonUtils.toMap(entry.getValue());
            valuesMap.put(keyFieldName, entry.getKey());
            this.deserializationSchema.deserialize(
                    JsonUtils.toJsonString(valuesMap).getBytes(StandardCharsets.UTF_8), output);
        }
    }
}

