001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.instanceOf;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertThrows;
024import static org.mockito.ArgumentMatchers.any;
025import static org.mockito.ArgumentMatchers.anyList;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.mockStatic;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.Executors;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.hbase.util.Pair;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.mockito.MockedStatic;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053
054@Category({ ClientTests.class, SmallTests.class })
055public class TestBufferedMutatorOverAsyncBufferedMutator {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestBufferedMutatorOverAsyncBufferedMutator.class);
060
061  private AsyncBufferedMutator asyncMutator;
062
063  private BufferedMutatorOverAsyncBufferedMutator mutator;
064
065  private ExecutorService executor;
066
067  private MockedStatic<Pair> mockedPair;
068
069  @Before
070  public void setUp() {
071    asyncMutator = mock(AsyncBufferedMutator.class);
072    when(asyncMutator.getWriteBufferSize()).thenReturn(1024L * 1024);
073    mutator = new BufferedMutatorOverAsyncBufferedMutator(asyncMutator, (e, m) -> {
074      throw e;
075    });
076    executor =
077      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
078    mockedPair = mockStatic(Pair.class);
079  }
080
081  @After
082  public void tearDown() {
083    mockedPair.closeOnDemand();
084    executor.shutdown();
085  }
086
087  @Test
088  public void testRace() throws IOException {
089    CompletableFuture<Void> future = new CompletableFuture<>();
090    when(asyncMutator.mutate(anyList())).thenReturn(Arrays.asList(future));
091    mutator.mutate(new Put(Bytes.toBytes("aaa")));
092    verify(asyncMutator).mutate(anyList());
093    CountDownLatch beforeFlush = new CountDownLatch(1);
094    CountDownLatch afterFlush = new CountDownLatch(1);
095    Future<?> flushFuture = executor.submit(() -> {
096      beforeFlush.await();
097      mutator.flush();
098      afterFlush.countDown();
099      return null;
100    });
101    mockedPair.when(() -> Pair.newPair(any(), any())).then(i -> {
102      beforeFlush.countDown();
103      afterFlush.await(5, TimeUnit.SECONDS);
104      return i.callRealMethod();
105    });
106    future.completeExceptionally(new IOException("inject error"));
107    ExecutionException error = assertThrows(ExecutionException.class, () -> flushFuture.get());
108    assertThat(error.getCause(), instanceOf(RetriesExhaustedWithDetailsException.class));
109    assertEquals("inject error",
110      ((RetriesExhaustedWithDetailsException) error.getCause()).getCause(0).getMessage());
111  }
112}