001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.junit.jupiter.api.Assertions.assertArrayEquals; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertNull; 027import static org.junit.jupiter.api.Assertions.assertTrue; 028import static org.junit.jupiter.api.Assertions.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.List; 035import java.util.Optional; 036import java.util.concurrent.CompletableFuture; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ForkJoinPool; 039import java.util.concurrent.Future; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import java.util.function.Function; 043import java.util.stream.Collectors; 044import java.util.stream.IntStream; 045import java.util.stream.Stream; 046import org.apache.hadoop.hbase.Cell; 047import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 048import org.apache.hadoop.hbase.HBaseTestingUtil; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.coprocessor.RegionObserver; 054import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.jupiter.api.AfterAll; 059import org.junit.jupiter.api.AfterEach; 060import org.junit.jupiter.api.BeforeAll; 061import org.junit.jupiter.api.BeforeEach; 062import org.junit.jupiter.api.Tag; 063import org.junit.jupiter.api.TestTemplate; 064import org.junit.jupiter.params.provider.Arguments; 065 066@HBaseParameterizedTestTemplate(name = "{index}: type={0}") 067@Tag(LargeTests.TAG) 068@Tag(ClientTests.TAG) 069public class TestAsyncTableBatch { 070 071 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 072 073 private static TableName TABLE_NAME = TableName.valueOf("async"); 074 075 private static byte[] FAMILY = Bytes.toBytes("cf"); 076 077 private static byte[] CQ = Bytes.toBytes("cq"); 078 private static byte[] CQ1 = Bytes.toBytes("cq1"); 079 080 private static int COUNT = 1000; 081 082 private static AsyncConnection CONN; 083 084 private static byte[][] SPLIT_KEYS; 085 086 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 087 088 private final Function<TableName, AsyncTable<?>> tableGetter; 089 090 public TestAsyncTableBatch(String tableType, Function<TableName, AsyncTable<?>> tableGetter) { 091 this.tableGetter = tableGetter; 092 } 093 094 private static AsyncTable<?> getRawTable(TableName tableName) { 095 return CONN.getTable(tableName); 096 } 097 098 private static AsyncTable<?> getTable(TableName tableName) { 099 return CONN.getTable(tableName, ForkJoinPool.commonPool()); 100 } 101 102 public static Stream<Arguments> parameters() { 103 Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable; 104 Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable; 105 return Stream.of(Arguments.of("raw", rawTableGetter), Arguments.of("normal", tableGetter)); 106 } 107 108 @BeforeAll 109 public static void setUp() throws Exception { 110 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 111 MAX_KEY_VALUE_SIZE); 112 TEST_UTIL.startMiniCluster(3); 113 SPLIT_KEYS = new byte[8][]; 114 for (int i = 111; i < 999; i += 111) { 115 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 116 } 117 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 118 } 119 120 @AfterAll 121 public static void tearDown() throws Exception { 122 CONN.close(); 123 TEST_UTIL.shutdownMiniCluster(); 124 } 125 126 @BeforeEach 127 public void setUpBeforeTest() throws IOException, InterruptedException { 128 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 129 TEST_UTIL.waitTableAvailable(TABLE_NAME); 130 } 131 132 @AfterEach 133 public void tearDownAfterTest() throws IOException { 134 Admin admin = TEST_UTIL.getAdmin(); 135 if (admin.isTableEnabled(TABLE_NAME)) { 136 admin.disableTable(TABLE_NAME); 137 } 138 admin.deleteTable(TABLE_NAME); 139 } 140 141 private byte[] getRow(int i) { 142 return Bytes.toBytes(String.format("%03d", i)); 143 } 144 145 @TestTemplate 146 public void test() 147 throws InterruptedException, ExecutionException, IOException, TimeoutException { 148 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 149 table.putAll(IntStream.range(0, COUNT) 150 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 151 .collect(Collectors.toList())).get(); 152 List<Result> results = table.getAll(IntStream.range(0, COUNT) 153 .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4)))) 154 .flatMap(l -> l.stream()).collect(Collectors.toList())).get(); 155 assertEquals(2 * COUNT, results.size()); 156 for (int i = 0; i < COUNT; i++) { 157 assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ))); 158 assertTrue(results.get(2 * i + 1).isEmpty()); 159 } 160 Admin admin = TEST_UTIL.getAdmin(); 161 admin.flush(TABLE_NAME); 162 List<Future<?>> splitFutures = 163 TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> { 164 byte[] startKey = r.getRegionInfo().getStartKey(); 165 int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey)); 166 byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55)); 167 try { 168 return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint); 169 } catch (IOException e) { 170 throw new UncheckedIOException(e); 171 } 172 }).collect(Collectors.toList()); 173 for (Future<?> future : splitFutures) { 174 future.get(30, TimeUnit.SECONDS); 175 } 176 table 177 .deleteAll( 178 IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList())) 179 .get(); 180 results = table 181 .getAll( 182 IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 183 .get(); 184 assertEquals(COUNT, results.size()); 185 results.forEach(r -> assertTrue(r.isEmpty())); 186 } 187 188 @TestTemplate 189 public void testWithRegionServerFailover() throws Exception { 190 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 191 table.putAll(IntStream.range(0, COUNT) 192 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 193 .collect(Collectors.toList())).get(); 194 TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests"); 195 Thread.sleep(100); 196 table.putAll(IntStream.range(COUNT, 2 * COUNT) 197 .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 198 .collect(Collectors.toList())).get(); 199 List<Result> results = table.getAll( 200 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 201 .get(); 202 assertEquals(2 * COUNT, results.size()); 203 results.forEach(r -> assertFalse(r.isEmpty())); 204 table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i))) 205 .collect(Collectors.toList())).get(); 206 results = table.getAll( 207 IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList())) 208 .get(); 209 assertEquals(2 * COUNT, results.size()); 210 results.forEach(r -> assertTrue(r.isEmpty())); 211 } 212 213 @TestTemplate 214 public void testMixed() throws InterruptedException, ExecutionException, IOException { 215 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 216 table.putAll(IntStream.range(0, 7) 217 .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i))) 218 .collect(Collectors.toList())).get(); 219 List<Row> actions = new ArrayList<>(); 220 actions.add(new Get(Bytes.toBytes(0))); 221 actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L))); 222 actions.add(new Delete(Bytes.toBytes(2))); 223 actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1)); 224 actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4))); 225 RowMutations rm = new RowMutations(Bytes.toBytes(5)); 226 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L))); 227 rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L))); 228 actions.add(rm); 229 actions.add(new Get(Bytes.toBytes(6))); 230 231 List<Object> results = table.batchAll(actions).get(); 232 assertEquals(7, results.size()); 233 Result getResult = (Result) results.get(0); 234 assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 235 assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ))); 236 assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty()); 237 Result incrementResult = (Result) results.get(3); 238 assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ))); 239 Result appendResult = (Result) results.get(4); 240 byte[] appendValue = appendResult.getValue(FAMILY, CQ); 241 assertEquals(12, appendValue.length); 242 assertEquals(4, Bytes.toLong(appendValue)); 243 assertEquals(4, Bytes.toInt(appendValue, 8)); 244 assertEquals(100, 245 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ))); 246 assertEquals(200, 247 Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1))); 248 getResult = (Result) results.get(6); 249 assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ))); 250 } 251 252 public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver { 253 254 @Override 255 public Optional<RegionObserver> getRegionObserver() { 256 return Optional.of(this); 257 } 258 259 @Override 260 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get, 261 List<Cell> results) throws IOException { 262 if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) { 263 throw new DoNotRetryRegionException("Inject Error"); 264 } 265 } 266 } 267 268 @TestTemplate 269 public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException { 270 Admin admin = TEST_UTIL.getAdmin(); 271 TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME)) 272 .setCoprocessor(ErrorInjectObserver.class.getName()).build(); 273 admin.modifyTable(htd); 274 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 275 table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k)) 276 .collect(Collectors.toList())).get(); 277 List<CompletableFuture<Result>> futures = table 278 .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList())); 279 for (int i = 0; i < SPLIT_KEYS.length - 1; i++) { 280 assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ)); 281 } 282 try { 283 futures.get(SPLIT_KEYS.length - 1).get(); 284 fail(); 285 } catch (ExecutionException e) { 286 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 287 } 288 } 289 290 @TestTemplate 291 public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException { 292 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 293 List<CompletableFuture<Object>> futures = table.batch(Arrays.asList( 294 new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ, 295 Bytes.toBytes("bad")), 296 new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1), 297 new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good")))); 298 try { 299 futures.get(0).get(); 300 fail(); 301 } catch (ExecutionException e) { 302 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 303 assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class)); 304 } 305 assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ))); 306 assertTrue(((Result) futures.get(2).get()).isEmpty()); 307 assertEquals("good", 308 Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ))); 309 } 310 311 @TestTemplate 312 public void testInvalidMutation() { 313 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 314 315 Mutation[] emptyMutations = 316 { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) }; 317 318 String[] emptyMessages = 319 { "No columns to put", "No columns to increment", "No columns to append" }; 320 321 Mutation[] oversizedMutations = 322 { new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 323 new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 324 new Append(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 325 326 for (int i = 0; i < emptyMutations.length; i++) { 327 // Test empty mutation 328 try { 329 table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), emptyMutations[i])); 330 fail("Should fail since the mutation does not contain any cells"); 331 } catch (IllegalArgumentException e) { 332 assertThat(e.getMessage(), containsString(emptyMessages[i])); 333 } 334 335 // Test oversized mutation 336 try { 337 table.batch(Arrays.asList(oversizedMutations[i], new Delete(Bytes.toBytes(0)))); 338 fail("Should fail since the mutation exceeds the max key value size"); 339 } catch (IllegalArgumentException e) { 340 assertThat(e.getMessage(), containsString("KeyValue size too large")); 341 } 342 } 343 } 344 345 @TestTemplate 346 public void testInvalidMutationInRowMutations() throws IOException { 347 final byte[] row = Bytes.toBytes(0); 348 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 349 350 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 351 352 String[] emptyMessages = 353 { "No columns to put", "No columns to increment", "No columns to append" }; 354 355 Mutation[] oversizedMutations = 356 { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 357 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 358 new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 359 360 for (int i = 0; i < emptyMutations.length; i++) { 361 // Test empty mutation 362 try { 363 table.batch(Arrays.asList(new Delete(row), new RowMutations(row).add(emptyMutations[i]))); 364 fail("Should fail since the mutation does not contain any cells"); 365 } catch (IllegalArgumentException e) { 366 assertThat(e.getMessage(), containsString(emptyMessages[i])); 367 } 368 369 // Test oversized mutation 370 try { 371 table 372 .batch(Arrays.asList(new RowMutations(row).add(oversizedMutations[i]), new Delete(row))); 373 fail("Should fail since the mutation exceeds the max key value size"); 374 } catch (IllegalArgumentException e) { 375 assertThat(e.getMessage(), containsString("KeyValue size too large")); 376 } 377 } 378 } 379 380 @TestTemplate 381 public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException { 382 final byte[] row = Bytes.toBytes(0); 383 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 384 385 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 386 387 String[] emptyMessages = 388 { "No columns to put", "No columns to increment", "No columns to append" }; 389 390 Mutation[] oversizedMutations = 391 { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]), 392 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 393 new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) }; 394 395 for (int i = 0; i < emptyMutations.length; i++) { 396 // Test empty mutation 397 try { 398 table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row) 399 .ifNotExists(FAMILY, CQ).build(new RowMutations(row).add(emptyMutations[i])))); 400 fail("Should fail since the mutation does not contain any cells"); 401 } catch (IllegalArgumentException e) { 402 assertThat(e.getMessage(), containsString(emptyMessages[i])); 403 } 404 405 // Test oversized mutation 406 try { 407 table.batch(Arrays.asList(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, CQ) 408 .build(new RowMutations(row).add(oversizedMutations[i])), new Delete(row))); 409 fail("Should fail since the mutation exceeds the max key value size"); 410 } catch (IllegalArgumentException e) { 411 assertThat(e.getMessage(), containsString("KeyValue size too large")); 412 } 413 } 414 } 415 416 @TestTemplate 417 public void testWithCheckAndMutate() throws Exception { 418 AsyncTable<?> table = tableGetter.apply(TABLE_NAME); 419 420 byte[] row1 = Bytes.toBytes("row1"); 421 byte[] row2 = Bytes.toBytes("row2"); 422 byte[] row3 = Bytes.toBytes("row3"); 423 byte[] row4 = Bytes.toBytes("row4"); 424 byte[] row5 = Bytes.toBytes("row5"); 425 byte[] row6 = Bytes.toBytes("row6"); 426 byte[] row7 = Bytes.toBytes("row7"); 427 428 table 429 .putAll(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 430 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 431 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 432 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 433 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 434 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)), 435 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))) 436 .get(); 437 438 CheckAndMutate checkAndMutate1 = 439 CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 440 .build(new RowMutations(row1) 441 .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g"))) 442 .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A"))) 443 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L)) 444 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))); 445 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B")); 446 RowMutations mutations = 447 new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))) 448 .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 449 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L)) 450 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 451 CheckAndMutate checkAndMutate2 = 452 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a")) 453 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h"))); 454 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f")); 455 CheckAndMutate checkAndMutate3 = 456 CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)) 457 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1)); 458 CheckAndMutate checkAndMutate4 = 459 CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")) 460 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))); 461 462 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put, 463 checkAndMutate3, checkAndMutate4); 464 List<Object> results = table.batchAll(actions).get(); 465 466 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0); 467 assertTrue(checkAndMutateResult.isSuccess()); 468 assertEquals(3L, 469 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C")))); 470 assertEquals("d", 471 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D")))); 472 473 assertEquals("b", 474 Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B")))); 475 476 Result result = (Result) results.get(2); 477 assertTrue(result.getExists()); 478 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 479 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 480 481 checkAndMutateResult = (CheckAndMutateResult) results.get(3); 482 assertFalse(checkAndMutateResult.isSuccess()); 483 assertNull(checkAndMutateResult.getResult()); 484 485 assertTrue(((Result) results.get(4)).isEmpty()); 486 487 checkAndMutateResult = (CheckAndMutateResult) results.get(5); 488 assertTrue(checkAndMutateResult.isSuccess()); 489 assertEquals(11, 490 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F")))); 491 492 checkAndMutateResult = (CheckAndMutateResult) results.get(6); 493 assertTrue(checkAndMutateResult.isSuccess()); 494 assertEquals("gg", 495 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G")))); 496 497 result = table.get(new Get(row1)).get(); 498 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 499 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 500 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 501 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 502 503 result = table.get(new Get(row3)).get(); 504 assertNull(result.getValue(FAMILY, Bytes.toBytes("C"))); 505 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 506 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 507 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A")))); 508 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 509 510 result = table.get(new Get(row4)).get(); 511 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 512 513 result = table.get(new Get(row5)).get(); 514 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E")))); 515 516 result = table.get(new Get(row6)).get(); 517 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F")))); 518 519 result = table.get(new Get(row7)).get(); 520 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G")))); 521 } 522}