/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.examples.complete.kafkatopubsub;

import com.google.auth.Credentials;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.beam.examples.complete.kafkatopubsub.KafkaToPubsub;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.examples.complete.kafkatopubsub.transforms.FormatTransform;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaToPubsubE2ETest {
    private static final @UnknownKeyFor @NonNull @Initialized String PUBSUB_EMULATOR_IMAGE = "gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators";
    private static final @UnknownKeyFor @NonNull @Initialized String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:5.4.3";
    private static final @UnknownKeyFor @NonNull @Initialized String PUBSUB_MESSAGE = "test pubsub message";
    private static final @UnknownKeyFor @NonNull @Initialized String KAFKA_TOPIC_NAME = "messages-topic";
    private static final @UnknownKeyFor @NonNull @Initialized String PROJECT_ID = "try-kafka-pubsub";
    private static final @UnknownKeyFor @NonNull @Initialized PipelineOptions OPTIONS = TestPipeline.testingPipelineOptions();
    @ClassRule
    public static final @UnknownKeyFor @NonNull @Initialized PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = new PubSubEmulatorContainer(DockerImageName.parse((String)"gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators"));
    @ClassRule
    public static final @UnknownKeyFor @NonNull @Initialized KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka:5.4.3"));
    @Rule
    public final transient @UnknownKeyFor @NonNull @Initialized TestPipeline pipeline = TestPipeline.fromOptions((PipelineOptions)OPTIONS);
    @Rule
    public final transient @UnknownKeyFor @NonNull @Initialized TestPubsub testPubsub = TestPubsub.fromOptions((PipelineOptions)OPTIONS);

    @BeforeClass
    public static void beforeClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Credentials credentials = NoopCredentialFactory.fromOptions((PipelineOptions)OPTIONS).getCredential();
        ((DirectOptions)OPTIONS.as(DirectOptions.class)).setBlockOnRun(false);
        ((GcpOptions)OPTIONS.as(GcpOptions.class)).setGcpCredential(credentials);
        ((GcpOptions)OPTIONS.as(GcpOptions.class)).setProject(PROJECT_ID);
        ((PubsubOptions)OPTIONS.as(PubsubOptions.class)).setPubsubRootUrl("http://" + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
        ((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)).setOutputFormat(FormatTransform.FORMAT.PUBSUB);
        ((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)).setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers());
        ((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)).setInputTopics(KAFKA_TOPIC_NAME);
        ((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)).setKafkaConsumerConfig("auto.offset.reset=earliest");
    }

    @Before
    public void setUp() {
        ((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)).setOutputTopic(this.testPubsub.topicPath().getPath());
    }

    @Test
    public void testKafkaToPubsubE2E() throws @UnknownKeyFor @NonNull @Initialized Exception {
        PipelineResult job = KafkaToPubsub.run((Pipeline)this.pipeline, (KafkaToPubsubOptions)((KafkaToPubsubOptions)OPTIONS.as(KafkaToPubsubOptions.class)));
        this.sendKafkaMessage();
        this.testPubsub.assertThatTopicEventuallyReceives(new Matcher[]{Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)PUBSUB_MESSAGE.getBytes(StandardCharsets.UTF_8)))}).waitForUpTo(Duration.standardMinutes((long)1L));
        try {
            job.cancel();
        }
        catch (UnsupportedOperationException e) {
            throw new AssertionError("Could not stop pipeline.", e);
        }
    }

    private void sendKafkaMessage() {
        try (KafkaProducer producer = new KafkaProducer((Map)ImmutableMap.of((Object)"bootstrap.servers", (Object)KAFKA_CONTAINER.getBootstrapServers(), (Object)"client.id", (Object)UUID.randomUUID().toString()), (Serializer)new StringSerializer(), (Serializer)new StringSerializer());){
            producer.send(new ProducerRecord(KAFKA_TOPIC_NAME, (Object)"testcontainers", (Object)PUBSUB_MESSAGE)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Something went wrong in kafka producer", e);
        }
    }
}

