/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

/**
 * Tests for {@link AsyncBufferedMutator}: flushing when the write buffer fills up, flushing on
 * close, rejecting mutations after close, and scheduling / cancelling the periodic flush task.
 * <p>
 * All tests share one mini cluster, one {@link AsyncConnection} and the two tables created in
 * {@link #setUp()}.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("async");

  private static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  /** Number of rows written by {@link #test(TableName)}. */
  private static final int COUNT = 100;

  /** Cell payload shared by every put; filled with random bytes in {@link #setUp()}. */
  private static final byte[] VALUE = new byte[1024];

  /** Shared connection; assigned in {@link #setUp()} and closed in {@link #tearDown()}. */
  private static AsyncConnection CONN;

  /**
   * Starts the mini cluster, creates the single-region and multi-region tables, opens the shared
   * connection and randomizes {@link #VALUE}.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, CF);
    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    ThreadLocalRandom.current().nextBytes(VALUE);
  }

  /** Closes the shared connection and shuts the mini cluster down. */
  @AfterClass
  public static void tearDown() throws Exception {
    CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Runs the common buffer/flush scenario against the multi-region table. */
  @Test
  public void testWithMultiRegionTable() throws InterruptedException {
    test(MULTI_REGION_TABLE_NAME);
  }

  /** Runs the common buffer/flush scenario against the single-region table. */
  @Test
  public void testWithSingleRegionTable() throws InterruptedException {
    test(TABLE_NAME);
  }

  /**
   * Writes {@link #COUNT} rows through a mutator with a 16 KB write buffer and verifies that
   * <ul>
   * <li>a batch larger than the buffer is flushed right away,</li>
   * <li>a partially filled buffer keeps its tail until the mutator is closed, and</li>
   * <li>every row is readable with the expected value afterwards.</li>
   * </ul>
   * @param tableName the table to write to and read back from
   * @throws InterruptedException if interrupted while sleeping
   */
  private void test(TableName tableName) throws InterruptedException {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
        .collect(Collectors.toList()));
      // COUNT / 2 puts of 1 KB each exceed the 16 KB write buffer, so a flush is issued directly
      fs.forEach(f -> f.join());
      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
      });
      // the first future should have been sent out
      futures.get(0).join();
      Thread.sleep(2000);
      // the last one should still be in the write buffer
      assertFalse(futures.get(futures.size() - 1).isDone());
    }
    // mutator.close will call mutator.flush automatically so all tasks should have been done.
    futures.forEach(f -> f.join());
    AsyncTable<?> table = CONN.getTable(tableName);
    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
      .forEach(r -> {
        assertArrayEquals(VALUE, r.getValue(CF, CQ));
      });
  }

  /**
   * Verifies that both the single-mutation and the list overload of {@code mutate} fail with an
   * {@link IOException} whose message starts with "Already closed" once the mutator is closed.
   */
  @Test
  public void testClosedMutate() throws InterruptedException {
    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
    mutator.close();
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try {
      mutator.mutate(put).get();
      fail("Close check failed");
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
    }
    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
      try {
        f.get();
        fail("Close check failed");
      } catch (ExecutionException e) {
        assertThat(e.getCause(), instanceOf(IOException.class));
        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
      }
    }
  }

  /**
   * Verifies that with periodic flush disabled a buffered put is still pending after two seconds
   * and only completes after an explicit {@code flush()}.
   */
  @Test
  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
      CompletableFuture<?> future = mutator.mutate(put);
      Thread.sleep(2000);
      // assert that we have not flushed it out
      assertFalse(future.isDone());
      mutator.flush();
      future.get();
    }
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * Verifies that a buffered put completes without any explicit flush when a one second periodic
   * flush is configured.
   */
  @Test
  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    // try-with-resources so the mutator is released even if an assertion or get() fails
    try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      CompletableFuture<?> future = mutator.mutate(put);
      // wait inside the try block: the future must be completed by the periodic flush, not by
      // the flush that close() triggers
      future.get();
    }
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * Verifies that the scheduled periodic flush task is cancelled once enough puts have been
   * buffered for {@code periodicFlushTask} to be reset to {@code null}, and that all puts are
   * readable afterwards.
   * <p>
   * This is a bit deep into the implementation: it inspects
   * {@code AsyncBufferedMutatorImpl.periodicFlushTask} directly.
   */
  @Test
  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
      .setWriteBufferSize(10 * put.heapSize()).build()) {
      List<CompletableFuture<?>> futures = new ArrayList<>();
      futures.add(mutator.mutate(put));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      // keep adding puts until the mutator clears its periodic flush task
      for (int i = 1;; i++) {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
        if (mutator.periodicFlushTask == null) {
          break;
        }
      }
      assertTrue(task.isCancelled());
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      for (int i = 0; i < futures.size(); i++) {
        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
      }
    }
  }

  /** Verifies that an explicit {@code flush()} cancels the scheduled periodic flush task. */
  @Test
  public void testCancelPeriodicFlushByManuallyFlush()
    throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      CompletableFuture<?> future =
        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      mutator.flush();
      assertTrue(task.isCancelled());
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }
  }

  /** Verifies that closing the mutator cancels the scheduled periodic flush task. */
  @Test
  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
    CompletableFuture<?> future;
    Timeout task;
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
    }
    assertTrue(task.isCancelled());
    future.get();
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * Mutator that counts how many times {@code internalFlush()} is invoked, so a test can tell
   * whether one flush or several were issued.
   */
  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {

    /** Number of {@link #internalFlush()} invocations so far. */
    private int flushCount;

    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
      long writeBufferSize, long periodicFlushTimeoutNs) {
      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs);
    }

    @Override
    protected void internalFlush() {
      flushCount++;
      super.internalFlush();
    }
  }

  /**
   * Holds the mutator's monitor past the 200 ms periodic flush timeout, flushes manually while
   * still holding it, and then verifies that exactly one flush was issued, i.e. the already
   * expired periodic task did not flush a second time.
   */
  @Test
  public void testRaceBetweenNormalFlushAndPeriodicFlush()
    throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferMutatorForTest mutator =
      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200))) {
      CompletableFuture<?> future = mutator.mutate(put);
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      synchronized (mutator) {
        // synchronize on the mutator to prevent the periodic flush from being executed
        Thread.sleep(500);
        // the timeout should have been issued
        assertTrue(task.isExpired());
        // but no flush is issued as we hold the lock
        assertEquals(0, mutator.flushCount);
        assertFalse(future.isDone());
        // manually flush, then release the lock
        mutator.flush();
      }
      // this is a bit deep into the implementation in netty but anyway let's add a check here to
      // confirm that an issued timeout can not be canceled by netty framework.
      assertFalse(task.isCancelled());
      // and the mutation is done
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
      // only the manual flush, the periodic flush should have been canceled by us
      assertEquals(1, mutator.flushCount);
    }
  }
}