/*
 * Decompiled with CFR 0.152.
 */
package cloud.localstack.awssdkv2;

import cloud.localstack.awssdkv2.PowerMockLocalStack;
import cloud.localstack.awssdkv2.consumer.DeliveryStatusRecordProcessorFactory;
import cloud.localstack.awssdkv2.consumer.EventProcessor;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
import io.thundra.jexter.junit4.core.sysprop.SystemPropertySandboxRule;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

@LocalstackDockerProperties(ignoreDockerRunErrors=true)
public class KinesisSchedulerTest
extends PowerMockLocalStack {
    String streamName = "test" + UUID.randomUUID().toString();
    String workerId = UUID.randomUUID().toString();
    String testMessage = "hello, world";
    Integer consumerCreationTime = 15;
    @Rule
    public SystemPropertySandboxRule systemPropertySandboxRule = new SystemPropertySandboxRule();

    @Before
    public void mockServicesForScheduler() {
        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
        PowerMockLocalStack.mockCloudWatchAsyncClient();
        PowerMockLocalStack.mockDynamoDBAsync();
        PowerMockLocalStack.mockKinesisAsync();
    }

    @Test
    public void schedulerTest() throws Exception {
        KinesisAsyncClient kinesisAsyncClient = KinesisAsyncClient.create();
        DynamoDbAsyncClient dynamoAsyncClient = DynamoDbAsyncClient.create();
        CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.create();
        this.createStream(kinesisAsyncClient);
        TimeUnit.SECONDS.sleep(2L);
        EventProcessor eventProcessor = new EventProcessor();
        DeliveryStatusRecordProcessorFactory processorFactory = new DeliveryStatusRecordProcessorFactory(eventProcessor);
        ConfigsBuilder configsBuilder = new ConfigsBuilder(this.streamName, this.streamName, kinesisAsyncClient, dynamoAsyncClient, cloudWatchAsyncClient, this.workerId, processorFactory){

            public RetrievalConfig retrievalConfig() {
                RetrievalConfig retrievalConfig = super.retrievalConfig();
                retrievalConfig.retrievalSpecificConfig((RetrievalSpecificConfig)new PollingConfig(this.streamName(), this.kinesisClient()));
                return retrievalConfig;
            }
        };
        Scheduler scheduler = this.createScheduler(configsBuilder);
        new Thread((Runnable)scheduler).start();
        TimeUnit.SECONDS.sleep(this.consumerCreationTime.intValue());
        this.putRecord(kinesisAsyncClient);
        TimeUnit.SECONDS.sleep(5L);
        scheduler.shutdown();
        Assert.assertTrue((boolean)eventProcessor.CONSUMER_CREATED);
        Assert.assertTrue((boolean)eventProcessor.RECORD_RECEIVED);
        Assert.assertTrue((eventProcessor.messages.size() > 0 ? 1 : 0) != 0);
        Assert.assertEquals((Object)eventProcessor.messages.get(0), (Object)this.testMessage);
    }

    public Scheduler createScheduler(ConfigsBuilder configsBuilder) {
        return new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig().metricsFactory((MetricsFactory)new NullMetricsFactory()), configsBuilder.processorConfig(), configsBuilder.retrievalConfig());
    }

    public void createStream(KinesisAsyncClient kinesisClient) throws Exception {
        CreateStreamRequest request = (CreateStreamRequest)CreateStreamRequest.builder().streamName(this.streamName).shardCount(Integer.valueOf(1)).build();
        CreateStreamResponse response = (CreateStreamResponse)kinesisClient.createStream(request).get();
        Assert.assertNotNull((Object)response);
    }

    public void putRecord(KinesisAsyncClient kinesisClient) throws Exception {
        PutRecordRequest request = (PutRecordRequest)PutRecordRequest.builder().partitionKey("partitionkey").streamName(this.streamName).data(SdkBytes.fromUtf8String((String)this.testMessage)).build();
        PutRecordResponse response = (PutRecordResponse)kinesisClient.putRecord(request).get();
        Assert.assertNotNull((Object)response);
    }
}

