/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.assertion.excecutor.AssertExecutor;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;

/**
 * Sink writer that checks incoming rows against assertion rules instead of
 * persisting them.
 *
 * <p>Each row passed to {@link #write(SeaTunnelRow)} is counted and, when field
 * rules are configured, handed to the shared {@link AssertExecutor}. When the
 * writer is closed, the accumulated row count is checked against the configured
 * row rules ({@code MAX_ROW} / {@code MIN_ROW}).
 *
 * <p>NOTE(review): the row counter is {@code static}, so it is shared by every
 * instance of this class in the JVM and is never reset by this class. Confirm
 * that this is the intended scope for the row-count rules.
 */
public class AssertSinkWriter
extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private final SeaTunnelRowType seaTunnelRowType;
    private final List<AssertFieldRule> assertFieldRules;
    private final List<AssertFieldRule.AssertRule> assertRowRules;
    private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
    /** Running total of rows seen by {@link #write(SeaTunnelRow)} across all instances. */
    private static final LongAccumulator LONG_ACCUMULATOR = new LongAccumulator(Long::sum, 0L);

    /**
     * Creates a writer bound to a row type and its assertion rules.
     *
     * @param seaTunnelRowType row type passed to the executor together with each row
     * @param assertFieldRules per-row field rules; {@code null} disables the per-row check
     * @param assertRowRules   row-count rules evaluated in {@link #close()};
     *                         {@code null} disables the row-count check
     */
    public AssertSinkWriter(SeaTunnelRowType seaTunnelRowType, List<AssertFieldRule> assertFieldRules, List<AssertFieldRule.AssertRule> assertRowRules) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.assertFieldRules = assertFieldRules;
        this.assertRowRules = assertRowRules;
    }

    /**
     * Counts the row and, if field rules are configured, validates it.
     *
     * @param element the row to count and validate
     * @throws AssertConnectorException if the executor reports a failing rule for the row
     */
    public void write(SeaTunnelRow element) {
        // The row is counted before validation, so a row that fails is still included in the total.
        LONG_ACCUMULATOR.accumulate(1L);
        if (Objects.isNull(this.assertFieldRules)) {
            return;
        }
        ASSERT_EXECUTOR.fail(element, this.seaTunnelRowType, this.assertFieldRules).ifPresent(failRule -> {
            throw new AssertConnectorException((SeaTunnelErrorCode)AssertConnectorErrorCode.RULE_VALIDATION_FAILED, "row :" + element + " fail rule: " + failRule);
        });
    }

    /**
     * Validates the accumulated row count against the configured row rules.
     *
     * <p>Rules are evaluated in list order and the first violated rule is reported.
     *
     * @throws AssertConnectorException if a {@code MAX_ROW} or {@code MIN_ROW} rule is violated
     */
    public void close() {
        if (Objects.isNull(this.assertRowRules)) {
            return;
        }
        // Read the shared counter exactly once: every rule is then judged against the
        // same value, and the error message reports the value that actually failed,
        // even if other writers keep accumulating while this method runs.
        final long rowCount = LONG_ACCUMULATOR.longValue();
        this.assertRowRules.stream()
                .filter(assertRule -> AssertSinkWriter.violatesRowCountRule(assertRule, rowCount))
                .findFirst()
                .ifPresent(failRule -> {
                    throw new AssertConnectorException((SeaTunnelErrorCode)AssertConnectorErrorCode.RULE_VALIDATION_FAILED, "row num :" + rowCount + " fail rule: " + failRule);
                });
    }

    /**
     * Tells whether {@code rowCount} breaks the given row-count rule.
     *
     * @param assertRule the rule to evaluate
     * @param rowCount   the row count to test
     * @return {@code true} if the rule is {@code MAX_ROW} or {@code MIN_ROW} and the
     *         count is outside the bound; {@code false} for any other rule type
     */
    private static boolean violatesRowCountRule(AssertFieldRule.AssertRule assertRule, long rowCount) {
        switch (assertRule.getRuleType()) {
            case MAX_ROW: {
                // Negated form kept on purpose: a NaN rule value makes the comparison
                // false, so the rule counts as violated rather than silently passing.
                return !((double)rowCount <= assertRule.getRuleValue());
            }
            case MIN_ROW: {
                return !((double)rowCount >= assertRule.getRuleValue());
            }
            default: {
                // Other rule types are not row-count rules and never fail here.
                return false;
            }
        }
    }
}

