001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.jupiter.api.Assertions.assertArrayEquals;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertNull;
027import static org.junit.jupiter.api.Assertions.assertTrue;
028import static org.junit.jupiter.api.Assertions.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035import java.util.Optional;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.Future;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.TimeoutException;
042import java.util.function.Function;
043import java.util.stream.Collectors;
044import java.util.stream.IntStream;
045import java.util.stream.Stream;
046import org.apache.hadoop.hbase.Cell;
047import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
048import org.apache.hadoop.hbase.HBaseTestingUtil;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.RegionObserver;
054import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.junit.jupiter.api.AfterAll;
059import org.junit.jupiter.api.AfterEach;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.TestTemplate;
064import org.junit.jupiter.params.provider.Arguments;
065
066@HBaseParameterizedTestTemplate(name = "{index}: type={0}")
067@Tag(LargeTests.TAG)
068@Tag(ClientTests.TAG)
069public class TestAsyncTableBatch {
070
071  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
072
073  private static TableName TABLE_NAME = TableName.valueOf("async");
074
075  private static byte[] FAMILY = Bytes.toBytes("cf");
076
077  private static byte[] CQ = Bytes.toBytes("cq");
078  private static byte[] CQ1 = Bytes.toBytes("cq1");
079
080  private static int COUNT = 1000;
081
082  private static AsyncConnection CONN;
083
084  private static byte[][] SPLIT_KEYS;
085
086  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
087
088  private final Function<TableName, AsyncTable<?>> tableGetter;
089
090  public TestAsyncTableBatch(String tableType, Function<TableName, AsyncTable<?>> tableGetter) {
091    this.tableGetter = tableGetter;
092  }
093
094  private static AsyncTable<?> getRawTable(TableName tableName) {
095    return CONN.getTable(tableName);
096  }
097
098  private static AsyncTable<?> getTable(TableName tableName) {
099    return CONN.getTable(tableName, ForkJoinPool.commonPool());
100  }
101
102  public static Stream<Arguments> parameters() {
103    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
104    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
105    return Stream.of(Arguments.of("raw", rawTableGetter), Arguments.of("normal", tableGetter));
106  }
107
108  @BeforeAll
109  public static void setUp() throws Exception {
110    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
111      MAX_KEY_VALUE_SIZE);
112    TEST_UTIL.startMiniCluster(3);
113    SPLIT_KEYS = new byte[8][];
114    for (int i = 111; i < 999; i += 111) {
115      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
116    }
117    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
118  }
119
120  @AfterAll
121  public static void tearDown() throws Exception {
122    CONN.close();
123    TEST_UTIL.shutdownMiniCluster();
124  }
125
126  @BeforeEach
127  public void setUpBeforeTest() throws IOException, InterruptedException {
128    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
129    TEST_UTIL.waitTableAvailable(TABLE_NAME);
130  }
131
132  @AfterEach
133  public void tearDownAfterTest() throws IOException {
134    Admin admin = TEST_UTIL.getAdmin();
135    if (admin.isTableEnabled(TABLE_NAME)) {
136      admin.disableTable(TABLE_NAME);
137    }
138    admin.deleteTable(TABLE_NAME);
139  }
140
141  private byte[] getRow(int i) {
142    return Bytes.toBytes(String.format("%03d", i));
143  }
144
145  @TestTemplate
146  public void test()
147    throws InterruptedException, ExecutionException, IOException, TimeoutException {
148    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
149    table.putAll(IntStream.range(0, COUNT)
150      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
151      .collect(Collectors.toList())).get();
152    List<Result> results = table.getAll(IntStream.range(0, COUNT)
153      .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
154      .flatMap(l -> l.stream()).collect(Collectors.toList())).get();
155    assertEquals(2 * COUNT, results.size());
156    for (int i = 0; i < COUNT; i++) {
157      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
158      assertTrue(results.get(2 * i + 1).isEmpty());
159    }
160    Admin admin = TEST_UTIL.getAdmin();
161    admin.flush(TABLE_NAME);
162    List<Future<?>> splitFutures =
163      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
164        byte[] startKey = r.getRegionInfo().getStartKey();
165        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
166        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
167        try {
168          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
169        } catch (IOException e) {
170          throw new UncheckedIOException(e);
171        }
172      }).collect(Collectors.toList());
173    for (Future<?> future : splitFutures) {
174      future.get(30, TimeUnit.SECONDS);
175    }
176    table
177      .deleteAll(
178        IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
179      .get();
180    results = table
181      .getAll(
182        IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
183      .get();
184    assertEquals(COUNT, results.size());
185    results.forEach(r -> assertTrue(r.isEmpty()));
186  }
187
188  @TestTemplate
189  public void testWithRegionServerFailover() throws Exception {
190    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
191    table.putAll(IntStream.range(0, COUNT)
192      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
193      .collect(Collectors.toList())).get();
194    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
195    Thread.sleep(100);
196    table.putAll(IntStream.range(COUNT, 2 * COUNT)
197      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
198      .collect(Collectors.toList())).get();
199    List<Result> results = table.getAll(
200      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
201      .get();
202    assertEquals(2 * COUNT, results.size());
203    results.forEach(r -> assertFalse(r.isEmpty()));
204    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
205      .collect(Collectors.toList())).get();
206    results = table.getAll(
207      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
208      .get();
209    assertEquals(2 * COUNT, results.size());
210    results.forEach(r -> assertTrue(r.isEmpty()));
211  }
212
213  @TestTemplate
214  public void testMixed() throws InterruptedException, ExecutionException, IOException {
215    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
216    table.putAll(IntStream.range(0, 7)
217      .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
218      .collect(Collectors.toList())).get();
219    List<Row> actions = new ArrayList<>();
220    actions.add(new Get(Bytes.toBytes(0)));
221    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
222    actions.add(new Delete(Bytes.toBytes(2)));
223    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
224    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
225    RowMutations rm = new RowMutations(Bytes.toBytes(5));
226    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
227    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
228    actions.add(rm);
229    actions.add(new Get(Bytes.toBytes(6)));
230
231    List<Object> results = table.batchAll(actions).get();
232    assertEquals(7, results.size());
233    Result getResult = (Result) results.get(0);
234    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
235    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
236    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
237    Result incrementResult = (Result) results.get(3);
238    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
239    Result appendResult = (Result) results.get(4);
240    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
241    assertEquals(12, appendValue.length);
242    assertEquals(4, Bytes.toLong(appendValue));
243    assertEquals(4, Bytes.toInt(appendValue, 8));
244    assertEquals(100,
245      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
246    assertEquals(200,
247      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
248    getResult = (Result) results.get(6);
249    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
250  }
251
252  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
253
254    @Override
255    public Optional<RegionObserver> getRegionObserver() {
256      return Optional.of(this);
257    }
258
259    @Override
260    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
261      List<Cell> results) throws IOException {
262      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
263        throw new DoNotRetryRegionException("Inject Error");
264      }
265    }
266  }
267
268  @TestTemplate
269  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
270    Admin admin = TEST_UTIL.getAdmin();
271    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
272      .setCoprocessor(ErrorInjectObserver.class.getName()).build();
273    admin.modifyTable(htd);
274    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
275    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
276      .collect(Collectors.toList())).get();
277    List<CompletableFuture<Result>> futures = table
278      .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
279    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
280      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
281    }
282    try {
283      futures.get(SPLIT_KEYS.length - 1).get();
284      fail();
285    } catch (ExecutionException e) {
286      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
287    }
288  }
289
290  @TestTemplate
291  public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException {
292    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
293    List<CompletableFuture<Object>> futures = table.batch(Arrays.asList(
294      new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ,
295        Bytes.toBytes("bad")),
296      new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1),
297      new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good"))));
298    try {
299      futures.get(0).get();
300      fail();
301    } catch (ExecutionException e) {
302      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
303      assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class));
304    }
305    assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ)));
306    assertTrue(((Result) futures.get(2).get()).isEmpty());
307    assertEquals("good",
308      Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
309  }
310
311  @TestTemplate
312  public void testInvalidMutation() {
313    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
314
315    Mutation[] emptyMutations =
316      { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) };
317
318    String[] emptyMessages =
319      { "No columns to put", "No columns to increment", "No columns to append" };
320
321    Mutation[] oversizedMutations =
322      { new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
323        new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
324        new Append(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
325
326    for (int i = 0; i < emptyMutations.length; i++) {
327      // Test empty mutation
328      try {
329        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), emptyMutations[i]));
330        fail("Should fail since the mutation does not contain any cells");
331      } catch (IllegalArgumentException e) {
332        assertThat(e.getMessage(), containsString(emptyMessages[i]));
333      }
334
335      // Test oversized mutation
336      try {
337        table.batch(Arrays.asList(oversizedMutations[i], new Delete(Bytes.toBytes(0))));
338        fail("Should fail since the mutation exceeds the max key value size");
339      } catch (IllegalArgumentException e) {
340        assertThat(e.getMessage(), containsString("KeyValue size too large"));
341      }
342    }
343  }
344
345  @TestTemplate
346  public void testInvalidMutationInRowMutations() throws IOException {
347    final byte[] row = Bytes.toBytes(0);
348    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
349
350    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
351
352    String[] emptyMessages =
353      { "No columns to put", "No columns to increment", "No columns to append" };
354
355    Mutation[] oversizedMutations =
356      { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
357        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
358        new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
359
360    for (int i = 0; i < emptyMutations.length; i++) {
361      // Test empty mutation
362      try {
363        table.batch(Arrays.asList(new Delete(row), new RowMutations(row).add(emptyMutations[i])));
364        fail("Should fail since the mutation does not contain any cells");
365      } catch (IllegalArgumentException e) {
366        assertThat(e.getMessage(), containsString(emptyMessages[i]));
367      }
368
369      // Test oversized mutation
370      try {
371        table
372          .batch(Arrays.asList(new RowMutations(row).add(oversizedMutations[i]), new Delete(row)));
373        fail("Should fail since the mutation exceeds the max key value size");
374      } catch (IllegalArgumentException e) {
375        assertThat(e.getMessage(), containsString("KeyValue size too large"));
376      }
377    }
378  }
379
380  @TestTemplate
381  public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException {
382    final byte[] row = Bytes.toBytes(0);
383    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
384
385    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
386
387    String[] emptyMessages =
388      { "No columns to put", "No columns to increment", "No columns to append" };
389
390    Mutation[] oversizedMutations =
391      { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
392        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
393        new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
394
395    for (int i = 0; i < emptyMutations.length; i++) {
396      // Test empty mutation
397      try {
398        table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row)
399          .ifNotExists(FAMILY, CQ).build(new RowMutations(row).add(emptyMutations[i]))));
400        fail("Should fail since the mutation does not contain any cells");
401      } catch (IllegalArgumentException e) {
402        assertThat(e.getMessage(), containsString(emptyMessages[i]));
403      }
404
405      // Test oversized mutation
406      try {
407        table.batch(Arrays.asList(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, CQ)
408          .build(new RowMutations(row).add(oversizedMutations[i])), new Delete(row)));
409        fail("Should fail since the mutation exceeds the max key value size");
410      } catch (IllegalArgumentException e) {
411        assertThat(e.getMessage(), containsString("KeyValue size too large"));
412      }
413    }
414  }
415
416  @TestTemplate
417  public void testWithCheckAndMutate() throws Exception {
418    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
419
420    byte[] row1 = Bytes.toBytes("row1");
421    byte[] row2 = Bytes.toBytes("row2");
422    byte[] row3 = Bytes.toBytes("row3");
423    byte[] row4 = Bytes.toBytes("row4");
424    byte[] row5 = Bytes.toBytes("row5");
425    byte[] row6 = Bytes.toBytes("row6");
426    byte[] row7 = Bytes.toBytes("row7");
427
428    table
429      .putAll(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
430        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
431        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
432        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
433        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
434        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
435        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))))
436      .get();
437
438    CheckAndMutate checkAndMutate1 =
439      CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
440        .build(new RowMutations(row1)
441          .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
442          .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
443          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
444          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
445    Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
446    RowMutations mutations =
447      new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
448        .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
449        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
450        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
451    CheckAndMutate checkAndMutate2 =
452      CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
453        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
454    Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
455    CheckAndMutate checkAndMutate3 =
456      CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
457        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
458    CheckAndMutate checkAndMutate4 =
459      CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
460        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
461
462    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
463      checkAndMutate3, checkAndMutate4);
464    List<Object> results = table.batchAll(actions).get();
465
466    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0);
467    assertTrue(checkAndMutateResult.isSuccess());
468    assertEquals(3L,
469      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
470    assertEquals("d",
471      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
472
473    assertEquals("b",
474      Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
475
476    Result result = (Result) results.get(2);
477    assertTrue(result.getExists());
478    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
479    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
480
481    checkAndMutateResult = (CheckAndMutateResult) results.get(3);
482    assertFalse(checkAndMutateResult.isSuccess());
483    assertNull(checkAndMutateResult.getResult());
484
485    assertTrue(((Result) results.get(4)).isEmpty());
486
487    checkAndMutateResult = (CheckAndMutateResult) results.get(5);
488    assertTrue(checkAndMutateResult.isSuccess());
489    assertEquals(11,
490      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F"))));
491
492    checkAndMutateResult = (CheckAndMutateResult) results.get(6);
493    assertTrue(checkAndMutateResult.isSuccess());
494    assertEquals("gg",
495      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G"))));
496
497    result = table.get(new Get(row1)).get();
498    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
499    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
500    assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
501    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
502
503    result = table.get(new Get(row3)).get();
504    assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
505    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
506    assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
507    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
508    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
509
510    result = table.get(new Get(row4)).get();
511    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
512
513    result = table.get(new Get(row5)).get();
514    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
515
516    result = table.get(new Get(row6)).get();
517    assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
518
519    result = table.get(new Get(row7)).get();
520    assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
521  }
522}