/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.outbox;

import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.time.Timestamp;
import io.debezium.transforms.outbox.EventRouter;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.fest.assertions.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class EventRouterTest {
    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();

    @Test
    public void canSkipTombstone() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"123123", null, null);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNull();
    }

    @Test
    public void canSkipDeletion() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("id", (Schema)SchemaBuilder.string()).build();
        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("id", (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        Struct payload = envelope.delete((Object)before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", envelope.schema(), (Object)payload);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNull();
    }

    @Test
    @FixFor(value={"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaName() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema valueSchema = SchemaBuilder.struct().field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct value = new Struct(valueSchema);
        value.put("ts_ms", (Object)Instant.now().toEpochMilli());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", valueSchema, (Object)value);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isSameAs((Object)eventRecord);
    }

    @Test
    public void shouldFailWhenTheSchemaLooksValidButDoesNotHaveTheCorrectFields() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema valueSchema = SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat.Envelope").field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct value = new Struct(valueSchema);
        value.put("ts_ms", (Object)Instant.now().toEpochMilli());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", valueSchema, (Object)value);
        this.exceptionRule.expect(DataException.class);
        this.exceptionRule.expectMessage("op is not a valid field name");
        router.apply((ConnectRecord)eventRecord);
    }

    @Test
    @FixFor(value={"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaNameSuffix() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema valueSchema = SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat").field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct value = new Struct(valueSchema);
        value.put("ts_ms", (Object)Instant.now().toEpochMilli());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", valueSchema, (Object)value);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isSameAs((Object)eventRecord);
    }

    @Test
    @FixFor(value={"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingValueSchema() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema valueSchema = SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat").field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct value = new Struct(valueSchema);
        value.put("ts_ms", (Object)Instant.now().toEpochMilli());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", null, (Object)value);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isSameAs((Object)eventRecord);
    }

    @Test
    public void canSkipUpdates() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("id", (Schema)SchemaBuilder.string()).build();
        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("id", (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        Struct payload = envelope.update((Object)before, before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", envelope.schema(), (Object)payload);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNull();
    }

    @Test(expected=IllegalStateException.class)
    public void canFailOnUpdates() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), EventRouterConfigDefinition.InvalidOperationBehavior.FATAL.getValue());
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("id", (Schema)SchemaBuilder.string()).build();
        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("id", (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        Struct payload = envelope.update((Object)before, before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, (Object)"772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", envelope.schema(), (Object)payload);
        router.apply((ConnectRecord)eventRecord);
    }

    @Test
    public void canExtractTableFields() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.value()).isEqualTo((Object)"{}");
        Assertions.assertThat((Integer)eventRouted.valueSchema().version()).isNull();
    }

    @Test
    public void canSetDefaultMessageKey() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.STRING);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)"10711fa5");
    }

    @Test
    public void canSetMessageKey() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "type");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.STRING);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)"UserCreated");
    }

    @Test(expected=DataException.class)
    public void failsOnInvalidSetMessageKey() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "fakefield");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        router.apply((ConnectRecord)eventRecord);
    }

    @Test
    public void canSetTimestampFromDebeziumEnvelopeByDefault() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord userEventRecord = this.createEventRecord();
        SourceRecord userEventRouted = (SourceRecord)router.apply((ConnectRecord)userEventRecord);
        Struct userEvent = Requirements.requireStruct((Object)userEventRecord.value(), (String)"Test timestamp");
        Long expectedTimestamp = userEvent.getInt64("ts_ms");
        Assertions.assertThat((Long)userEventRecord.timestamp()).isNull();
        Assertions.assertThat((Long)userEventRouted.timestamp()).isEqualTo((Object)expectedTimestamp);
    }

    @Test
    public void canSetTimestampByUserDefinedConfiguration() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "event_timestamp");
        router.configure(config);
        Long expectedTimestamp = 14222264625338L;
        HashMap<String, Schema> extraFields = new HashMap<String, Schema>();
        extraFields.put("event_timestamp", Timestamp.schema());
        HashMap<String, Object> extraValues = new HashMap<String, Object>();
        extraValues.put("event_timestamp", expectedTimestamp);
        SourceRecord userEventRecord = this.createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", extraFields, extraValues);
        SourceRecord userEventRouted = (SourceRecord)router.apply((ConnectRecord)userEventRecord);
        Assertions.assertThat((Long)userEventRecord.timestamp()).isNull();
        Assertions.assertThat((Long)userEventRouted.timestamp()).isEqualTo((Object)expectedTimestamp);
    }

    @Test
    public void canRouteBasedOnField() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "aggregatetype");
        router.configure(config);
        SourceRecord userEventRecord = this.createEventRecord();
        SourceRecord userEventRouted = (SourceRecord)router.apply((ConnectRecord)userEventRecord);
        Assertions.assertThat((Object)userEventRouted).isNotNull();
        Assertions.assertThat((String)userEventRouted.topic()).isEqualTo((Object)"outbox.event.User");
        SourceRecord userUpdatedEventRecord = this.createEventRecord("ab720dd3-176d-40a6-96f3-6cf961d7df6a", "UserUpdate", "10711fa5", "User", "{}");
        SourceRecord userUpdatedEventRouted = (SourceRecord)router.apply((ConnectRecord)userUpdatedEventRecord);
        Assertions.assertThat((Object)userUpdatedEventRouted).isNotNull();
        Assertions.assertThat((String)userUpdatedEventRouted.topic()).isEqualTo((Object)"outbox.event.User");
        SourceRecord addressCreatedEventRecord = this.createEventRecord("ab720dd3-176d-40a6-96f3-6cf961d7df6a", "AddressCreated", "10711fa5", "Address", "{}");
        SourceRecord addressCreatedEventRouted = (SourceRecord)router.apply((ConnectRecord)addressCreatedEventRecord);
        Assertions.assertThat((Object)addressCreatedEventRouted).isNotNull();
        Assertions.assertThat((String)addressCreatedEventRouted.topic()).isEqualTo((Object)"outbox.event.Address");
    }

    @Test
    public void canConfigureEveryTableField() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "payload_id");
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("event_id", (Schema)SchemaBuilder.string()).field("payload_id", (Schema)SchemaBuilder.string()).field("event_type", (Schema)SchemaBuilder.string()).field("payload_body", (Schema)SchemaBuilder.string()).build();
        Envelope envelope = Envelope.defineSchema().withName("event.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("event_id", (Object)"da8d6de6-3b77-45ff-8f44-57db55a7a06c");
        before.put("payload_id", (Object)"10711fa5");
        before.put("event_type", (Object)"UserCreated");
        before.put("payload_body", (Object)"{}");
        Struct payload = envelope.create((Object)before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", envelope.schema(), (Object)payload);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.value()).isEqualTo((Object)"{}");
        Headers headers = eventRouted.headers();
        Assertions.assertThat((int)headers.size()).isEqualTo(1);
        Header header = (Header)headers.iterator().next();
        Assertions.assertThat((String)header.key()).isEqualTo((Object)"id");
        Assertions.assertThat((Object)header.value()).isEqualTo((Object)"da8d6de6-3b77-45ff-8f44-57db55a7a06c");
    }

    @Test
    public void canInfluenceTableColumnTypes() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field");
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "some_boolean:envelope:bool");
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("event_id", (Schema)SchemaBuilder.int32()).field("payload_id", (Schema)SchemaBuilder.int32()).field("my_route_field", (Schema)SchemaBuilder.string()).field("event_type", (Schema)SchemaBuilder.bytes()).field("payload_body", (Schema)SchemaBuilder.bytes()).field("some_boolean", (Schema)SchemaBuilder.bool()).build();
        Envelope envelope = Envelope.defineSchema().withName("event.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("event_id", (Object)2);
        before.put("payload_id", (Object)1232);
        before.put("event_type", (Object)"CoolSchemaCreated".getBytes());
        before.put("my_route_field", (Object)"routename");
        before.put("payload_body", (Object)"{}".getBytes());
        before.put("some_boolean", (Object)true);
        Struct payload = envelope.create((Object)before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", envelope.schema(), (Object)payload);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((String)eventRouted.topic()).isEqualTo((Object)"outbox.event.routename");
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.field("payload").schema().type()).isEqualTo((Object)SchemaBuilder.bytes().type());
        Assertions.assertThat((Object)valueSchema.field("bool").schema().type()).isEqualTo((Object)SchemaBuilder.bool().type());
        Assertions.assertThat((Object)((Struct)eventRouted.value()).get("payload")).isEqualTo((Object)"{}".getBytes());
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)1232);
        Headers headers = eventRouted.headers();
        Assertions.assertThat((int)headers.size()).isEqualTo(1);
        Header header = (Header)headers.iterator().next();
        Assertions.assertThat((String)header.key()).isEqualTo((Object)"id");
        Assertions.assertThat((Object)header.value()).isEqualTo((Object)2);
    }

    @Test
    public void canSetSchemaVersionWhenMoreThanPayloadIsInEnvelope() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:eventType");
        router.configure(config);
        HashMap<String, Schema> extraFields = new HashMap<String, Schema>();
        extraFields.put("version", Schema.INT32_SCHEMA);
        HashMap<String, Object> extraValuesV1 = new HashMap<String, Object>();
        extraValuesV1.put("version", 1);
        SourceRecord eventRecordV1 = this.createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", extraFields, extraValuesV1);
        SourceRecord eventRoutedV1 = (SourceRecord)router.apply((ConnectRecord)eventRecordV1);
        Assertions.assertThat((Integer)eventRoutedV1.valueSchema().version()).isEqualTo(1);
        HashMap<String, Object> extraValuesV3 = new HashMap<String, Object>();
        extraValuesV3.put("version", 3);
        SourceRecord eventRecordV3 = this.createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", extraFields, extraValuesV3);
        SourceRecord eventRoutedV3 = (SourceRecord)router.apply((ConnectRecord)eventRecordV3);
        Assertions.assertThat((Integer)eventRoutedV3.valueSchema().version()).isEqualTo(3);
        SourceRecord eventRecordV1E2 = this.createEventRecord("18f94a39-b931-41b7-837c-6fc23b013597", "UserCreated", "1b10b70b", "User", "{}", extraFields, extraValuesV1);
        SourceRecord eventRoutedV1E2 = (SourceRecord)router.apply((ConnectRecord)eventRecordV1E2);
        Assertions.assertThat((Integer)eventRoutedV1E2.valueSchema().version()).isEqualTo(1);
        Assertions.assertThat((Object)eventRoutedV1.valueSchema()).isSameAs((Object)eventRoutedV1E2.valueSchema());
    }

    @Test
    public void shouldNotSetSchemaVersionByDefault() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        router.configure(config);
        HashMap<String, Schema> extraFields = new HashMap<String, Schema>();
        extraFields.put("version", Schema.INT32_SCHEMA);
        HashMap<String, Object> extraValuesV1 = new HashMap<String, Object>();
        extraValuesV1.put("version", 1);
        SourceRecord eventRecordV1 = this.createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", extraFields, extraValuesV1);
        SourceRecord eventRoutedV1 = (SourceRecord)router.apply((ConnectRecord)eventRecordV1);
        Assertions.assertThat((Integer)eventRoutedV1.valueSchema().version()).isNull();
        HashMap<String, Object> extraValuesV3 = new HashMap<String, Object>();
        extraValuesV3.put("version", 3);
        SourceRecord eventRecordV3 = this.createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", extraFields, extraValuesV3);
        SourceRecord eventRoutedV3 = (SourceRecord)router.apply((ConnectRecord)eventRecordV3);
        Assertions.assertThat((Integer)eventRoutedV3.valueSchema().version()).isNull();
        SourceRecord eventRecordV1E2 = this.createEventRecord("18f94a39-b931-41b7-837c-6fc23b013597", "UserCreated", "1b10b70b", "User", "{}", extraFields, extraValuesV1);
        SourceRecord eventRoutedV1E2 = (SourceRecord)router.apply((ConnectRecord)eventRecordV1E2);
        Assertions.assertThat((Integer)eventRoutedV1E2.valueSchema().version()).isNull();
    }

    @Test
    public void canSetPayloadTypeIntoTheEnvelope() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)((Struct)eventRouted.value()).get("type")).isEqualTo((Object)"UserCreated");
    }

    @Test
    public void canSetPayloadTypeIntoTheEnvelopeWithAlias() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:aggregateType");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)((Struct)eventRouted.value()).get("aggregateType")).isEqualTo((Object)"UserCreated");
    }

    @Test
    public void canSetMultipleFieldsIntoTheEnvelope() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:payloadType,aggregateid:envelope:payloadId,type:header:payloadType");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Struct value = (Struct)eventRouted.value();
        Assertions.assertThat((Object)value.get("payloadType")).isEqualTo((Object)"UserCreated");
        Assertions.assertThat((Object)value.get("payloadId")).isEqualTo((Object)"10711fa5");
        Assertions.assertThat((Object)eventRouted.headers().lastWithName("payloadType").value()).isEqualTo((Object)"UserCreated");
    }

    @Test
    public void canSetPartitionWithAdditionalFields() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
        config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field");
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "p:partition");
        router.configure(config);
        Schema recordSchema = SchemaBuilder.struct().field("event_id", (Schema)SchemaBuilder.int32()).field("payload_id", (Schema)SchemaBuilder.int32()).field("my_route_field", (Schema)SchemaBuilder.string()).field("event_type", (Schema)SchemaBuilder.bytes()).field("payload_body", (Schema)SchemaBuilder.bytes()).field("p", (Schema)SchemaBuilder.int32()).build();
        Envelope envelope = Envelope.defineSchema().withName("event.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct before = new Struct(recordSchema);
        before.put("event_id", (Object)2);
        before.put("payload_id", (Object)1232);
        before.put("event_type", (Object)"CoolSchemaCreated".getBytes());
        before.put("my_route_field", (Object)"routename");
        before.put("payload_body", (Object)"{}".getBytes());
        before.put("p", (Object)123);
        Struct payload = envelope.create((Object)before, null, Instant.now());
        SourceRecord eventRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", envelope.schema(), (Object)payload);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Integer)eventRouted.kafkaPartition()).isEqualTo(123);
    }

    @Test(expected=ConfigException.class)
    public void shouldFailOnInvalidConfigurationForTopicRegex() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), " [[a-z]");
        router.configure(config);
    }

    @Test(expected=ConfigException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFields() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type");
        router.configure(config);
    }

    @Test(expected=ConfigException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFieldsEmpty() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "");
        router.configure(config);
    }

    @Test(expected=ConfigException.class)
    public void shouldFailOnInvalidConfigurationForOperationBehavior() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), "invalidOption");
        router.configure(config);
    }

    @Test
    @FixFor(value={"DBZ-2152"})
    public void canPassStringKey() {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord();
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.STRING);
    }

    @Test
    @FixFor(value={"DBZ-2152"})
    public void canSetBinaryMessageKey() {
        byte[] eventType = "a UserCreated".getBytes(StandardCharsets.UTF_8);
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "type");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.bytes(), eventType, SchemaBuilder.string(), "Some other payload id", "User", SchemaBuilder.string(), "{}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.BYTES);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)eventType);
    }

    @Test
    @FixFor(value={"DBZ-2152"})
    public void canPassBinaryKey() {
        byte[] key = "a binary key".getBytes(StandardCharsets.UTF_8);
        this.canPassKeyByType(SchemaBuilder.bytes(), key);
    }

    @Test
    @FixFor(value={"DBZ-2152"})
    public void canPassIntKey() {
        int key = 54321;
        this.canPassKeyByType(SchemaBuilder.int32(), 54321);
    }

    private void canPassKeyByType(SchemaBuilder keyType, Object key) {
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.string(), "UserCreated", keyType, key, "User", SchemaBuilder.string(), "{}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)keyType.type());
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo(key);
    }

    @Test
    @FixFor(value={"DBZ-2152"})
    public void canPassBinaryMessage() {
        byte[] value = "a binary message".getBytes(StandardCharsets.UTF_8);
        String key = "a key";
        EventRouter router = new EventRouter();
        HashMap config = new HashMap();
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.string(), "UserCreated", SchemaBuilder.string(), "a key", "User", SchemaBuilder.bytes(), value, new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.STRING);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)"a key");
        Assertions.assertThat((Object)eventRouted.valueSchema().type()).isEqualTo((Object)Schema.Type.BYTES);
        Assertions.assertThat((Object)eventRouted.value()).isEqualTo((Object)value);
    }

    @Test
    public void canMarkAnEventAsDeleted() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
        router.configure(config);
        HashMap<String, Schema> extraFields = new HashMap<String, Schema>();
        extraFields.put("deleted", Schema.OPTIONAL_BOOLEAN_SCHEMA);
        HashMap<String, Object> extraValues = new HashMap<String, Object>();
        extraValues.put("is_deleted", true);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}", extraFields, extraValues);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Struct value = (Struct)eventRouted.value();
        Assertions.assertThat((Object)value).isNotNull();
        Assertions.assertThat((Object)value.get("deleted")).isEqualTo((Object)true);
        SourceRecord eventRecordTombstone = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "", extraFields, extraValues);
        SourceRecord eventRoutedTombstone = (SourceRecord)router.apply((ConnectRecord)eventRecordTombstone);
        Struct tombstone = (Struct)eventRoutedTombstone.value();
        Assertions.assertThat((Object)tombstone).isNull();
        VerifyRecord.isValidTombstone(eventRoutedTombstone);
    }

    @Test
    public void noTombstoneIfNotConfigured() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        router.configure(config);
        HashMap<String, Schema> extraFields = new HashMap<String, Schema>();
        extraFields.put("deleted", Schema.OPTIONAL_BOOLEAN_SCHEMA);
        HashMap<String, Object> extraValues = new HashMap<String, Object>();
        extraValues.put("is_deleted", true);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}", extraFields, extraValues);
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Struct value = (Struct)eventRouted.value();
        Assertions.assertThat((Object)value).isNotNull();
        Assertions.assertThat((Object)value.get("deleted")).isEqualTo((Object)true);
        SourceRecord eventRecordTombstone = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "", extraFields, extraValues);
        SourceRecord eventRoutedTombstone = (SourceRecord)router.apply((ConnectRecord)eventRecordTombstone);
        Struct tombstone = (Struct)eventRoutedTombstone.value();
        Assertions.assertThat((Object)eventRoutedTombstone.key()).isNotNull();
        Assertions.assertThat((Object)eventRoutedTombstone.keySchema()).isNotNull();
        Assertions.assertThat((Object)tombstone).isNotNull();
        Assertions.assertThat((Object)tombstone.get("deleted")).isEqualTo((Object)true);
        Assertions.assertThat((Object)eventRoutedTombstone.valueSchema()).isNotNull();
    }

    @Test
    public void canExpandJsonPayloadIfConfigured() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\", \"enabled\": true, \"rating\": 4.9, \"age\": 42, \"pets\": [\"dog\", \"cat\"], \"petObjects\": [{\"type\": \"dog\"}, {\"type\": \"cat\"}]}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)valueSchema.fields().size()).isEqualTo(6);
        Assertions.assertThat((String)valueSchema.field("fullName").schema().type().getName()).isEqualTo((Object)"string");
        Assertions.assertThat((String)valueSchema.field("enabled").schema().type().getName()).isEqualTo((Object)"boolean");
        Assertions.assertThat((String)valueSchema.field("rating").schema().type().getName()).isEqualTo((Object)"float64");
        Assertions.assertThat((String)valueSchema.field("age").schema().type().getName()).isEqualTo((Object)"int32");
        Assertions.assertThat((String)valueSchema.field("pets").schema().type().getName()).isEqualTo((Object)"array");
        Struct valueStruct = (Struct)eventRouted.value();
        Assertions.assertThat((Object)valueStruct.get("fullName")).isEqualTo((Object)"John Doe");
        Assertions.assertThat((Object)valueStruct.get("enabled")).isEqualTo((Object)true);
        Assertions.assertThat((Object)valueStruct.get("rating")).isEqualTo((Object)4.9);
        Assertions.assertThat((Object)valueStruct.get("age")).isEqualTo((Object)42);
        Assertions.assertThat((int)valueStruct.getArray("pets").size()).isEqualTo(2);
        Assertions.assertThat(valueStruct.getArray("pets").get(1)).isEqualTo((Object)"cat");
        List petObjects = valueStruct.getArray("petObjects");
        Assertions.assertThat((int)petObjects.size()).isEqualTo(2);
        Assertions.assertThat((Object)this.asStruct(petObjects.get(0)).get("type")).isEqualTo((Object)"dog");
        Assertions.assertThat((Object)this.asStruct(petObjects.get(1)).get("type")).isEqualTo((Object)"cat");
    }

    @Test
    public void canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\", \"petObjects\": [{\"type\": \"dog\", \"colors\": []}, {\"type\": \"cat\", \"colors\": [{\"name\": \"white\"}]}]}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)valueSchema.fields().size()).isEqualTo(2);
        Assertions.assertThat((String)valueSchema.field("fullName").schema().type().getName()).isEqualTo((Object)"string");
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().type().getName()).isEqualTo((Object)"array");
        Assertions.assertThat((int)valueSchema.field("petObjects").schema().valueSchema().fields().size()).isEqualTo(2);
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().type().getName()).isEqualTo((Object)"struct");
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().field("type").schema().type().getName()).isEqualTo((Object)"string");
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().type().getName()).isEqualTo((Object)"array");
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().type().getName()).isEqualTo((Object)"struct");
        Assertions.assertThat((int)valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().fields().size()).isEqualTo(1);
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().field("name").schema().type().getName()).isEqualTo((Object)"string");
        Struct valueStruct = (Struct)eventRouted.value();
        Assertions.assertThat((Object)valueStruct.get("fullName")).isEqualTo((Object)"John Doe");
        List petObjects = valueStruct.getArray("petObjects");
        Assertions.assertThat((int)petObjects.size()).isEqualTo(2);
        Assertions.assertThat((Object)this.asStruct(petObjects.get(0)).get("type")).isEqualTo((Object)"dog");
        Assertions.assertThat((List)this.asStruct(petObjects.get(0)).getArray("colors")).hasSize(0);
        Assertions.assertThat((Object)this.asStruct(petObjects.get(1)).get("type")).isEqualTo((Object)"cat");
        Assertions.assertThat((List)this.asStruct(petObjects.get(1)).getArray("colors")).hasSize(1);
        List colors = this.asStruct(petObjects.get(1)).getArray("colors");
        Assertions.assertThat((Object)this.asStruct(colors.get(0)).get("name")).isEqualTo((Object)"white");
    }

    @Test
    public void shouldExpandJSONPayloadWithEmptyArrayAndRemoveThatArray() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\", \"petObjects\": [{\"type\": \"dog\", \"colors\": []}]}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)valueSchema.fields().size()).isEqualTo(2);
        Assertions.assertThat((String)valueSchema.field("fullName").schema().type().getName()).isEqualTo((Object)"string");
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().type().getName()).isEqualTo((Object)"array");
        Assertions.assertThat((int)valueSchema.field("petObjects").schema().valueSchema().fields().size()).isEqualTo(1);
        Assertions.assertThat((String)valueSchema.field("petObjects").schema().valueSchema().field("type").schema().type().getName()).isEqualTo((Object)"string");
        Struct valueStruct = (Struct)eventRouted.value();
        Assertions.assertThat((Object)valueStruct.get("fullName")).isEqualTo((Object)"John Doe");
        List petObjects = valueStruct.getArray("petObjects");
        Assertions.assertThat((int)petObjects.size()).isEqualTo(1);
        Assertions.assertThat((Object)this.asStruct(petObjects.get(0)).get("type")).isEqualTo((Object)"dog");
    }

    @Test
    public void shouldNotExpandJSONPayloadIfNotConfigured() {
        EventRouter router = new EventRouter();
        router.configure(new HashMap());
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Assertions.assertThat((Object)eventRouted.valueSchema().type()).isEqualTo((Object)SchemaBuilder.string().type());
        Assertions.assertThat((Object)eventRouted.value()).isEqualTo((Object)"{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42}");
    }

    @Test
    public void canExpandJsonPayloadWithAdditionalFieldInEnvelope() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\"}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)valueSchema.fields().size()).isEqualTo(2);
        Assertions.assertThat((String)valueSchema.field("type").schema().type().getName()).isEqualTo((Object)"string");
        Schema payloadSchema = valueSchema.field("payload").schema();
        Assertions.assertThat((Object)payloadSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)payloadSchema.fields().size()).isEqualTo(1);
        Assertions.assertThat((String)payloadSchema.field("fullName").schema().type().getName()).isEqualTo((Object)"string");
        Struct valueStruct = (Struct)eventRouted.value();
        Assertions.assertThat((Object)valueStruct.get("type")).isEqualTo((Object)"UserCreated");
        Struct payloadStruct = valueStruct.getStruct("payload");
        Assertions.assertThat((Object)payloadStruct.get("fullName")).isEqualTo((Object)"John Doe");
    }

    @Test
    public void canExpandJsonArrayWithFirstElementNull() {
        EventRouter router = new EventRouter();
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        router.configure(config);
        SourceRecord eventRecord = this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{\"fullName\": \"John Doe\", \"numbers\": [null, 2, 3]}", new HashMap<String, Schema>(), new HashMap<String, Object>());
        SourceRecord eventRouted = (SourceRecord)router.apply((ConnectRecord)eventRecord);
        Assertions.assertThat((Object)eventRouted).isNotNull();
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat((Object)valueSchema.type()).isEqualTo((Object)SchemaBuilder.struct().type());
        Assertions.assertThat((int)valueSchema.fields().size()).isEqualTo(2);
        Assertions.assertThat((String)valueSchema.field("fullName").schema().type().getName()).isEqualTo((Object)"string");
        Assertions.assertThat((String)valueSchema.field("numbers").schema().type().getName()).isEqualTo((Object)"array");
        Assertions.assertThat((String)valueSchema.field("numbers").schema().valueSchema().type().getName()).isEqualTo((Object)"int32");
        Struct valueStruct = (Struct)eventRouted.value();
        Assertions.assertThat((Object)valueStruct.get("fullName")).isEqualTo((Object)"John Doe");
        List numbers = valueStruct.getArray("numbers");
        Assertions.assertThat((int)numbers.size()).isEqualTo(3);
        Assertions.assertThat(numbers.get(0)).isEqualTo(null);
        Assertions.assertThat(numbers.get(1)).isEqualTo((Object)2);
        Assertions.assertThat(numbers.get(2)).isEqualTo((Object)3);
    }

    private SourceRecord createEventRecord() {
        return this.createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}");
    }

    private SourceRecord createEventRecord(String eventId, String eventType, String payloadId, String payloadType, String payload) {
        return this.createEventRecord(eventId, eventType, payloadId, payloadType, payload, new HashMap<String, Schema>(), new HashMap<String, Object>());
    }

    private SourceRecord createEventRecord(String eventId, String eventType, String payloadId, String payloadType, String payload, Map<String, Schema> extraFields, Map<String, Object> extraValues) {
        return this.createEventRecord(eventId, SchemaBuilder.string(), eventType, SchemaBuilder.string(), payloadId, payloadType, SchemaBuilder.string(), payload, extraFields, extraValues);
    }

    private SourceRecord createEventRecord(String eventId, SchemaBuilder eventTypeSchemaType, Object eventType, SchemaBuilder payloadIdSchemaType, Object payloadId, String payloadType, SchemaBuilder payloadSchemaType, Object payload, Map<String, Schema> extraFields, Map<String, Object> extraValues) {
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().field("id", (Schema)SchemaBuilder.string()).field("aggregatetype", (Schema)SchemaBuilder.string()).field("aggregateid", (Schema)payloadIdSchemaType).field("type", (Schema)eventTypeSchemaType).field("payload", (Schema)payloadSchemaType).field("is_deleted", (Schema)SchemaBuilder.bool().optional());
        extraFields.forEach((arg_0, arg_1) -> ((SchemaBuilder)schemaBuilder).field(arg_0, arg_1));
        Schema recordSchema = schemaBuilder.build();
        Envelope envelope = Envelope.defineSchema().withName("event.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build()).build();
        Struct after = new Struct(recordSchema);
        after.put("id", (Object)eventId);
        after.put("aggregatetype", (Object)payloadType);
        after.put("aggregateid", payloadId);
        after.put("type", eventType);
        after.put("payload", payload);
        extraValues.forEach((arg_0, arg_1) -> ((Struct)after).put(arg_0, arg_1));
        Struct body = envelope.create((Object)after, null, Instant.now());
        return new SourceRecord(new HashMap(), new HashMap(), "db.outbox", envelope.schema(), (Object)body);
    }

    private Struct asStruct(Object object) {
        Assertions.assertThat((Object)object).isInstanceOf(Struct.class);
        return (Struct)object;
    }
}

