/*
 * Decompiled with CFR 0.152.
 */
package io.simplesource.kafka.serialization.avro;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.simplesource.api.CommandAPI;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Reason;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.serialization.avro.AvroGenericUtils;
import io.simplesource.kafka.serialization.avro.AvroSpecificGenericMapper;
import io.simplesource.kafka.serialization.util.GenericMapper;
import io.simplesource.kafka.serialization.util.GenericSerde;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

/**
 * {@link AggregateSerdes} implementation that serializes every message as an Avro
 * {@link GenericRecord}.
 *
 * <p>Domain objects are converted to and from {@link GenericRecord} by the supplied
 * {@link GenericMapper}s; the framework wrapper types ({@link CommandRequest},
 * {@link ValueWithSequence}, {@link AggregateUpdate}, {@link AggregateUpdateResult}) are
 * converted by the private helper classes nested in this class. The resulting records are
 * handed to one of two underlying serdes obtained from
 * {@link AvroGenericUtils#genericAvroSerde}.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class AvroAggregateSerdes<K, C, E, A>
        implements AggregateSerdes<K, C, E, A> {
    /** Underlying serde created with the third {@code genericAvroSerde} argument set to {@code true}. */
    private final Serde<GenericRecord> keySerde;
    /** Underlying serde created with the third {@code genericAvroSerde} argument set to {@code false}. */
    private final Serde<GenericRecord> valueSerde;
    /** Read-only serializer settings exposed through {@link #serializerConfig()}. */
    private final Map<String, Object> serializerConfig;
    private final Serde<K> ak;
    private final Serde<CommandRequest<C>> cr;
    private final Serde<UUID> crk;
    private final Serde<ValueWithSequence<E>> vws;
    private final Serde<AggregateUpdate<A>> au;
    private final Serde<AggregateUpdateResult<A>> aur;

    /**
     * Creates serdes for domain types that are themselves {@link GenericRecord}s, without a
     * mock schema registry.
     *
     * <p>Equivalent to {@code of(schemaRegistryUrl, false, aggregateSchema)}.
     *
     * @param schemaRegistryUrl URL passed to the underlying Avro serdes and serializer config
     * @param aggregateSchema   Avro schema of the aggregate
     * @return a new serdes instance
     */
    public static <A extends GenericRecord, E extends GenericRecord, C extends GenericRecord, K extends GenericRecord> AvroAggregateSerdes<A, E, C, K> of(
            String schemaRegistryUrl,
            Schema aggregateSchema) {
        return AvroAggregateSerdes.of(schemaRegistryUrl, false, aggregateSchema);
    }

    /**
     * Creates serdes for domain types that are themselves {@link GenericRecord}s, using
     * {@link AvroSpecificGenericMapper#specificDomainMapper()} for all four mappers.
     *
     * <p>Note that the type-variable names of this method are positional only: the returned
     * type is {@code AvroAggregateSerdes<A, E, C, K>}, whereas the class declares its
     * parameters in the order {@code <K, C, E, A>}. All four are bounded by
     * {@link GenericRecord}, so the names carry no additional meaning here.
     *
     * @param schemaRegistryUrl     URL passed to the underlying Avro serdes and serializer config
     * @param useMockSchemaRegistry forwarded to {@link AvroGenericUtils#genericAvroSerde}
     * @param aggregateSchema       Avro schema of the aggregate
     * @return a new serdes instance
     */
    public static <A extends GenericRecord, E extends GenericRecord, C extends GenericRecord, K extends GenericRecord> AvroAggregateSerdes<A, E, C, K> of(
            String schemaRegistryUrl,
            boolean useMockSchemaRegistry,
            Schema aggregateSchema) {
        return new AvroAggregateSerdes<>(
                AvroSpecificGenericMapper.specificDomainMapper(),
                AvroSpecificGenericMapper.specificDomainMapper(),
                AvroSpecificGenericMapper.specificDomainMapper(),
                AvroSpecificGenericMapper.specificDomainMapper(),
                schemaRegistryUrl,
                useMockSchemaRegistry,
                aggregateSchema);
    }

    /**
     * Builds all serdes from the given domain mappers.
     *
     * @param aggregateMapper       converts aggregates to and from {@link GenericRecord}
     * @param eventMapper           converts events to and from {@link GenericRecord}
     * @param commandMapper         converts commands to and from {@link GenericRecord}
     * @param keyMapper             converts aggregate keys to and from {@link GenericRecord}
     * @param schemaRegistryUrl     URL passed to the underlying Avro serdes and stored in the
     *                              serializer config under {@code schema.registry.url}
     * @param useMockSchemaRegistry forwarded to {@link AvroGenericUtils#genericAvroSerde}
     * @param aggregateSchema       Avro schema of the aggregate, embedded in the aggregate-update
     *                              and command-response schemas
     */
    public AvroAggregateSerdes(
            GenericMapper<A, GenericRecord> aggregateMapper,
            GenericMapper<E, GenericRecord> eventMapper,
            GenericMapper<C, GenericRecord> commandMapper,
            GenericMapper<K, GenericRecord> keyMapper,
            String schemaRegistryUrl,
            boolean useMockSchemaRegistry,
            Schema aggregateSchema) {
        this.keySerde = AvroGenericUtils.genericAvroSerde(schemaRegistryUrl, useMockSchemaRegistry, true);
        this.valueSerde = AvroGenericUtils.genericAvroSerde(schemaRegistryUrl, useMockSchemaRegistry, false);

        // NOTE(review): useMockSchemaRegistry is not reflected in this map; only the URL is.
        Map<String, Object> configs = new HashMap<>();
        configs.put("key.serializer", KafkaAvroSerializer.class);
        configs.put("value.serializer", KafkaAvroSerializer.class);
        configs.put("schema.registry.url", schemaRegistryUrl);
        this.serializerConfig = Collections.unmodifiableMap(configs);

        this.ak = GenericSerde.of(this.keySerde, keyMapper::toGeneric, keyMapper::fromGeneric);
        this.cr = GenericSerde.of(
                this.valueSerde,
                v -> CommandRequestAvroHelper.toGenericRecord(v.map(commandMapper::toGeneric)),
                s -> CommandRequestAvroHelper.fromGenericRecord(s).map(commandMapper::fromGeneric));
        // NOTE(review): this key serde is built on valueSerde rather than keySerde — confirm intended.
        this.crk = GenericSerde.of(
                this.valueSerde,
                CommandResponseKeyAvroHelper::toGenericRecord,
                CommandResponseKeyAvroHelper::fromGenericRecord);
        this.vws = GenericSerde.of(
                this.valueSerde,
                v -> AvroGenericUtils.ValueWithSequenceAvroHelper.toGenericRecord(v.map(eventMapper::toGeneric)),
                s -> AvroGenericUtils.ValueWithSequenceAvroHelper.fromGenericRecord(s).map(eventMapper::fromGeneric));
        this.au = GenericSerde.of(
                this.valueSerde,
                v -> AggregateUpdateAvroHelper.toGenericRecord(v.map(aggregateMapper::toGeneric), aggregateSchema),
                s -> AggregateUpdateAvroHelper.fromGenericRecord(s).map(aggregateMapper::fromGeneric));
        this.aur = GenericSerde.of(
                this.valueSerde,
                v -> CommandResponseAvroHelper.toCommandResponse(v.map(aggregateMapper::toGeneric), aggregateSchema),
                s -> CommandResponseAvroHelper.fromCommandResponse(s).map(aggregateMapper::fromGeneric));
    }

    /** @return serde for aggregate keys */
    public Serde<K> aggregateKey() {
        return this.ak;
    }

    /** @return serde for command requests */
    public Serde<CommandRequest<C>> commandRequest() {
        return this.cr;
    }

    /** @return serde for command response keys (command ids) */
    public Serde<UUID> commandResponseKey() {
        return this.crk;
    }

    /** @return serde for events paired with their sequence */
    public Serde<ValueWithSequence<E>> valueWithSequence() {
        return this.vws;
    }

    /** @return serde for aggregate updates */
    public Serde<AggregateUpdate<A>> aggregateUpdate() {
        return this.au;
    }

    /** @return serde for command results (success with updated aggregate, or failure reasons) */
    public Serde<AggregateUpdateResult<A>> updateResult() {
        return this.aur;
    }

    /**
     * @return an unmodifiable map holding {@code key.serializer}, {@code value.serializer}
     *         and {@code schema.registry.url}
     */
    public Map<String, Object> serializerConfig() {
        return this.serializerConfig;
    }

    /** Wraps {@code schema} in a {@code [null, schema]} union so the field may be absent. */
    private static Schema toNullableSchema(Schema schema) {
        return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema));
    }

    /** Converts {@link AggregateUpdateResult} to and from its Avro command-response record. */
    private static final class CommandResponseAvroHelper {
        /** Command-response schema per aggregate schema, built lazily. */
        private static final Map<Schema, Schema> schemaCache = new ConcurrentHashMap<>();
        private static final String READ_SEQUENCE = "readSequence";
        private static final String COMMAND_ID = "commandId";
        private static final String RESULT = "result";
        private static final String REASON = "reason";
        private static final String ADDITIONAL_REASONS = "additionalReasons";
        private static final String ERROR_MESSAGE = "errorMessage";
        private static final String ERROR_CODE = "errorCode";
        private static final String WRITE_SEQUENCE = "writeSequence";
        private static final String AGGREGATION = "aggregate_update";

        private CommandResponseAvroHelper() {
        }

        /**
         * Encodes a command result as a record of the command-response schema derived from
         * {@code aggregateSchema}.
         *
         * @param aggregateUpdateResult result to encode
         * @param aggregateSchema       schema of the aggregate carried in a successful result
         * @return the encoded record
         */
        static GenericRecord toCommandResponse(AggregateUpdateResult<GenericRecord> aggregateUpdateResult, Schema aggregateSchema) {
            Schema schema = schemaCache.computeIfAbsent(aggregateSchema, CommandResponseAvroHelper::commandResponseSchema);
            // The "result" union is declared as [failure, success] in commandResponseSchema.
            List<Schema> resultBranches = schema.getField(RESULT).schema().getTypes();
            Schema responseFailureSchema = resultBranches.get(0);
            Schema responseSuccessSchema = resultBranches.get(1);
            Schema reasonSchema = responseFailureSchema.getField(REASON).schema();

            GenericRecordBuilder response = new GenericRecordBuilder(schema);
            response.set(READ_SEQUENCE, aggregateUpdateResult.readSequence().getSeq());
            response.set(COMMAND_ID, aggregateUpdateResult.commandId().toString());
            response.set(RESULT, aggregateUpdateResult.updatedAggregateResult().fold(
                    reasons -> new GenericRecordBuilder(responseFailureSchema)
                            .set(REASON, fromReason(reasonSchema, reasons.head()))
                            .set(ADDITIONAL_REASONS, reasons.tail().stream()
                                    .map(reason -> fromReason(reasonSchema, reason))
                                    .collect(Collectors.toList()))
                            .build(),
                    aggregateUpdate -> new GenericRecordBuilder(responseSuccessSchema)
                            .set(WRITE_SEQUENCE, aggregateUpdate.sequence().getSeq())
                            .set(AGGREGATION, aggregateUpdate.aggregate())
                            .build()));
            return response.build();
        }

        /** Encodes a failure reason as a record with its message and the error's enum name. */
        private static GenericRecord fromReason(Schema schema, Reason<CommandAPI.CommandError> reason) {
            return new GenericRecordBuilder(schema)
                    .set(ERROR_MESSAGE, reason.getMessage())
                    .set(ERROR_CODE, reason.getError().name())
                    .build();
        }

        /**
         * Decodes a command-response record.
         *
         * <p>The nested result record is treated as a success when it yields a non-null
         * {@code writeSequence}; otherwise it is decoded as a failure.
         *
         * @param record record produced by {@link #toCommandResponse}
         * @return the decoded result
         */
        static AggregateUpdateResult<GenericRecord> fromCommandResponse(GenericRecord record) {
            Sequence readSequence = Sequence.position((Long) record.get(READ_SEQUENCE));
            UUID commandId = UUID.fromString(String.valueOf(record.get(COMMAND_ID)));
            GenericRecord genericResult = (GenericRecord) record.get(RESULT);

            Result<CommandAPI.CommandError, AggregateUpdate<GenericRecord>> result;
            Object writeSequenceValue = genericResult.get(WRITE_SEQUENCE);
            if (writeSequenceValue != null) {
                Sequence writeSequence = Sequence.position((Long) writeSequenceValue);
                GenericRecord genericAggregate = (GenericRecord) genericResult.get(AGGREGATION);
                result = Result.success(new AggregateUpdate<>(genericAggregate, writeSequence));
            } else {
                Reason<CommandAPI.CommandError> reason = toReason((GenericRecord) genericResult.get(REASON));
                // The field is declared as an array of reason records in commandResponseSchema.
                @SuppressWarnings("unchecked")
                List<GenericRecord> encodedReasons = (List<GenericRecord>) genericResult.get(ADDITIONAL_REASONS);
                List<Reason<CommandAPI.CommandError>> additionalReasons = encodedReasons.stream()
                        .map(CommandResponseAvroHelper::toReason)
                        .collect(Collectors.toList());
                NonEmptyList<Reason<CommandAPI.CommandError>> reasons = new NonEmptyList<>(reason, additionalReasons);
                result = Result.failure(reasons);
            }
            return new AggregateUpdateResult<>(commandId, readSequence, result);
        }

        /**
         * Decodes a reason record. An error code that is not a known
         * {@link CommandAPI.CommandError} name is mapped to {@code UnexpectedErrorCode} and the
         * unknown code is appended to the message.
         */
        private static Reason<CommandAPI.CommandError> toReason(GenericRecord record) {
            CommandAPI.CommandError error;
            String errorMessage = String.valueOf(record.get(ERROR_MESSAGE));
            String errorCodeStr = String.valueOf(record.get(ERROR_CODE));
            try {
                error = CommandAPI.CommandError.valueOf(errorCodeStr);
            } catch (IllegalArgumentException e) {
                error = CommandAPI.CommandError.UnexpectedErrorCode;
                // NOTE(review): the suffix is appended without a separator after the original message.
                errorMessage = errorMessage + "Unexpected errorCode " + errorCodeStr;
            }
            return Reason.of(error, errorMessage);
        }

        /**
         * Builds the command-response schema for an aggregate: {@code readSequence},
         * {@code commandId} and a {@code result} union of [failure, success] records. Only the
         * outer record is given the aggregate's namespace.
         */
        private static Schema commandResponseSchema(Schema aggregateSchema) {
            String baseName = aggregateSchema.getName();
            Schema reasonSchema = SchemaBuilder.record(baseName + "Reason")
                    .fields()
                    .name(ERROR_MESSAGE).type().stringType().noDefault()
                    .name(ERROR_CODE).type().stringType().noDefault()
                    .endRecord();
            Schema updateFailure = SchemaBuilder.record(baseName + "CommandResponseFailure")
                    .fields()
                    .name(REASON).type(reasonSchema).noDefault()
                    .name(ADDITIONAL_REASONS).type().array().items(reasonSchema).noDefault()
                    .endRecord();
            Schema updateSuccess = SchemaBuilder.record(baseName + "CommandResponseSuccess")
                    .fields()
                    .name(WRITE_SEQUENCE).type().longType().noDefault()
                    .name(AGGREGATION).type(AvroAggregateSerdes.toNullableSchema(aggregateSchema)).withDefault(null)
                    .endRecord();
            return SchemaBuilder.record(baseName + "CommandResponse")
                    .namespace(aggregateSchema.getNamespace())
                    .fields()
                    .name(READ_SEQUENCE).type().longType().noDefault()
                    .name(COMMAND_ID).type().stringType().noDefault()
                    .name(RESULT).type(Schema.createUnion(Arrays.asList(updateFailure, updateSuccess))).noDefault()
                    .endRecord();
        }
    }

    /** Converts {@link AggregateUpdate} to and from its Avro record. */
    private static final class AggregateUpdateAvroHelper {
        /** Aggregate-update schema per aggregate schema, built lazily. */
        private static final Map<Schema, Schema> schemaCache = new ConcurrentHashMap<>();
        private static final String AGGREGATION = "aggregate_update";
        private static final String SEQUENCE = "sequence";

        private AggregateUpdateAvroHelper() {
        }

        /**
         * Encodes an aggregate update as a record holding the (nullable) aggregate and its
         * sequence.
         */
        static GenericRecord toGenericRecord(AggregateUpdate<GenericRecord> aggregateUpdate, Schema aggregateSchema) {
            Schema schema = schemaCache.computeIfAbsent(aggregateSchema, AggregateUpdateAvroHelper::generateSchema);
            return new GenericRecordBuilder(schema)
                    .set(AGGREGATION, aggregateUpdate.aggregate())
                    .set(SEQUENCE, aggregateUpdate.sequence().getSeq())
                    .build();
        }

        /** Decodes a record produced by {@link #toGenericRecord}. */
        static AggregateUpdate<GenericRecord> fromGenericRecord(GenericRecord record) {
            GenericRecord genericAggregate = (GenericRecord) record.get(AGGREGATION);
            Sequence sequence = Sequence.position((Long) record.get(SEQUENCE));
            return new AggregateUpdate<>(genericAggregate, sequence);
        }

        /** Builds the wrapper schema in the aggregate's namespace. */
        private static Schema generateSchema(Schema aggregateSchema) {
            return SchemaBuilder.record(aggregateSchema.getName() + "OptionalAggregateWithSequence")
                    .namespace(aggregateSchema.getNamespace())
                    .fields()
                    .name(AGGREGATION).type(AvroAggregateSerdes.toNullableSchema(aggregateSchema)).withDefault(null)
                    .name(SEQUENCE).type().longType().noDefault()
                    .endRecord();
        }
    }

    /** Converts a command-response key ({@link UUID}) to and from a single-field Avro record. */
    private static final class CommandResponseKeyAvroHelper {
        private static final Schema schema = CommandResponseKeyAvroHelper.commandResponseKeySchema();
        private static final String COMMAND_ID = "commandId";

        private CommandResponseKeyAvroHelper() {
        }

        /** Encodes the UUID as its string form in the {@code commandId} field. */
        static GenericRecord toGenericRecord(UUID commandResponseKey) {
            return new GenericRecordBuilder(schema)
                    .set(COMMAND_ID, commandResponseKey.toString())
                    .build();
        }

        /** Parses the UUID back from the {@code commandId} field. */
        static UUID fromGenericRecord(GenericRecord record) {
            return UUID.fromString(String.valueOf(record.get(COMMAND_ID)));
        }

        // COMMAND_ID is a compile-time constant, so it is safe to use here even though this
        // method runs while the `schema` static field (declared above it) is being initialised.
        private static Schema commandResponseKeySchema() {
            return SchemaBuilder.record("CommandResponseKey")
                    .namespace("io.simplesource.kafka.serialization.avro")
                    .fields()
                    .name(COMMAND_ID).type().stringType().noDefault()
                    .endRecord();
        }
    }

    /** Converts {@link CommandRequest} to and from its Avro record. */
    private static final class CommandRequestAvroHelper {
        /** Command-request schema per command schema, built lazily. */
        private static final Map<Schema, Schema> schemaCache = new ConcurrentHashMap<>();
        private static final String READ_SEQUENCE = "readSequence";
        private static final String COMMAND_ID = "commandId";
        private static final String COMMAND = "command";

        private CommandRequestAvroHelper() {
        }

        /**
         * Encodes a command request; the wrapper schema is derived from the command's own
         * schema and cached under it.
         */
        static GenericRecord toGenericRecord(CommandRequest<GenericRecord> commandRequest) {
            GenericRecord command = commandRequest.command();
            Schema schema = schemaCache.computeIfAbsent(
                    command.getSchema(),
                    k -> CommandRequestAvroHelper.commandRequestSchema(command));
            return new GenericRecordBuilder(schema)
                    .set(READ_SEQUENCE, commandRequest.readSequence().getSeq())
                    .set(COMMAND_ID, commandRequest.commandId().toString())
                    .set(COMMAND, command)
                    .build();
        }

        /** Decodes a record produced by {@link #toGenericRecord}. */
        static CommandRequest<GenericRecord> fromGenericRecord(GenericRecord record) {
            Sequence readSequence = Sequence.position((Long) record.get(READ_SEQUENCE));
            UUID commandId = UUID.fromString(String.valueOf(record.get(COMMAND_ID)));
            GenericRecord command = (GenericRecord) record.get(COMMAND);
            return new CommandRequest<>(command, readSequence, commandId);
        }

        /**
         * Builds the wrapper schema for a command.
         *
         * <p>NOTE(review): the namespace is taken from the Java package of the command's runtime
         * class rather than from its Avro schema, while the cache above is keyed by schema only —
         * confirm this is intended.
         */
        private static Schema commandRequestSchema(GenericRecord command) {
            return SchemaBuilder.record(command.getSchema().getName() + "CommandRequest")
                    .namespace(command.getClass().getPackage().getName())
                    .fields()
                    .name(READ_SEQUENCE).type().longType().noDefault()
                    .name(COMMAND_ID).type().stringType().noDefault()
                    .name(COMMAND).type(command.getSchema()).noDefault()
                    .endRecord();
        }
    }
}

