/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cassandra.sink.CassandraSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

public class CassandraSink
extends AbstractSimpleSink<SeaTunnelRow, Void> {
    private final CassandraParameters cassandraParameters;
    private final CatalogTable catalogTable;
    private final ColumnDefinitions tableSchema;

    public CassandraSink(CassandraParameters cassandraParameters, CatalogTable catalogTable, ReadonlyConfig pluginConfig) {
        this.cassandraParameters = cassandraParameters;
        this.catalogTable = catalogTable;
        try (CqlSession session = (CqlSession)CassandraClient.getCqlSessionBuilder(cassandraParameters.getHost(), cassandraParameters.getKeyspace(), cassandraParameters.getUsername(), cassandraParameters.getPassword(), cassandraParameters.getDatacenter()).build();){
            List<String> fields = cassandraParameters.getFields();
            this.tableSchema = CassandraClient.getTableSchema(session, (String)pluginConfig.get(CassandraSinkOptions.TABLE));
            if (fields == null || fields.isEmpty()) {
                ArrayList<String> newFields = new ArrayList<String>();
                for (int i = 0; i < this.tableSchema.size(); ++i) {
                    newFields.add(this.tableSchema.get(i).getName().asInternal());
                }
                this.cassandraParameters.setFields(newFields);
            } else {
                for (String field : fields) {
                    if (this.tableSchema.contains(field)) continue;
                    throw new CassandraConnectorException((SeaTunnelErrorCode)CassandraConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + (String)pluginConfig.get(CassandraSinkOptions.TABLE));
                }
            }
        }
        catch (Exception e) {
            throw new CassandraConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SINK, ExceptionUtils.getMessage((Throwable)e)));
        }
    }

    public String getPluginName() {
        return "Cassandra";
    }

    @Override
    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
        return new CassandraSinkWriter(this.cassandraParameters, this.catalogTable.getSeaTunnelRowType(), this.tableSchema);
    }

    public Optional<CatalogTable> getWriteCatalogTable() {
        return Optional.of(this.catalogTable);
    }
}

