001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.junit.Assert.assertArrayEquals; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import java.util.Optional; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.Future; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import java.util.function.Function; 043import java.util.stream.Collectors; 044import java.util.stream.IntStream; 045import org.apache.hadoop.hbase.Cell; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtil; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.coprocessor.ObserverContext; 050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 052import org.apache.hadoop.hbase.coprocessor.RegionObserver; 053import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.junit.After; 058import org.junit.AfterClass; 059import org.junit.Before; 060import org.junit.BeforeClass; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064import org.junit.runner.RunWith; 065import org.junit.runners.Parameterized; 066import org.junit.runners.Parameterized.Parameter; 067import org.junit.runners.Parameterized.Parameters; 068 069@RunWith(Parameterized.class) 070@Category({ LargeTests.class, ClientTests.class }) 071public class TestAsyncTableBatch { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestAsyncTableBatch.class); 076 077 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 078 079 private static TableName TABLE_NAME = TableName.valueOf("async"); 080 081 private static byte[] FAMILY = Bytes.toBytes("cf"); 082 083 private static byte[] CQ = Bytes.toBytes("cq"); 084 private static byte[] CQ1 = Bytes.toBytes("cq1"); 085 086 private static int COUNT = 1000; 087 088 private static AsyncConnection CONN; 089 090 private static byte[][] SPLIT_KEYS; 091 092 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 093 094 @Parameter(0) 095 public String tableType; 096 097 @Parameter(1) 098 public Function<TableName, AsyncTable<?>> tableGetter; 099 100 private static AsyncTable<?> getRawTable(TableName tableName) { 101 return CONN.getTable(tableName); 102 } 103 104 private static AsyncTable<?> getTable(TableName tableName) { 105 return CONN.getTable(tableName, ForkJoinPool.commonPool()); 106 } 107 108 @Parameters(name = "{index}: type={0}") 109 public static List<Object[]> params() { 110 Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable; 111 Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable; 112 return Arrays.asList(new Object[] { "raw", rawTableGetter }, 113 new Object[] { "normal", tableGetter }); 114 } 115 116 @BeforeClass 117 public static void setUp() throws Exception { 118 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 119 MAX_KEY_VALUE_SIZE); 120 TEST_UTIL.startMiniCluster(3); 121 SPLIT_KEYS = new byte[8][]; 122 for (int i = 111; i < 999; i += 111) { 123 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 124 } 125 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 126 } 127 128 @AfterClass 129 public static void tearDown() throws Exception { 130 CONN.close(); 131 TEST_UTIL.shutdownMiniCluster(); 132 } 133 134 @Before 135 public void setUpBeforeTest() throws IOException, InterruptedException { 136 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 137 TEST_UTIL.waitTableAvailable(TABLE_NAME); 138 } 139 140 @After 141 public void tearDownAfterTest() throws IOException { 142 Admin admin = TEST_UTIL.getAdmin(); 143 if (admin.isTableEnabled(TABLE_NAME)) { 144 admin.disableTable(TABLE_NAME); 145 } 146 admin.deleteTable(TABLE_NAME); 147 } 148 149 private byte[] getRow(int i) { 150 return Bytes.toBytes(String.format("%03d", i)); 151 } 152 153 @Test 154 public void test() 155 throws InterruptedException, ExecutionException, IOException, TimeoutException { 156 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 157 table.putAll(IntStream.range(0, COUNT) 158 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 159 .collect(Collectors.toList())).get(); 160 List<Result> results = table.getAll(IntStream.range(0, COUNT) 161 .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) 162 .flatMap(l -> l.stream()).collect(Collectors.toList())).get(); 163 assertEquals(2 * COUNT, results.size()); 164 for (int i = 0; i < COUNT; i++) { 165 assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ))); 166 assertTrue(results.get(2 * i + 1).isEmpty()); 167 } 168 Admin admin = TEST_UTIL.getAdmin(); 169 admin.flush(TABLE_NAME); 170 List<Future<?>> splitFutures = 171 TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> { 172 byte[] startKey = r.getRegionInfo().getStartKey(); 173 int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); 174 byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); 175 try { 176 return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint); 177 } catch (IOException e) { 178 throw new UncheckedIOException(e); 179 } 180 }).collect(Collectors.toList()); 181 for (Future<?> future : splitFutures) { 182 future.get(30, TimeUnit.SECONDS); 183 } 184 table 185 .deleteAll( 186 IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList())) 187 .get(); 188 results = table 189 .getAll( 190 IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 191 .get(); 192 assertEquals(COUNT, results.size()); 193 results.forEach(r -> assertTrue(r.isEmpty())); 194 } 195 196 @Test 197 public void testWithRegionServerFailover() throws Exception { 198 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 199 table.putAll(IntStream.range(0, COUNT) 200 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 201 .collect(Collectors.toList())).get(); 202 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests"); 203 Thread.sleep(100); 204 table.putAll(IntStream.range(COUNT, 2 * COUNT) 205 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 206 .collect(Collectors.toList())).get(); 207 List<Result> results = table.getAll( 208 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 209 .get(); 210 assertEquals(2 * COUNT, results.size()); 211 results.forEach(r -> assertFalse(r.isEmpty())); 212 table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i))) 213 .collect(Collectors.toList())).get(); 214 results = table.getAll( 215 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 216 .get(); 217 assertEquals(2 * COUNT, results.size()); 218 results.forEach(r -> assertTrue(r.isEmpty())); 219 } 220 221 @Test 222 public void testMixed() throws InterruptedException, ExecutionException, IOException { 223 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 224 table.putAll(IntStream.range(0, 7) 225 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) 226 .collect(Collectors.toList())).get(); 227 List<Row> actions = new ArrayList<>(); 228 actions.add(new Get(Bytes.toBytes(0))); 229 actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L))); 230 actions.add(new Delete(Bytes.toBytes(2))); 231 actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); 232 actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); 233 RowMutations rm = new RowMutations(Bytes.toBytes(5)); 234 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); 235 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); 236 actions.add(rm); 237 actions.add(new Get(Bytes.toBytes(6))); 238 239 List<Object> results = table.batchAll(actions).get(); 240 assertEquals(7, results.size()); 241 Result getResult = (Result) results.get(0); 242 assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 243 assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); 244 assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty()); 245 Result incrementResult = (Result) results.get(3); 246 assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ))); 247 Result appendResult = (Result) results.get(4); 248 byte[] appendValue = appendResult.getValue(FAMILY, CQ); 249 assertEquals(12, appendValue.length); 250 assertEquals(4, Bytes.toLong(appendValue)); 251 assertEquals(4, Bytes.toInt(appendValue, 8)); 252 assertEquals(100, 253 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); 254 assertEquals(200, 255 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); 256 getResult = (Result) results.get(6); 257 assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 258 } 259 260 public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver { 261 262 @Override 263 public Optional<RegionObserver> getRegionObserver() { 264 return Optional.of(this); 265 } 266 267 @Override 268 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 269 List<Cell> results) throws IOException { 270 if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) { 271 throw new DoNotRetryRegionException("Inject Error"); 272 } 273 } 274 } 275 276 @Test 277 public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException { 278 Admin admin = TEST_UTIL.getAdmin(); 279 TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME)) 280 .setCoprocessor(ErrorInjectObserver.class.getName()).build(); 281 admin.modifyTable(htd); 282 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 283 table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k)) 284 .collect(Collectors.toList())).get(); 285 List<CompletableFuture<Result>> futures = table 286 .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList())); 287 for (int i = 0; i < SPLIT_KEYS.length - 1; i++) { 288 assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ)); 289 } 290 try { 291 futures.get(SPLIT_KEYS.length - 1).get(); 292 fail(); 293 } catch (ExecutionException e) { 294 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 295 } 296 } 297 298 @Test 299 public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException { 300 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 301 List<CompletableFuture<Object>> futures = table.batch(Arrays.asList( 302 new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ, 303 Bytes.toBytes("bad")), 304 new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1), 305 new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good")))); 306 try { 307 futures.get(0).get(); 308 fail(); 309 } catch (ExecutionException e) { 310 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 311 assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class)); 312 } 313 assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ))); 314 assertTrue(((Result) futures.get(2).get()).isEmpty()); 315 assertEquals("good", 316 Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ))); 317 } 318 319 @Test 320 public void testInvalidMutation() { 321 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 322 323 Mutation[] emptyMutations = 324 { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) }; 325 326 String[] emptyMessages = 327 { "No columns to put", "No columns to increment", "No columns to append" }; 328 329 Mutation[] oversizedMutations = 330 { new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 331 new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 332 new Append(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 333 334 for (int i = 0; i < emptyMutations.length; i++) { 335 // Test empty mutation 336 try { 337 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), emptyMutations[i])); 338 fail("Should fail since the mutation does not contain any cells"); 339 } catch (IllegalArgumentException e) { 340 assertThat(e.getMessage(), containsString(emptyMessages[i])); 341 } 342 343 // Test oversized mutation 344 try { 345 table.batch(Arrays.asList(oversizedMutations[i], new Delete(Bytes.toBytes(0)))); 346 fail("Should fail since the mutation exceeds the max key value size"); 347 } catch (IllegalArgumentException e) { 348 assertThat(e.getMessage(), containsString("KeyValue size too large")); 349 } 350 } 351 } 352 353 @Test 354 public void testInvalidMutationInRowMutations() throws IOException { 355 final byte[] row = Bytes.toBytes(0); 356 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 357 358 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 359 360 String[] emptyMessages = 361 { "No columns to put", "No columns to increment", "No columns to append" }; 362 363 Mutation[] oversizedMutations = 364 { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 365 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 366 new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 367 368 for (int i = 0; i < emptyMutations.length; i++) { 369 // Test empty mutation 370 try { 371 table.batch(Arrays.asList(new Delete(row), new RowMutations(row).add(emptyMutations[i]))); 372 fail("Should fail since the mutation does not contain any cells"); 373 } catch (IllegalArgumentException e) { 374 assertThat(e.getMessage(), containsString(emptyMessages[i])); 375 } 376 377 // Test oversized mutation 378 try { 379 table 380 .batch(Arrays.asList(new RowMutations(row).add(oversizedMutations[i]), new Delete(row))); 381 fail("Should fail since the mutation exceeds the max key value size"); 382 } catch (IllegalArgumentException e) { 383 assertThat(e.getMessage(), containsString("KeyValue size too large")); 384 } 385 } 386 } 387 388 @Test 389 public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException { 390 final byte[] row = Bytes.toBytes(0); 391 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 392 393 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 394 395 String[] emptyMessages = 396 { "No columns to put", "No columns to increment", "No columns to append" }; 397 398 Mutation[] oversizedMutations = 399 { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 400 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 401 new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 402 403 for (int i = 0; i < emptyMutations.length; i++) { 404 // Test empty mutation 405 try { 406 table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row) 407 .ifNotExists(FAMILY, CQ).build(new RowMutations(row).add(emptyMutations[i])))); 408 fail("Should fail since the mutation does not contain any cells"); 409 } catch (IllegalArgumentException e) { 410 assertThat(e.getMessage(), containsString(emptyMessages[i])); 411 } 412 413 // Test oversized mutation 414 try { 415 table.batch(Arrays.asList(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, CQ) 416 .build(new RowMutations(row).add(oversizedMutations[i])), new Delete(row))); 417 fail("Should fail since the mutation exceeds the max key value size"); 418 } catch (IllegalArgumentException e) { 419 assertThat(e.getMessage(), containsString("KeyValue size too large")); 420 } 421 } 422 } 423 424 @Test 425 public void testWithCheckAndMutate() throws Exception { 426 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 427 428 byte[] row1 = Bytes.toBytes("row1"); 429 byte[] row2 = Bytes.toBytes("row2"); 430 byte[] row3 = Bytes.toBytes("row3"); 431 byte[] row4 = Bytes.toBytes("row4"); 432 byte[] row5 = Bytes.toBytes("row5"); 433 byte[] row6 = Bytes.toBytes("row6"); 434 byte[] row7 = Bytes.toBytes("row7"); 435 436 table 437 .putAll(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 438 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 439 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 440 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 441 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 442 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 443 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))) 444 .get(); 445 446 CheckAndMutate checkAndMutate1 = 447 CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 448 .build(new RowMutations(row1) 449 .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 450 .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 451 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 452 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 453 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 454 RowMutations mutations = 455 new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 456 .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 457 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 458 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 459 CheckAndMutate checkAndMutate2 = 460 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 461 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 462 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 463 CheckAndMutate checkAndMutate3 = 464 CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 465 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 466 CheckAndMutate checkAndMutate4 = 467 CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 468 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 469 470 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 471 checkAndMutate3, checkAndMutate4); 472 List<Object> results = table.batchAll(actions).get(); 473 474 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0); 475 assertTrue(checkAndMutateResult.isSuccess()); 476 assertEquals(3L, 477 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 478 assertEquals("d", 479 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 480 481 assertEquals("b", 482 Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); 483 484 Result result = (Result) results.get(2); 485 assertTrue(result.getExists()); 486 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 487 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 488 489 checkAndMutateResult = (CheckAndMutateResult) results.get(3); 490 assertFalse(checkAndMutateResult.isSuccess()); 491 assertNull(checkAndMutateResult.getResult()); 492 493 assertTrue(((Result) results.get(4)).isEmpty()); 494 495 checkAndMutateResult = (CheckAndMutateResult) results.get(5); 496 assertTrue(checkAndMutateResult.isSuccess()); 497 assertEquals(11, 498 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F")))); 499 500 checkAndMutateResult = (CheckAndMutateResult) results.get(6); 501 assertTrue(checkAndMutateResult.isSuccess()); 502 assertEquals("gg", 503 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G")))); 504 505 result = table.get(new Get(row1)).get(); 506 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 507 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 508 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 509 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 510 511 result = table.get(new Get(row3)).get(); 512 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 513 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 514 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 515 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 516 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 517 518 result = table.get(new Get(row4)).get(); 519 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 520 521 result = table.get(new Get(row5)).get(); 522 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 523 524 result = table.get(new Get(row6)).get(); 525 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 526 527 result = table.get(new Get(row7)).get(); 528 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 529 } 530}