001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035import java.util.Optional;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.Future;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.TimeoutException;
042import java.util.function.Function;
043import java.util.stream.Collectors;
044import java.util.stream.IntStream;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
052import org.apache.hadoop.hbase.coprocessor.RegionObserver;
053import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.junit.After;
058import org.junit.AfterClass;
059import org.junit.Before;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.runner.RunWith;
065import org.junit.runners.Parameterized;
066import org.junit.runners.Parameterized.Parameter;
067import org.junit.runners.Parameterized.Parameters;
068
069@RunWith(Parameterized.class)
070@Category({ LargeTests.class, ClientTests.class })
071public class TestAsyncTableBatch {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestAsyncTableBatch.class);
076
077  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
078
079  private static TableName TABLE_NAME = TableName.valueOf("async");
080
081  private static byte[] FAMILY = Bytes.toBytes("cf");
082
083  private static byte[] CQ = Bytes.toBytes("cq");
084  private static byte[] CQ1 = Bytes.toBytes("cq1");
085
086  private static int COUNT = 1000;
087
088  private static AsyncConnection CONN;
089
090  private static byte[][] SPLIT_KEYS;
091
092  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
093
094  @Parameter(0)
095  public String tableType;
096
097  @Parameter(1)
098  public Function<TableName, AsyncTable<?>> tableGetter;
099
100  private static AsyncTable<?> getRawTable(TableName tableName) {
101    return CONN.getTable(tableName);
102  }
103
104  private static AsyncTable<?> getTable(TableName tableName) {
105    return CONN.getTable(tableName, ForkJoinPool.commonPool());
106  }
107
108  @Parameters(name = "{index}: type={0}")
109  public static List<Object[]> params() {
110    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
111    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
112    return Arrays.asList(new Object[] { "raw", rawTableGetter },
113      new Object[] { "normal", tableGetter });
114  }
115
116  @BeforeClass
117  public static void setUp() throws Exception {
118    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
119      MAX_KEY_VALUE_SIZE);
120    TEST_UTIL.startMiniCluster(3);
121    SPLIT_KEYS = new byte[8][];
122    for (int i = 111; i < 999; i += 111) {
123      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
124    }
125    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
126  }
127
128  @AfterClass
129  public static void tearDown() throws Exception {
130    CONN.close();
131    TEST_UTIL.shutdownMiniCluster();
132  }
133
134  @Before
135  public void setUpBeforeTest() throws IOException, InterruptedException {
136    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
137    TEST_UTIL.waitTableAvailable(TABLE_NAME);
138  }
139
140  @After
141  public void tearDownAfterTest() throws IOException {
142    Admin admin = TEST_UTIL.getAdmin();
143    if (admin.isTableEnabled(TABLE_NAME)) {
144      admin.disableTable(TABLE_NAME);
145    }
146    admin.deleteTable(TABLE_NAME);
147  }
148
149  private byte[] getRow(int i) {
150    return Bytes.toBytes(String.format("%03d", i));
151  }
152
153  @Test
154  public void test()
155    throws InterruptedException, ExecutionException, IOException, TimeoutException {
156    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
157    table.putAll(IntStream.range(0, COUNT)
158      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
159      .collect(Collectors.toList())).get();
160    List<Result> results = table.getAll(IntStream.range(0, COUNT)
161      .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
162      .flatMap(l -> l.stream()).collect(Collectors.toList())).get();
163    assertEquals(2 * COUNT, results.size());
164    for (int i = 0; i < COUNT; i++) {
165      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
166      assertTrue(results.get(2 * i + 1).isEmpty());
167    }
168    Admin admin = TEST_UTIL.getAdmin();
169    admin.flush(TABLE_NAME);
170    List<Future<?>> splitFutures =
171      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
172        byte[] startKey = r.getRegionInfo().getStartKey();
173        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
174        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
175        try {
176          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
177        } catch (IOException e) {
178          throw new UncheckedIOException(e);
179        }
180      }).collect(Collectors.toList());
181    for (Future<?> future : splitFutures) {
182      future.get(30, TimeUnit.SECONDS);
183    }
184    table
185      .deleteAll(
186        IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
187      .get();
188    results = table
189      .getAll(
190        IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
191      .get();
192    assertEquals(COUNT, results.size());
193    results.forEach(r -> assertTrue(r.isEmpty()));
194  }
195
196  @Test
197  public void testWithRegionServerFailover() throws Exception {
198    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
199    table.putAll(IntStream.range(0, COUNT)
200      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
201      .collect(Collectors.toList())).get();
202    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
203    Thread.sleep(100);
204    table.putAll(IntStream.range(COUNT, 2 * COUNT)
205      .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
206      .collect(Collectors.toList())).get();
207    List<Result> results = table.getAll(
208      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
209      .get();
210    assertEquals(2 * COUNT, results.size());
211    results.forEach(r -> assertFalse(r.isEmpty()));
212    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
213      .collect(Collectors.toList())).get();
214    results = table.getAll(
215      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
216      .get();
217    assertEquals(2 * COUNT, results.size());
218    results.forEach(r -> assertTrue(r.isEmpty()));
219  }
220
221  @Test
222  public void testMixed() throws InterruptedException, ExecutionException, IOException {
223    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
224    table.putAll(IntStream.range(0, 7)
225      .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
226      .collect(Collectors.toList())).get();
227    List<Row> actions = new ArrayList<>();
228    actions.add(new Get(Bytes.toBytes(0)));
229    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
230    actions.add(new Delete(Bytes.toBytes(2)));
231    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
232    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
233    RowMutations rm = new RowMutations(Bytes.toBytes(5));
234    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
235    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
236    actions.add(rm);
237    actions.add(new Get(Bytes.toBytes(6)));
238
239    List<Object> results = table.batchAll(actions).get();
240    assertEquals(7, results.size());
241    Result getResult = (Result) results.get(0);
242    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
243    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
244    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
245    Result incrementResult = (Result) results.get(3);
246    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
247    Result appendResult = (Result) results.get(4);
248    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
249    assertEquals(12, appendValue.length);
250    assertEquals(4, Bytes.toLong(appendValue));
251    assertEquals(4, Bytes.toInt(appendValue, 8));
252    assertEquals(100,
253      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
254    assertEquals(200,
255      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
256    getResult = (Result) results.get(6);
257    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
258  }
259
260  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
261
262    @Override
263    public Optional<RegionObserver> getRegionObserver() {
264      return Optional.of(this);
265    }
266
267    @Override
268    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
269      List<Cell> results) throws IOException {
270      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
271        throw new DoNotRetryRegionException("Inject Error");
272      }
273    }
274  }
275
276  @Test
277  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
278    Admin admin = TEST_UTIL.getAdmin();
279    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
280      .setCoprocessor(ErrorInjectObserver.class.getName()).build();
281    admin.modifyTable(htd);
282    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
283    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
284      .collect(Collectors.toList())).get();
285    List<CompletableFuture<Result>> futures = table
286      .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
287    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
288      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
289    }
290    try {
291      futures.get(SPLIT_KEYS.length - 1).get();
292      fail();
293    } catch (ExecutionException e) {
294      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
295    }
296  }
297
298  @Test
299  public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException {
300    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
301    List<CompletableFuture<Object>> futures = table.batch(Arrays.asList(
302      new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ,
303        Bytes.toBytes("bad")),
304      new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1),
305      new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good"))));
306    try {
307      futures.get(0).get();
308      fail();
309    } catch (ExecutionException e) {
310      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
311      assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class));
312    }
313    assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ)));
314    assertTrue(((Result) futures.get(2).get()).isEmpty());
315    assertEquals("good",
316      Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
317  }
318
319  @Test
320  public void testInvalidMutation() {
321    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
322
323    Mutation[] emptyMutations =
324      { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) };
325
326    String[] emptyMessages =
327      { "No columns to put", "No columns to increment", "No columns to append" };
328
329    Mutation[] oversizedMutations =
330      { new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
331        new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
332        new Append(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
333
334    for (int i = 0; i < emptyMutations.length; i++) {
335      // Test empty mutation
336      try {
337        table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), emptyMutations[i]));
338        fail("Should fail since the mutation does not contain any cells");
339      } catch (IllegalArgumentException e) {
340        assertThat(e.getMessage(), containsString(emptyMessages[i]));
341      }
342
343      // Test oversized mutation
344      try {
345        table.batch(Arrays.asList(oversizedMutations[i], new Delete(Bytes.toBytes(0))));
346        fail("Should fail since the mutation exceeds the max key value size");
347      } catch (IllegalArgumentException e) {
348        assertThat(e.getMessage(), containsString("KeyValue size too large"));
349      }
350    }
351  }
352
353  @Test
354  public void testInvalidMutationInRowMutations() throws IOException {
355    final byte[] row = Bytes.toBytes(0);
356    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
357
358    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
359
360    String[] emptyMessages =
361      { "No columns to put", "No columns to increment", "No columns to append" };
362
363    Mutation[] oversizedMutations =
364      { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
365        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
366        new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
367
368    for (int i = 0; i < emptyMutations.length; i++) {
369      // Test empty mutation
370      try {
371        table.batch(Arrays.asList(new Delete(row), new RowMutations(row).add(emptyMutations[i])));
372        fail("Should fail since the mutation does not contain any cells");
373      } catch (IllegalArgumentException e) {
374        assertThat(e.getMessage(), containsString(emptyMessages[i]));
375      }
376
377      // Test oversized mutation
378      try {
379        table
380          .batch(Arrays.asList(new RowMutations(row).add(oversizedMutations[i]), new Delete(row)));
381        fail("Should fail since the mutation exceeds the max key value size");
382      } catch (IllegalArgumentException e) {
383        assertThat(e.getMessage(), containsString("KeyValue size too large"));
384      }
385    }
386  }
387
388  @Test
389  public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException {
390    final byte[] row = Bytes.toBytes(0);
391    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
392
393    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
394
395    String[] emptyMessages =
396      { "No columns to put", "No columns to increment", "No columns to append" };
397
398    Mutation[] oversizedMutations =
399      { new Put(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
400        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
401        new Append(row).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]) };
402
403    for (int i = 0; i < emptyMutations.length; i++) {
404      // Test empty mutation
405      try {
406        table.batch(Arrays.asList(new Delete(row), CheckAndMutate.newBuilder(row)
407          .ifNotExists(FAMILY, CQ).build(new RowMutations(row).add(emptyMutations[i]))));
408        fail("Should fail since the mutation does not contain any cells");
409      } catch (IllegalArgumentException e) {
410        assertThat(e.getMessage(), containsString(emptyMessages[i]));
411      }
412
413      // Test oversized mutation
414      try {
415        table.batch(Arrays.asList(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, CQ)
416          .build(new RowMutations(row).add(oversizedMutations[i])), new Delete(row)));
417        fail("Should fail since the mutation exceeds the max key value size");
418      } catch (IllegalArgumentException e) {
419        assertThat(e.getMessage(), containsString("KeyValue size too large"));
420      }
421    }
422  }
423
424  @Test
425  public void testWithCheckAndMutate() throws Exception {
426    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
427
428    byte[] row1 = Bytes.toBytes("row1");
429    byte[] row2 = Bytes.toBytes("row2");
430    byte[] row3 = Bytes.toBytes("row3");
431    byte[] row4 = Bytes.toBytes("row4");
432    byte[] row5 = Bytes.toBytes("row5");
433    byte[] row6 = Bytes.toBytes("row6");
434    byte[] row7 = Bytes.toBytes("row7");
435
436    table
437      .putAll(Arrays.asList(new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
438        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
439        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
440        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
441        new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
442        new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
443        new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))))
444      .get();
445
446    CheckAndMutate checkAndMutate1 =
447      CheckAndMutate.newBuilder(row1).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
448        .build(new RowMutations(row1)
449          .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
450          .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
451          .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
452          .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
453    Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
454    RowMutations mutations =
455      new RowMutations(row3).add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
456        .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
457        .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
458        .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
459    CheckAndMutate checkAndMutate2 =
460      CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
461        .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
462    Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
463    CheckAndMutate checkAndMutate3 =
464      CheckAndMutate.newBuilder(row6).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
465        .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
466    CheckAndMutate checkAndMutate4 =
467      CheckAndMutate.newBuilder(row7).ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
468        .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
469
470    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
471      checkAndMutate3, checkAndMutate4);
472    List<Object> results = table.batchAll(actions).get();
473
474    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0);
475    assertTrue(checkAndMutateResult.isSuccess());
476    assertEquals(3L,
477      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
478    assertEquals("d",
479      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
480
481    assertEquals("b",
482      Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
483
484    Result result = (Result) results.get(2);
485    assertTrue(result.getExists());
486    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
487    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
488
489    checkAndMutateResult = (CheckAndMutateResult) results.get(3);
490    assertFalse(checkAndMutateResult.isSuccess());
491    assertNull(checkAndMutateResult.getResult());
492
493    assertTrue(((Result) results.get(4)).isEmpty());
494
495    checkAndMutateResult = (CheckAndMutateResult) results.get(5);
496    assertTrue(checkAndMutateResult.isSuccess());
497    assertEquals(11,
498      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("F"))));
499
500    checkAndMutateResult = (CheckAndMutateResult) results.get(6);
501    assertTrue(checkAndMutateResult.isSuccess());
502    assertEquals("gg",
503      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("G"))));
504
505    result = table.get(new Get(row1)).get();
506    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
507    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
508    assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
509    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
510
511    result = table.get(new Get(row3)).get();
512    assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
513    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
514    assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
515    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
516    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
517
518    result = table.get(new Get(row4)).get();
519    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
520
521    result = table.get(new Get(row5)).get();
522    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
523
524    result = table.get(new Get(row6)).get();
525    assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
526
527    result = table.get(new Get(row7)).get();
528    assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
529  }
530}