/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

/**
 * Tests for {@link AsyncBufferedMutator}: flushing when the write buffer size or the maximum
 * number of mutations is reached, periodic flushing, flushing on close, and the cancellation of
 * the scheduled periodic flush task.
 * <p>
 * All tests share one mini cluster and one {@link AsyncConnection}, created in {@link #setUp()}.
 */
@Tag(MediumTests.TAG)
@Tag(ClientTests.TAG)
public class TestAsyncBufferMutator {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME = TableName.valueOf("async");

  private static final TableName MULTI_REGION_TABLE_NAME =
    TableName.valueOf("async-multi-region");

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final int COUNT = 100;

  // The reference is final; the content is randomized once in setUp().
  private static final byte[] VALUE = new byte[1024];

  private static AsyncConnection CONN;

  /**
   * Starts a one-node mini cluster, creates the single-region and multi-region test tables, opens
   * the shared connection and fills {@link #VALUE} with random bytes.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, CF);
    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    Bytes.random(VALUE);
  }

  /**
   * Closes the shared connection and shuts the mini cluster down. The shutdown runs in a
   * {@code finally} block so that it still happens when the connection was never created or when
   * closing it fails.
   */
  @AfterAll
  public static void tearDown() throws Exception {
    try {
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testWithMultiRegionTable() throws InterruptedException {
    test(MULTI_REGION_TABLE_NAME);
  }

  @Test
  public void testWithSingleRegionTable() throws InterruptedException {
    test(TABLE_NAME);
  }

  /**
   * Writes {@link #COUNT} puts through a mutator with a 16KB write buffer and verifies that the
   * buffer-size triggered flush, the flush on close, and the final table content are all as
   * expected.
   * @param tableName the table to write to and read back from
   */
  private void test(TableName tableName) throws InterruptedException {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
        .collect(Collectors.toList()));
      // exceeded the write buffer size, a flush will be called directly
      fs.forEach(f -> f.join());
      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
      });
      // the first future should have been sent out.
      futures.get(0).join();
      Thread.sleep(2000);
      // the last one should still be in write buffer
      assertFalse(futures.get(futures.size() - 1).isDone());
    }
    // mutator.close will call mutator.flush automatically so all tasks should have been done.
    futures.forEach(f -> f.join());
    AsyncTable<?> table = CONN.getTable(tableName);
    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
      .forEach(r -> {
        assertArrayEquals(VALUE, r.getValue(CF, CQ));
      });
  }

  /**
   * Verifies that both the single-mutation and the list overload of {@code mutate} fail with an
   * "Already closed" {@link IOException} once the mutator has been closed.
   */
  @Test
  public void testClosedMutate() throws InterruptedException {
    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
    mutator.close();
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    assertAlreadyClosed(mutator.mutate(put));
    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
      assertAlreadyClosed(f);
    }
  }

  /**
   * Asserts that the given future completes exceptionally with an {@link IOException} whose
   * message starts with "Already closed".
   * @param future the future returned by a {@code mutate} call on a closed mutator
   */
  private static void assertAlreadyClosed(CompletableFuture<Void> future)
    throws InterruptedException {
    try {
      future.get();
      fail("Close check failed");
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
    }
  }

  /**
   * Verifies that with periodic flush disabled a buffered mutation is still pending after two
   * seconds, and completes once {@code flush()} is called explicitly.
   */
  @Test
  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
      CompletableFuture<?> future = mutator.mutate(put);
      Thread.sleep(2000);
      // assert that we have not flushed it out
      assertFalse(future.isDone());
      mutator.flush();
      future.get();
    }
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * Verifies that with a one second periodic flush a single buffered mutation completes without
   * any explicit flush.
   */
  @Test
  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
      CompletableFuture<?> future = mutator.mutate(put);
      // wait inside the try block, so the mutation is not completed by the flush done in close
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }
  }

  /**
   * Verifies that three mutations complete without any explicit flush when the mutator is built
   * with {@code setMaxMutations(3)}.
   */
  @Test
  public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build()) {
      CompletableFuture<?> future1 =
        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      CompletableFuture<?> future2 =
        mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
      CompletableFuture<?> future3 =
        mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
      // wait inside the try block, so the mutations are not completed by the flush done in close
      CompletableFuture.allOf(future1, future2, future3).join();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
    }
  }

  /**
   * Keeps adding mutations until the mutator clears its periodic flush task, then verifies that
   * the originally scheduled task was cancelled and that every mutation was written.
   */
  // a bit deep into the implementation
  @Test
  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
      .setWriteBufferSize(10 * put.heapSize()).build()) {
      List<CompletableFuture<?>> futures = new ArrayList<>();
      futures.add(mutator.mutate(put));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      for (int i = 1;; i++) {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
        if (mutator.periodicFlushTask == null) {
          break;
        }
      }
      assertTrue(task.isCancelled());
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      for (int i = 0; i < futures.size(); i++) {
        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
      }
    }
  }

  /**
   * Verifies that an explicit {@code flush()} cancels the scheduled periodic flush task and that
   * the buffered mutation is written.
   */
  @Test
  public void testCancelPeriodicFlushByManuallyFlush()
    throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      CompletableFuture<?> future =
        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      mutator.flush();
      assertTrue(task.isCancelled());
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }
  }

  /**
   * Verifies that closing the mutator cancels the scheduled periodic flush task and that the
   * buffered mutation is written.
   */
  @Test
  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
    CompletableFuture<?> future;
    Timeout task;
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
    }
    assertTrue(task.isCancelled());
    future.get();
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * An {@link AsyncBufferedMutatorImpl} that counts how many times {@code drainBatch()} has been
   * invoked, so a test can tell how many drains actually happened.
   */
  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {

    private int drainCount;

    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
        maxMutation);
    }

    @Override
    protected Batch drainBatch() {
      drainCount++;
      return super.drainBatch();
    }
  }

  /**
   * Holds the mutator's lock while the 200ms periodic flush timeout expires, flushes manually
   * under the lock, and then verifies that exactly one drain happened, i.e. the expired periodic
   * flush did not drain a second time after the lock was released.
   */
  @Test
  public void testRaceBetweenNormalFlushAndPeriodicFlush()
    throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferMutatorForTest mutator =
      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
      CompletableFuture<?> future = mutator.mutate(put);
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      // get the lock to prevent periodic flush to be executed
      mutator.lock.lock();
      try {
        Thread.sleep(500);
        // the timeout should be issued
        assertTrue(task.isExpired());
        // but no drain is issued as we hold the lock
        assertEquals(0, mutator.drainCount);
        assertFalse(future.isDone());
        // manually flush and drain, then release the lock
        mutator.flush();
      } finally {
        mutator.lock.unlock();
      }
      // this is a bit deep into the implementation in netty but anyway let's add a check here to
      // confirm that an issued timeout can not be canceled by netty framework.
      assertFalse(task.isCancelled());
      // and the mutation is done
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
      // only the manual flush, the periodic flush should have been canceled by us
      assertEquals(1, mutator.drainCount);
    }
  }
}