/*
 * Decompiled with CFR 0.152.
 */
package io.simplesource.kafka.testutils;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.api.CommandId;
import io.simplesource.data.FutureResult;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.KafkaCommandAPI;
import io.simplesource.kafka.internal.client.RequestAPIContext;
import io.simplesource.kafka.internal.client.ResponseSubscription;
import io.simplesource.kafka.internal.streams.topology.EventSourcedTopology;
import io.simplesource.kafka.internal.streams.topology.TopologyContext;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.spec.AggregateSpec;
import io.simplesource.kafka.spec.CommandSpec;
import io.simplesource.kafka.testutils.TestPublisher;
import io.simplesource.kafka.testutils.TestTopologyReceiver;
import io.simplesource.kafka.util.SpecUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;

/**
 * Test harness that builds the event-sourced topology for one aggregate, runs it inside a
 * Kafka Streams {@link TopologyTestDriver}, and exposes a {@link KafkaCommandAPI} whose
 * publishers and response receiver are wired to that driver instead of a real broker.
 *
 * <p>Instances own a {@link TopologyTestDriver} and a scheduled executor; call
 * {@link #close()} (or use try-with-resources) to release both.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class AggregateTestDriver<K, C, E, A> implements AutoCloseable {
    private final TopologyTestDriver driver;
    private final AggregateSpec<K, C, E, A> aggregateSpec;
    private final AggregateSerdes<K, C, E, A> aggregateSerdes;
    private final KafkaCommandAPI<K, C> commandAPI;
    /** One poller per receiver attached by the command API; run by {@link #pollForApiResponse()}. */
    private final ArrayList<Runnable> statePollers = new ArrayList<>();
    /** Scheduler handed to the command API; kept so that {@link #close()} can shut it down. */
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Builds the topology for {@code aggregateSpec}, starts a test driver for it and wires a
     * command API to the driver's command-request and response-topic-map topics.
     *
     * @param aggregateSpec specification of the aggregate under test
     * @param kafkaConfig   configuration; its streams config is copied into the driver's properties
     */
    // NOTE(review): the generic parameters of TopologyContext, TestPublisher, CommandSpec,
    // RequestAPIContext, ReceiverSpec and TestTopologyReceiver are not visible from this file,
    // so they are left raw here; tighten them once their declarations are confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public AggregateTestDriver(AggregateSpec<K, C, E, A> aggregateSpec, KafkaConfig kafkaConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        TopologyContext ctx = new TopologyContext(aggregateSpec);
        this.scheduledExecutor =
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("QueryAPI-scheduler"));
        EventSourcedTopology.addTopology(ctx, builder);

        this.aggregateSpec = aggregateSpec;
        this.aggregateSerdes = aggregateSpec.serialization().serdes();

        Properties streamConfig = new Properties();
        streamConfig.putAll(kafkaConfig.streamsConfig());
        // Third argument is the driver's initial wall-clock time in milliseconds.
        this.driver = new TopologyTestDriver(builder.build(), streamConfig, 0L);

        TestPublisher commandRequestPublisher = new TestPublisher(
                this.driver,
                this.aggregateSerdes.aggregateKey(),
                this.aggregateSerdes.commandRequest(),
                topicName(AggregateResources.TopicEntity.command_request));
        TestPublisher responseTopicMapPublisher = new TestPublisher(
                this.driver,
                this.aggregateSerdes.commandId(),
                Serdes.String(),
                topicName(AggregateResources.TopicEntity.command_response_topic_map));

        CommandSpec commandSpec = SpecUtils.getCommandSpec(aggregateSpec, "localhost");
        RequestAPIContext requestCtx =
                KafkaCommandAPI.getRequestAPIContext(commandSpec, kafkaConfig, this.scheduledExecutor);

        // The command id is parsed from the trailing 36 characters of the record key,
        // i.e. the length of a canonical UUID string.
        Function<String, CommandId> commandIdFromKey =
                stringKey -> CommandId.of(UUID.fromString(stringKey.substring(stringKey.length() - 36)));
        // NOTE(review): 400 and 4 look like a polling interval and a retry count — confirm
        // against TestTopologyReceiver.ReceiverSpec.
        TestTopologyReceiver.ReceiverSpec receiverSpec = new TestTopologyReceiver.ReceiverSpec(
                requestCtx.privateResponseTopic(), 400, 4, requestCtx.responseValueSerde(), commandIdFromKey);

        // Every receiver the command API attaches registers its poller so that tests can
        // drive response delivery explicitly through pollForApiResponse().
        Function<BiConsumer, ResponseSubscription> receiverAttacher = updateTarget -> {
            TestTopologyReceiver receiver = new TestTopologyReceiver(updateTarget, this.driver, receiverSpec);
            this.statePollers.add(receiver::pollForState);
            return receiver;
        };

        this.commandAPI = new KafkaCommandAPI(
                commandSpec,
                kafkaConfig,
                this.scheduledExecutor,
                commandRequestPublisher,
                responseTopicMapPublisher,
                receiverAttacher);
    }

    /**
     * Publishes a command through the command API wired to the test driver.
     *
     * @param request the command request to publish
     * @return the result returned by {@link KafkaCommandAPI#publishCommand}
     */
    public FutureResult<CommandError, CommandId> publishCommand(CommandAPI.Request<K, C> request) {
        return this.commandAPI.publishCommand(request);
    }

    /**
     * Runs all registered response pollers, then queries the command API for the result of
     * the given command.
     *
     * @param commandId id of the command whose result is requested
     * @param timeout   timeout passed through to the command API
     * @return the result returned by {@link KafkaCommandAPI#queryCommandResult}
     */
    public FutureResult<CommandError, Sequence> queryCommandResult(CommandId commandId, Duration timeout) {
        pollForApiResponse();
        return this.commandAPI.queryCommandResult(commandId, timeout);
    }

    /**
     * Reads the next record from the aggregate topic.
     *
     * @return the record's key and value, or empty when the driver has no pending output there
     */
    Optional<KeyValue<K, AggregateUpdate<A>>> readAggregateTopic() {
        ProducerRecord<K, AggregateUpdate<A>> maybeRecord = this.driver.readOutput(
                topicName(AggregateResources.TopicEntity.aggregate),
                this.aggregateSerdes.aggregateKey().deserializer(),
                this.aggregateSerdes.aggregateUpdate().deserializer());
        return Optional.ofNullable(maybeRecord).map(record -> KeyValue.pair(record.key(), record.value()));
    }

    /**
     * Reads the next record from the command response topic.
     *
     * @return the record's key and value, or empty when the driver has no pending output there
     */
    Optional<KeyValue<K, CommandResponse<K>>> readCommandResponseTopic() {
        ProducerRecord<K, CommandResponse<K>> maybeRecord = this.driver.readOutput(
                topicName(AggregateResources.TopicEntity.command_response),
                this.aggregateSerdes.aggregateKey().deserializer(),
                this.aggregateSerdes.commandResponse().deserializer());
        return Optional.ofNullable(maybeRecord).map(record -> KeyValue.pair(record.key(), record.value()));
    }

    /**
     * Reads the next record from the event topic.
     *
     * @return the record's key and value, or empty when the driver has no pending output there
     */
    Optional<KeyValue<K, ValueWithSequence<E>>> readEventTopic() {
        ProducerRecord<K, ValueWithSequence<E>> maybeRecord = this.driver.readOutput(
                topicName(AggregateResources.TopicEntity.event),
                this.aggregateSerdes.aggregateKey().deserializer(),
                this.aggregateSerdes.valueWithSequence().deserializer());
        return Optional.ofNullable(maybeRecord).map(record -> KeyValue.pair(record.key(), record.value()));
    }

    /** Runs every poller registered by the receivers the command API has attached so far. */
    public void pollForApiResponse() {
        this.statePollers.forEach(Runnable::run);
    }

    /**
     * Closes the topology test driver and shuts down the scheduler created for the command API.
     * The scheduler is stopped even if closing the driver throws.
     */
    @Override
    public void close() {
        try {
            this.driver.close();
        } finally {
            // Previously the executor was only a constructor local and was never shut down,
            // so its worker thread outlived the driver.
            this.scheduledExecutor.shutdownNow();
        }
    }

    /** Resolves the concrete topic name for {@code topic} via the spec's resource naming strategy. */
    private String topicName(AggregateResources.TopicEntity topic) {
        return this.aggregateSpec.serialization()
                .resourceNamingStrategy()
                .topicName(this.aggregateSpec.aggregateName(), topic.name());
    }
}

