001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.instanceOf; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertThrows; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyList; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.mockStatic; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.Arrays; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Executors; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.SmallTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.Pair; 045import org.junit.After; 046import org.junit.Before; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.mockito.MockedStatic; 051 052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 053 054@Category({ ClientTests.class, SmallTests.class }) 055public class TestBufferedMutatorOverAsyncBufferedMutator { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestBufferedMutatorOverAsyncBufferedMutator.class); 060 061 private AsyncBufferedMutator asyncMutator; 062 063 private BufferedMutatorOverAsyncBufferedMutator mutator; 064 065 private ExecutorService executor; 066 067 private MockedStatic<Pair> mockedPair; 068 069 @Before 070 public void setUp() { 071 asyncMutator = mock(AsyncBufferedMutator.class); 072 when(asyncMutator.getWriteBufferSize()).thenReturn(1024L * 1024); 073 mutator = new BufferedMutatorOverAsyncBufferedMutator(asyncMutator, (e, m) -> { 074 throw e; 075 }); 076 executor = 077 Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build()); 078 mockedPair = mockStatic(Pair.class); 079 } 080 081 @After 082 public void tearDown() { 083 mockedPair.closeOnDemand(); 084 executor.shutdown(); 085 } 086 087 @Test 088 public void testRace() throws IOException { 089 CompletableFuture<Void> future = new CompletableFuture<>(); 090 when(asyncMutator.mutate(anyList())).thenReturn(Arrays.asList(future)); 091 mutator.mutate(new Put(Bytes.toBytes("aaa"))); 092 verify(asyncMutator).mutate(anyList()); 093 CountDownLatch beforeFlush = new CountDownLatch(1); 094 CountDownLatch afterFlush = new CountDownLatch(1); 095 Future<?> flushFuture = executor.submit(() -> { 096 beforeFlush.await(); 097 mutator.flush(); 098 afterFlush.countDown(); 099 return null; 100 }); 101 mockedPair.when(() -> Pair.newPair(any(), any())).then(i -> { 102 beforeFlush.countDown(); 103 afterFlush.await(5, TimeUnit.SECONDS); 104 return i.callRealMethod(); 105 }); 106 future.completeExceptionally(new IOException("inject error")); 107 ExecutionException error = assertThrows(ExecutionException.class, () -> flushFuture.get()); 108 assertThat(error.getCause(), instanceOf(RetriesExhaustedWithDetailsException.class)); 109 assertEquals("inject error", 110 ((RetriesExhaustedWithDetailsException) error.getCause()).getCause(0).getMessage()); 111 } 112}