001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.MatcherAssert.assertThat; 021import static org.hamcrest.Matchers.instanceOf; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertThrows; 024import static org.mockito.ArgumentMatchers.any; 025import static org.mockito.ArgumentMatchers.anyList; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.mockStatic; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.Arrays; 033import java.util.concurrent.CompletableFuture; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.ExecutorService; 037import java.util.concurrent.Executors; 038import java.util.concurrent.Future; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.Pair; 044import org.junit.jupiter.api.AfterEach; 045import org.junit.jupiter.api.BeforeEach; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048import org.mockito.MockedStatic; 049 050import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 051 052@Tag(ClientTests.TAG) 053@Tag(SmallTests.TAG) 054public class TestBufferedMutatorOverAsyncBufferedMutator { 055 056 private AsyncBufferedMutator asyncMutator; 057 058 private BufferedMutatorOverAsyncBufferedMutator mutator; 059 060 private ExecutorService executor; 061 062 private MockedStatic<Pair> mockedPair; 063 064 @BeforeEach 065 public void setUp() { 066 asyncMutator = mock(AsyncBufferedMutator.class); 067 when(asyncMutator.getWriteBufferSize()).thenReturn(1024L * 1024); 068 mutator = new BufferedMutatorOverAsyncBufferedMutator(asyncMutator, (e, m) -> { 069 throw e; 070 }); 071 executor = 072 Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build()); 073 mockedPair = mockStatic(Pair.class); 074 } 075 076 @AfterEach 077 public void tearDown() { 078 mockedPair.closeOnDemand(); 079 executor.shutdown(); 080 } 081 082 @Test 083 public void testRace() throws IOException { 084 CompletableFuture<Void> future = new CompletableFuture<>(); 085 when(asyncMutator.mutate(anyList())).thenReturn(Arrays.asList(future)); 086 mutator.mutate(new Put(Bytes.toBytes("aaa"))); 087 verify(asyncMutator).mutate(anyList()); 088 CountDownLatch beforeFlush = new CountDownLatch(1); 089 CountDownLatch afterFlush = new CountDownLatch(1); 090 Future<?> flushFuture = executor.submit(() -> { 091 beforeFlush.await(); 092 mutator.flush(); 093 afterFlush.countDown(); 094 return null; 095 }); 096 mockedPair.when(() -> Pair.newPair(any(), any())).then(i -> { 097 beforeFlush.countDown(); 098 afterFlush.await(5, TimeUnit.SECONDS); 099 return i.callRealMethod(); 100 }); 101 future.completeExceptionally(new IOException("inject error")); 102 ExecutionException error = assertThrows(ExecutionException.class, () -> flushFuture.get()); 103 assertThat(error.getCause(), instanceOf(RetriesExhaustedWithDetailsException.class)); 104 assertEquals("inject error", 105 ((RetriesExhaustedWithDetailsException) error.getCause()).getCause(0).getMessage()); 106 } 107}