/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsub;

import com.google.cloud.GrpcServiceOptions;
import com.google.cloud.pubsub.AckDeadlineRenewer;
import com.google.cloud.pubsub.Message;
import com.google.cloud.pubsub.MessageConsumerImpl;
import com.google.cloud.pubsub.PubSub;
import com.google.cloud.pubsub.PubSubOptions;
import com.google.cloud.pubsub.spi.PubSubRpc;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class MessageConsumerImplTest {
    // Resource names used by every test.
    private static final String PROJECT = "project";
    private static final String SUBSCRIPTION = "subscription";
    private static final String SUBSCRIPTION_PB =
            "projects/" + PROJECT + "/subscriptions/" + SUBSCRIPTION;
    private static final int MAX_QUEUED_CALLBACKS = 42;

    // Ack ids attached to the two canned messages below.
    private static final String ACK_ID1 = "ack-id1";
    private static final String ACK_ID2 = "ack-id2";

    // Canned messages, their protobuf "received" form, and a pull response carrying both.
    private static final Message MESSAGE1 = Message.of("payload1");
    private static final Message MESSAGE2 = Message.of("payload2");
    private static final ReceivedMessage MESSAGE1_PB =
            ReceivedMessage.newBuilder().setAckId(ACK_ID1).setMessage(MESSAGE1.toPb()).build();
    private static final ReceivedMessage MESSAGE2_PB =
            ReceivedMessage.newBuilder().setAckId(ACK_ID2).setMessage(MESSAGE2.toPb()).build();
    private static final PullResponse PULL_RESPONSE =
            PullResponse.newBuilder()
                    .addReceivedMessages(MESSAGE1_PB)
                    .addReceivedMessages(MESSAGE2_PB)
                    .build();

    /** Processor that returns normally without touching the message. */
    private static final PubSub.MessageProcessor DO_NOTHING_PROCESSOR =
            new PubSub.MessageProcessor() {
                @Override
                public void process(Message message) throws Exception {
                    // Intentionally empty.
                }
            };

    /** Processor that always fails with an unchecked exception. */
    private static final PubSub.MessageProcessor THROW_PROCESSOR =
            new PubSub.MessageProcessor() {
                @Override
                public void process(Message message) throws Exception {
                    throw new RuntimeException();
                }
            };

    /** Pull response that carries no messages. */
    private static final PullResponse EMPTY_RESPONSE = PullResponse.getDefaultInstance();

    // Mocks, recreated in setUp() and verified in tearDown().
    private PubSubRpc pubsubRpc;
    private PubSub pubsub;
    private PubSubOptions options;
    private AckDeadlineRenewer renewer;

    /** Fails any single test that runs longer than 60 seconds (tests block on latches). */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(60);

    /**
     * Creates fresh mocks before each test. {@code pubsubRpc} and {@code options} are strict
     * mocks (call order is checked); {@code pubsub} and {@code renewer} are regular mocks.
     */
    @Before
    public void setUp() {
        pubsubRpc = EasyMock.createStrictMock(PubSubRpc.class);
        options = EasyMock.createStrictMock(PubSubOptions.class);
        pubsub = EasyMock.createMock(PubSub.class);
        renewer = EasyMock.createMock(AckDeadlineRenewer.class);
    }

    /** Verifies, after each test, that every recorded expectation on the four mocks was met. */
    @After
    public void tearDown() {
        // Same verification order as before: rpc, service, options, renewer.
        EasyMock.verify(pubsubRpc, pubsub, options, renewer);
    }

    /**
     * Builds the pull request the tests expect for {@link #SUBSCRIPTION_PB}.
     *
     * @param maxQueuedCallbacks value placed in the request's {@code maxMessages} field
     * @return a request with {@code returnImmediately} set to {@code false}
     */
    private static PullRequest pullRequest(int maxQueuedCallbacks) {
        PullRequest.Builder builder = PullRequest.newBuilder();
        builder.setSubscription(SUBSCRIPTION_PB);
        builder.setMaxMessages(maxQueuedCallbacks);
        builder.setReturnImmediately(false);
        return builder.build();
    }

    /**
     * Creates an answer for a mocked {@code void} call that counts down the given latch each time
     * the call is made.
     *
     * @param latch latch to decrement when the mocked call happens
     * @return an answer that counts down {@code latch} and returns {@code null}
     */
    private static IAnswer<Void> createAnswer(final CountDownLatch latch) {
        IAnswer<Void> countDownAnswer = new IAnswer<Void>() {
            @Override
            public Void answer() throws Throwable {
                latch.countDown();
                return null;
            }
        };
        return countDownAnswer;
    }

    /**
     * One pull returns both canned messages and the processor completes normally: expects one
     * {@code ackAsync} per ack id, plus one renewer {@code add} and {@code remove} per ack id.
     * The test blocks until both {@code remove} calls have been observed.
     */
    @Test
    public void testMessageConsumerAck() throws Exception {
        PullRequest firstPull = pullRequest(MAX_QUEUED_CALLBACKS);

        // options is a strict mock, so rpc() must be requested before service().
        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();

        EasyMock.expect(pubsub.options()).andReturn(options).times(2);
        CountDownLatch removals = new CountDownLatch(2);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
        EasyMock.replay(pubsub);

        // The first pull delivers both messages; any later pull comes back empty.
        EasyMock.expect(pubsubRpc.pull(firstPull)).andReturn(new TestPullFuture(PULL_RESPONSE));
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        // Each removal releases one count of the latch the test waits on.
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
                        .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
                        .build()) {
            removals.await();
        }
    }

    /**
     * One pull returns both canned messages and the processor throws: expects one
     * {@code nackAsync} per ack id, plus one renewer {@code add} and {@code remove} per ack id.
     * The test blocks until both {@code remove} calls have been observed.
     */
    @Test
    public void testMessageConsumerNack() throws Exception {
        PullRequest firstPull = pullRequest(MAX_QUEUED_CALLBACKS);

        // options is a strict mock, so rpc() must be requested before service().
        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();

        EasyMock.expect(pubsub.options()).andReturn(options).times(2);
        CountDownLatch removals = new CountDownLatch(2);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
        EasyMock.replay(pubsub);

        // The first pull delivers both messages; any later pull comes back empty.
        EasyMock.expect(pubsubRpc.pull(firstPull)).andReturn(new TestPullFuture(PULL_RESPONSE));
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        // Each removal releases one count of the latch the test waits on.
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
                        .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
                        .build()) {
            removals.await();
        }
    }

    /**
     * Two successive pulls deliver one message each and the processor completes normally. The
     * mocked {@code ackAsync} for the first message blocks until the second pull (requesting
     * {@code MAX_QUEUED_CALLBACKS - 1} messages) has been issued, so the second pull is expected
     * while the first message is still in flight. Expects an ack and a renewer add/remove for
     * each message.
     */
    @Test
    public void testMessageConsumerMultipleCallsAck() throws Exception {
        PullRequest firstPull = pullRequest(MAX_QUEUED_CALLBACKS);
        PullRequest secondPull = pullRequest(MAX_QUEUED_CALLBACKS - 1);
        PullResponse firstResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE1_PB).build();
        final PullResponse secondResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE2_PB).build();

        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();

        final CountDownLatch nextPullLatch = new CountDownLatch(1);
        CountDownLatch removals = new CountDownLatch(2);

        // Holds the first ack until the second pull request has been made.
        IAnswer<Future<Void>> ackAfterNextPull = new IAnswer<Future<Void>>() {
            @Override
            public Future<Void> answer() throws Throwable {
                nextPullLatch.await();
                return null;
            }
        };
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(ackAfterNextPull);
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
        EasyMock.replay(pubsub);

        // Releases the blocked ack and hands back the second message.
        IAnswer<PubSubRpc.PullFuture> secondPullAnswer = new IAnswer<PubSubRpc.PullFuture>() {
            @Override
            public PubSubRpc.PullFuture answer() throws Throwable {
                nextPullLatch.countDown();
                return new TestPullFuture(secondResponse);
            }
        };
        EasyMock.expect(pubsubRpc.pull(firstPull)).andReturn(new TestPullFuture(firstResponse));
        EasyMock.expect(pubsubRpc.pull(secondPull)).andAnswer(secondPullAnswer);
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
                        .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
                        .build()) {
            removals.await();
        }
    }

    /**
     * Two successive pulls deliver one message each and the processor throws. The mocked
     * {@code nackAsync} for the first message blocks until the second pull (requesting
     * {@code MAX_QUEUED_CALLBACKS - 1} messages) has been issued, so the second pull is expected
     * while the first message is still in flight. Expects a nack and a renewer add/remove for
     * each message.
     */
    @Test
    public void testMessageConsumerMultipleCallsNack() throws Exception {
        PullRequest firstPull = pullRequest(MAX_QUEUED_CALLBACKS);
        PullRequest secondPull = pullRequest(MAX_QUEUED_CALLBACKS - 1);
        PullResponse firstResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE1_PB).build();
        final PullResponse secondResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE2_PB).build();

        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();

        final CountDownLatch nextPullLatch = new CountDownLatch(1);
        CountDownLatch removals = new CountDownLatch(2);

        // Holds the first nack until the second pull request has been made.
        IAnswer<Future<Void>> nackAfterNextPull = new IAnswer<Future<Void>>() {
            @Override
            public Future<Void> answer() throws Throwable {
                nextPullLatch.await();
                return null;
            }
        };
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(nackAfterNextPull);
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
        EasyMock.replay(pubsub);

        // Releases the blocked nack and hands back the second message.
        IAnswer<PubSubRpc.PullFuture> secondPullAnswer = new IAnswer<PubSubRpc.PullFuture>() {
            @Override
            public PubSubRpc.PullFuture answer() throws Throwable {
                nextPullLatch.countDown();
                return new TestPullFuture(secondResponse);
            }
        };
        EasyMock.expect(pubsubRpc.pull(firstPull)).andReturn(new TestPullFuture(firstResponse));
        EasyMock.expect(pubsubRpc.pull(secondPull)).andAnswer(secondPullAnswer);
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
                        .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
                        .build()) {
            removals.await();
        }
    }

    /**
     * Consumer limited to 2 queued callbacks, with a processor that completes normally. The first
     * pull (max 2) delivers both canned messages; the mocked {@code ackAsync} for the second
     * message blocks until a pull for a single message has been issued. That pull redelivers
     * message 1, so three acks and three renewer removals are expected in total.
     */
    @Test
    public void testMessageConsumerMaxCallbacksAck() throws Exception {
        PullRequest fullPull = pullRequest(2);
        PullRequest singlePull = pullRequest(1);
        final PullResponse otherPullResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE1_PB).build();

        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
        EasyMock.expect(pubsub.options()).andReturn(options).times(2);

        final CountDownLatch nextPullLatch = new CountDownLatch(1);
        CountDownLatch removals = new CountDownLatch(3);

        // Holds the ack of message 2 until the single-message pull has been made.
        IAnswer<Future<Void>> ackAfterNextPull = new IAnswer<Future<Void>>() {
            @Override
            public Future<Void> answer() throws Throwable {
                nextPullLatch.await();
                return null;
            }
        };
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(ackAfterNextPull);
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.replay(pubsub);

        // Releases the blocked ack and redelivers message 1.
        IAnswer<PubSubRpc.PullFuture> singlePullAnswer = new IAnswer<PubSubRpc.PullFuture>() {
            @Override
            public PubSubRpc.PullFuture answer() throws Throwable {
                nextPullLatch.countDown();
                return new TestPullFuture(otherPullResponse);
            }
        };
        EasyMock.expect(pubsubRpc.pull(fullPull)).andReturn(new TestPullFuture(PULL_RESPONSE));
        EasyMock.expect(pubsubRpc.pull(singlePull)).andAnswer(singlePullAnswer);
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        // Second delivery of message 1.
        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
                        .maxQueuedCallbacks(2)
                        .build()) {
            removals.await();
        }
    }

    /**
     * Consumer limited to 2 queued callbacks, with a processor that throws. The first pull
     * (max 2) delivers both canned messages; the mocked {@code nackAsync} for the second message
     * blocks until a pull for a single message has been issued. That pull redelivers message 1,
     * so three nacks and three renewer removals are expected in total.
     */
    @Test
    public void testMessageConsumerMaxCallbacksNack() throws Exception {
        PullRequest fullPull = pullRequest(2);
        PullRequest singlePull = pullRequest(1);
        final PullResponse otherPullResponse =
                PullResponse.newBuilder().addReceivedMessages(MESSAGE1_PB).build();

        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);
        EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
        EasyMock.expect(pubsub.options()).andReturn(options).times(2);

        final CountDownLatch nextPullLatch = new CountDownLatch(1);
        CountDownLatch removals = new CountDownLatch(3);

        // Holds the nack of message 2 until the single-message pull has been made.
        IAnswer<Future<Void>> nackAfterNextPull = new IAnswer<Future<Void>>() {
            @Override
            public Future<Void> answer() throws Throwable {
                nextPullLatch.await();
                return null;
            }
        };
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(nackAfterNextPull);
        EasyMock.expect(pubsub.options()).andReturn(options);
        EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
        EasyMock.replay(pubsub);

        // Releases the blocked nack and redelivers message 1.
        IAnswer<PubSubRpc.PullFuture> singlePullAnswer = new IAnswer<PubSubRpc.PullFuture>() {
            @Override
            public PubSubRpc.PullFuture answer() throws Throwable {
                nextPullLatch.countDown();
                return new TestPullFuture(otherPullResponse);
            }
        };
        EasyMock.expect(pubsubRpc.pull(fullPull)).andReturn(new TestPullFuture(PULL_RESPONSE));
        EasyMock.expect(pubsubRpc.pull(singlePull)).andAnswer(singlePullAnswer);
        EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
                .andReturn(new TestPullFuture(EMPTY_RESPONSE))
                .anyTimes();

        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.add(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        renewer.remove(SUBSCRIPTION, ACK_ID2);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        // Second delivery of message 1.
        renewer.add(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall();
        renewer.remove(SUBSCRIPTION, ACK_ID1);
        EasyMock.expectLastCall().andAnswer(createAnswer(removals));
        EasyMock.replay(pubsubRpc, options, renewer);

        try (MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, THROW_PROCESSOR)
                        .maxQueuedCallbacks(2)
                        .build()) {
            removals.await();
        }
    }

    /**
     * Closes the consumer twice and expects the executor supplied through a custom factory to be
     * shut down exactly once: the strict executor mock records a single {@code shutdown()} call,
     * and the factory's {@code release} forwards to {@code shutdown()}.
     */
    @Test
    public void testClose() throws Exception {
        EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
        EasyMock.expect(options.service()).andReturn(pubsub);

        final ExecutorService executor = EasyMock.createStrictMock(ExecutorService.class);
        executor.shutdown();
        EasyMock.expectLastCall();
        EasyMock.replay(pubsubRpc, pubsub, options, executor, renewer);

        // Factory that always hands out the mock and shuts down whatever it is asked to release.
        GrpcServiceOptions.ExecutorFactory<ExecutorService> factory =
                new GrpcServiceOptions.ExecutorFactory<ExecutorService>() {
                    @Override
                    public ExecutorService get() {
                        return executor;
                    }

                    @Override
                    public void release(ExecutorService released) {
                        released.shutdown();
                    }
                };

        MessageConsumerImpl consumer =
                MessageConsumerImpl.builder(options, SUBSCRIPTION, renewer, DO_NOTHING_PROCESSOR)
                        .maxQueuedCallbacks(MAX_QUEUED_CALLBACKS)
                        .executorFactory(factory)
                        .build();
        consumer.close();
        // The second close must not trigger another shutdown() on the strict mock.
        consumer.close();
        EasyMock.verify(executor);
    }

    static final class TestPullFuture
    extends ForwardingListenableFuture.SimpleForwardingListenableFuture<PullResponse>
    implements PubSubRpc.PullFuture {
        TestPullFuture(PullResponse response) {
            super(Futures.immediateFuture((Object)response));
        }

        public void addCallback(final PubSubRpc.PullCallback callback) {
            Futures.addCallback((ListenableFuture)this.delegate(), (FutureCallback)new FutureCallback<PullResponse>(){

                public void onSuccess(PullResponse result) {
                    callback.success(result);
                }

                public void onFailure(Throwable error) {
                    callback.failure(error);
                }
            });
        }
    }
}

