/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.conversions.Bson;

/**
 * Serializes {@link SeaTunnelRow} records into MongoDB {@link WriteModel}s.
 *
 * <p>The write model is chosen from the row's {@link RowKind}:
 * <ul>
 *   <li>{@code INSERT} - an insert, or an upserting update when upsert is enabled;</li>
 *   <li>{@code UPDATE_AFTER} - a plain update, or an upserting update when upsert is enabled;</li>
 *   <li>{@code DELETE} - a delete of the document matching the filter conditions.</li>
 * </ul>
 * Rows of any other kind are rejected with a {@link MongodbConnectorException}.
 */
public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {
    /** Field stripped from the {@code $set} payload of update/upsert models. */
    private static final String ID_FIELD = "_id";
    /** Update operator wrapping the converted row in update/upsert models. */
    private static final String SET_OPERATOR = "$set";

    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
    private final boolean isUpsertEnable;
    private final Function<BsonDocument, BsonDocument> filterConditions;
    private final Map<RowKind, WriteModelSupplier> writeModelSuppliers;

    /**
     * Creates a serializer.
     *
     * @param rowDataToBsonConverter converter turning a row into a {@link BsonDocument}
     * @param options writer options; only {@code isUpsertEnable()} is read here
     * @param filterConditions function deriving, from the converted document, the document whose
     *     entries become the equality filter of update and delete models
     */
    public RowDataDocumentSerializer(
            RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter,
            MongodbWriterOptions options,
            Function<BsonDocument, BsonDocument> filterConditions) {
        this.rowDataToBsonConverter = rowDataToBsonConverter;
        this.isUpsertEnable = options.isUpsertEnable();
        this.filterConditions = filterConditions;
        // Must run after isUpsertEnable is assigned: the dispatch table depends on it.
        this.writeModelSuppliers = createWriteModelSuppliers();
    }

    /**
     * Converts one row into the write model registered for its row kind.
     *
     * @param row the row to serialize
     * @return the write model for the row
     * @throws MongodbConnectorException if no write model is registered for the row's kind
     */
    @Override
    public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
        RowKind rowKind = row.getRowKind();
        WriteModelSupplier supplier = writeModelSuppliers.get(rowKind);
        if (supplier == null) {
            throw new MongodbConnectorException(
                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                    "Unsupported message kind: " + rowKind);
        }
        return supplier.get(row);
    }

    /** Builds the row-kind dispatch table, honouring the upsert flag. */
    private Map<RowKind, WriteModelSupplier> createWriteModelSuppliers() {
        WriteModelSupplier upsertSupplier = row -> buildUpdateModel(row, true);
        WriteModelSupplier updateSupplier = row -> buildUpdateModel(row, false);
        WriteModelSupplier insertSupplier =
                row -> new InsertOneModel<BsonDocument>(rowDataToBsonConverter.convert(row));
        WriteModelSupplier deleteSupplier =
                row -> new DeleteOneModel<BsonDocument>(buildFilter(rowDataToBsonConverter.convert(row)));

        Map<RowKind, WriteModelSupplier> suppliers = new HashMap<RowKind, WriteModelSupplier>();
        suppliers.put(RowKind.INSERT, isUpsertEnable ? upsertSupplier : insertSupplier);
        suppliers.put(RowKind.UPDATE_AFTER, isUpsertEnable ? upsertSupplier : updateSupplier);
        suppliers.put(RowKind.DELETE, deleteSupplier);
        return suppliers;
    }

    /**
     * Builds a {@code $set} update for the row, optionally as an upsert.
     *
     * @param row the row to convert
     * @param upsert whether the update is created with the upsert option enabled
     * @return the update model
     */
    private WriteModel<BsonDocument> buildUpdateModel(SeaTunnelRow row, boolean upsert) {
        BsonDocument document = rowDataToBsonConverter.convert(row);
        // The filter is derived before "_id" is stripped, so it may still be used as a condition.
        Bson filter = buildFilter(document);
        document.remove(ID_FIELD);
        BsonDocument update = new BsonDocument(SET_OPERATOR, document);
        if (upsert) {
            return new UpdateOneModel<BsonDocument>(filter, update, new UpdateOptions().upsert(true));
        }
        return new UpdateOneModel<BsonDocument>(filter, update);
    }

    /** Applies the configured filter-condition function and turns its result into a filter. */
    private Bson buildFilter(BsonDocument document) {
        return generateFilter(filterConditions.apply(document));
    }

    /**
     * Builds a filter requiring every entry of the given document to match by equality.
     *
     * @param filterConditions document whose key/value pairs become {@code eq} conditions
     * @return the conjunction ({@code Filters.and}) of all equality conditions
     */
    public static Bson generateFilter(BsonDocument filterConditions) {
        List<Bson> filters =
                filterConditions.entrySet().stream()
                        .map(entry -> Filters.eq(entry.getKey(), entry.getValue()))
                        .collect(Collectors.toList());
        return Filters.and(filters);
    }

    /** Produces the write model for a single row. */
    @FunctionalInterface
    private interface WriteModelSupplier {
        WriteModel<BsonDocument> get(SeaTunnelRow row);
    }
}

