001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertThat; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import java.util.Optional; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.Future; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import java.util.function.Function; 043import java.util.stream.Collectors; 044import java.util.stream.IntStream; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.coprocessor.ObserverContext; 050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 052import org.apache.hadoop.hbase.coprocessor.RegionObserver; 053import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.junit.After; 058import org.junit.AfterClass; 059import org.junit.Before; 060import org.junit.BeforeClass; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.runner.RunWith; 065import org.junit.runners.Parameterized; 066import org.junit.runners.Parameterized.Parameter; 067import org.junit.runners.Parameterized.Parameters; 068 069@RunWith(Parameterized.class) 070@Category({ LargeTests.class, ClientTests.class }) 071public class TestAsyncTableBatch { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestAsyncTableBatch.class); 076 077 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 078 079 private static TableName TABLE_NAME = TableName.valueOf("async"); 080 081 private static byte[] FAMILY = Bytes.toBytes("cf"); 082 083 private static byte[] CQ = Bytes.toBytes("cq"); 084 private static byte[] CQ1 = Bytes.toBytes("cq1"); 085 086 private static int COUNT = 1000; 087 088 private static AsyncConnection CONN; 089 090 private static byte[][] SPLIT_KEYS; 091 092 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 093 094 @Parameter(0) 095 public String tableType; 096 097 @Parameter(1) 098 public Function<TableName, AsyncTable<?>> tableGetter; 099 100 private static AsyncTable<?> getRawTable(TableName tableName) { 101 return CONN.getTable(tableName); 102 } 103 104 private static AsyncTable<?> getTable(TableName tableName) { 105 return CONN.getTable(tableName, ForkJoinPool.commonPool()); 106 } 107 108 @Parameters(name = "{index}: type={0}") 109 public static List<Object[]> params() { 110 Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable; 111 Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable; 112 return Arrays.asList(new Object[] { "raw", rawTableGetter }, 113 new Object[] { "normal", tableGetter }); 114 } 115 116 @BeforeClass 117 public static void setUp() throws Exception { 118 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 119 MAX_KEY_VALUE_SIZE); 120 TEST_UTIL.startMiniCluster(3); 121 SPLIT_KEYS = new byte[8][]; 122 for (int i = 111; i < 999; i += 111) { 123 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 124 } 125 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 126 } 127 128 @AfterClass 129 public static void tearDown() throws Exception { 130 CONN.close(); 131 TEST_UTIL.shutdownMiniCluster(); 132 } 133 134 @Before 135 public void setUpBeforeTest() throws IOException, InterruptedException { 136 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 137 TEST_UTIL.waitTableAvailable(TABLE_NAME); 138 } 139 140 @After 141 public void tearDownAfterTest() throws IOException { 142 Admin admin = TEST_UTIL.getAdmin(); 143 if (admin.isTableEnabled(TABLE_NAME)) { 144 admin.disableTable(TABLE_NAME); 145 } 146 admin.deleteTable(TABLE_NAME); 147 } 148 149 private byte[] getRow(int i) { 150 return Bytes.toBytes(String.format("%03d", i)); 151 } 152 153 @Test 154 public void test() 155 throws InterruptedException, ExecutionException, IOException, TimeoutException { 156 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 157 table.putAll(IntStream.range(0, COUNT) 158 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 159 .collect(Collectors.toList())).get(); 160 List<Result> results = table.getAll(IntStream.range(0, COUNT) 161 .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) 162 .flatMap(l -> l.stream()).collect(Collectors.toList())).get(); 163 assertEquals(2 * COUNT, results.size()); 164 for (int i = 0; i < COUNT; i++) { 165 assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ))); 166 assertTrue(results.get(2 * i + 1).isEmpty()); 167 } 168 Admin admin = TEST_UTIL.getAdmin(); 169 admin.flush(TABLE_NAME); 170 List<Future<?>> splitFutures = 171 TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> { 172 byte[] startKey = r.getRegionInfo().getStartKey(); 173 int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); 174 byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); 175 try { 176 return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint); 177 } catch (IOException e) { 178 throw new UncheckedIOException(e); 179 } 180 }).collect(Collectors.toList()); 181 for (Future<?> future : splitFutures) { 182 future.get(30, TimeUnit.SECONDS); 183 } 184 table.deleteAll( 185 IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList())) 186 .get(); 187 results = table 188 .getAll( 189 IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 190 .get(); 191 assertEquals(COUNT, results.size()); 192 results.forEach(r -> assertTrue(r.isEmpty())); 193 } 194 195 @Test 196 public void testWithRegionServerFailover() throws Exception { 197 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 198 table.putAll(IntStream.range(0, COUNT) 199 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 200 .collect(Collectors.toList())).get(); 201 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests"); 202 Thread.sleep(100); 203 table.putAll(IntStream.range(COUNT, 2 * COUNT) 204 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 205 .collect(Collectors.toList())).get(); 206 List<Result> results = table.getAll( 207 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 208 .get(); 209 assertEquals(2 * COUNT, results.size()); 210 results.forEach(r -> assertFalse(r.isEmpty())); 211 table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i))) 212 .collect(Collectors.toList())).get(); 213 results = table.getAll( 214 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 215 .get(); 216 assertEquals(2 * COUNT, results.size()); 217 results.forEach(r -> assertTrue(r.isEmpty())); 218 } 219 220 @Test 221 public void testMixed() throws InterruptedException, ExecutionException, IOException { 222 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 223 table.putAll(IntStream.range(0, 7) 224 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) 225 .collect(Collectors.toList())).get(); 226 List<Row> actions = new ArrayList<>(); 227 actions.add(new Get(Bytes.toBytes(0))); 228 actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L))); 229 actions.add(new Delete(Bytes.toBytes(2))); 230 actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); 231 actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); 232 RowMutations rm = new RowMutations(Bytes.toBytes(5)); 233 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); 234 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); 235 actions.add(rm); 236 actions.add(new Get(Bytes.toBytes(6))); 237 238 List<Object> results = table.batchAll(actions).get(); 239 assertEquals(7, results.size()); 240 Result getResult = (Result) results.get(0); 241 assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 242 assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); 243 assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty()); 244 Result incrementResult = (Result) results.get(3); 245 assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ))); 246 Result appendResult = (Result) results.get(4); 247 byte[] appendValue = appendResult.getValue(FAMILY, CQ); 248 assertEquals(12, appendValue.length); 249 assertEquals(4, Bytes.toLong(appendValue)); 250 assertEquals(4, Bytes.toInt(appendValue, 8)); 251 assertEquals(100, 252 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); 253 assertEquals(200, 254 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); 255 getResult = (Result) results.get(6); 256 assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 257 } 258 259 public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver { 260 261 @Override 262 public Optional<RegionObserver> getRegionObserver() { 263 return Optional.of(this); 264 } 265 266 @Override 267 public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, 268 List<Cell> results) throws IOException { 269 if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) { 270 throw new DoNotRetryRegionException("Inject Error"); 271 } 272 } 273 } 274 275 @Test 276 public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException { 277 Admin admin = TEST_UTIL.getAdmin(); 278 TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME)) 279 .setCoprocessor(ErrorInjectObserver.class.getName()).build(); 280 admin.modifyTable(htd); 281 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 282 table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k)) 283 .collect(Collectors.toList())).get(); 284 List<CompletableFuture<Result>> futures = table 285 .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList())); 286 for (int i = 0; i < SPLIT_KEYS.length - 1; i++) { 287 assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ)); 288 } 289 try { 290 futures.get(SPLIT_KEYS.length - 1).get(); 291 fail(); 292 } catch (ExecutionException e) { 293 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 294 } 295 } 296 297 @Test 298 public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException { 299 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 300 List<CompletableFuture<Object>> futures = table.batch(Arrays.asList( 301 new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ, 302 Bytes.toBytes("bad")), 303 new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1), 304 new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good")))); 305 try { 306 futures.get(0).get(); 307 fail(); 308 } catch (ExecutionException e) { 309 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 310 assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class)); 311 } 312 assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ))); 313 assertTrue(((Result) futures.get(2).get()).isEmpty()); 314 assertEquals("good", 315 Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ))); 316 } 317 318 @Test 319 public void testInvalidPut() { 320 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 321 try { 322 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0)))); 323 fail("Should fail since the put does not contain any cells"); 324 } catch (IllegalArgumentException e) { 325 assertThat(e.getMessage(), containsString("No columns to insert")); 326 } 327 328 try { 329 table.batch( 330 Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 331 new Delete(Bytes.toBytes(0)))); 332 fail("Should fail since the put exceeds the max key value size"); 333 } catch (IllegalArgumentException e) { 334 assertThat(e.getMessage(), containsString("KeyValue size too large")); 335 } 336 } 337 338 @Test 339 public void testWithCheckAndMutate() throws Exception { 340 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 341 342 byte[] row1 = Bytes.toBytes("row1"); 343 byte[] row2 = Bytes.toBytes("row2"); 344 byte[] row3 = Bytes.toBytes("row3"); 345 byte[] row4 = Bytes.toBytes("row4"); 346 byte[] row5 = Bytes.toBytes("row5"); 347 byte[] row6 = Bytes.toBytes("row6"); 348 byte[] row7 = Bytes.toBytes("row7"); 349 350 table.putAll(Arrays.asList( 351 new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 352 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 353 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 354 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 355 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 356 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 357 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))).get(); 358 359 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1) 360 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 361 .build(new RowMutations(row1) 362 .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 363 .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 364 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 365 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 366 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 367 RowMutations mutations = new RowMutations(row3) 368 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 369 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 370 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 371 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 372 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4) 373 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 374 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 375 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 376 CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6) 377 .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 378 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 379 CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7) 380 .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 381 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 382 383 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 384 checkAndMutate3, checkAndMutate4); 385 List<Object> results = table.batchAll(actions).get(); 386 387 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0); 388 assertTrue(checkAndMutateResult.isSuccess()); 389 assertEquals(3L, 390 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 391 assertEquals("d", 392 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 393 394 assertEquals("b", 395 Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); 396 397 Result result = (Result) results.get(2); 398 assertTrue(result.getExists()); 399 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 400 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 401 402 checkAndMutateResult = (CheckAndMutateResult) results.get(3); 403 assertFalse(checkAndMutateResult.isSuccess()); 404 assertNull(checkAndMutateResult.getResult()); 405 406 assertTrue(((Result) results.get(4)).isEmpty()); 407 408 checkAndMutateResult = (CheckAndMutateResult) results.get(5); 409 assertTrue(checkAndMutateResult.isSuccess()); 410 assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult() 411 .getValue(FAMILY, Bytes.toBytes("F")))); 412 413 checkAndMutateResult = (CheckAndMutateResult) results.get(6); 414 assertTrue(checkAndMutateResult.isSuccess()); 415 assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult() 416 .getValue(FAMILY, Bytes.toBytes("G")))); 417 418 result = table.get(new Get(row1)).get(); 419 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 420 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 421 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 422 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 423 424 result = table.get(new Get(row3)).get(); 425 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 426 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 427 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 428 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 429 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 430 431 result = table.get(new Get(row4)).get(); 432 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 433 434 result = table.get(new Get(row5)).get(); 435 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 436 437 result = table.get(new Get(row6)).get(); 438 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 439 440 result = table.get(new Get(row7)).get(); 441 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 442 } 443}