001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ArrayBlockingQueue;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicLong;
042import java.util.function.Consumer;
043import java.util.function.Supplier;
044import java.util.stream.IntStream;
045import org.apache.hadoop.hbase.CompareOperator;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtil;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.TableNotEnabledException;
050import org.apache.hadoop.hbase.filter.BinaryComparator;
051import org.apache.hadoop.hbase.filter.FamilyFilter;
052import org.apache.hadoop.hbase.filter.FilterList;
053import org.apache.hadoop.hbase.filter.QualifierFilter;
054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
055import org.apache.hadoop.hbase.filter.TimestampsFilter;
056import org.apache.hadoop.hbase.io.TimeRange;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
061import org.apache.hadoop.hbase.util.Pair;
062import org.junit.AfterClass;
063import org.junit.Before;
064import org.junit.BeforeClass;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.junit.rules.TestName;
070import org.junit.runner.RunWith;
071import org.junit.runners.Parameterized;
072import org.junit.runners.Parameterized.Parameter;
073import org.junit.runners.Parameterized.Parameters;
074
075import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
076
077@RunWith(Parameterized.class)
078@Category({ MediumTests.class, ClientTests.class })
079public class TestAsyncTable {
080
081  @ClassRule
082  public static final HBaseClassTestRule CLASS_RULE =
083    HBaseClassTestRule.forClass(TestAsyncTable.class);
084
085  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
086
087  private static TableName TABLE_NAME = TableName.valueOf("async");
088
089  private static byte[] FAMILY = Bytes.toBytes("cf");
090
091  private static byte[] QUALIFIER = Bytes.toBytes("cq");
092
093  private static byte[] VALUE = Bytes.toBytes("value");
094
095  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
096
097  private static AsyncConnection ASYNC_CONN;
098
099  @Rule
100  public TestName testName = new TestName();
101
102  private byte[] row;
103
104  @Parameter
105  public Supplier<AsyncTable<?>> getTable;
106
107  private static AsyncTable<?> getRawTable() {
108    return ASYNC_CONN.getTable(TABLE_NAME);
109  }
110
111  private static AsyncTable<?> getTable() {
112    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
113  }
114
115  @Parameters
116  public static List<Object[]> params() {
117    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
118      new Supplier<?>[] { TestAsyncTable::getTable });
119  }
120
121  @BeforeClass
122  public static void setUpBeforeClass() throws Exception {
123    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
124      MAX_KEY_VALUE_SIZE);
125    TEST_UTIL.startMiniCluster(1);
126    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
127    TEST_UTIL.waitTableAvailable(TABLE_NAME);
128    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
129    assertFalse(ASYNC_CONN.isClosed());
130  }
131
132  @AfterClass
133  public static void tearDownAfterClass() throws Exception {
134    Closeables.close(ASYNC_CONN, true);
135    assertTrue(ASYNC_CONN.isClosed());
136    TEST_UTIL.shutdownMiniCluster();
137  }
138
139  @Before
140  public void setUp() throws IOException, InterruptedException, ExecutionException {
141    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
142    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
143      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
144    }
145  }
146
147  @Test
148  public void testSimple() throws Exception {
149    AsyncTable<?> table = getTable.get();
150    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
151    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
152    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
153    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
154    table.delete(new Delete(row)).get();
155    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
156    assertTrue(result.isEmpty());
157    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
158  }
159
160  private byte[] concat(byte[] base, int index) {
161    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
162  }
163
164  @SuppressWarnings("FutureReturnValueIgnored")
165  @Test
166  public void testSimpleMultiple() throws Exception {
167    AsyncTable<?> table = getTable.get();
168    int count = 100;
169    CountDownLatch putLatch = new CountDownLatch(count);
170    IntStream.range(0, count).forEach(
171      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
172        .thenAccept(x -> putLatch.countDown()));
173    putLatch.await();
174    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
175    IntStream.range(0, count)
176      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
177        .thenAccept(x -> existsResp.add(x)));
178    for (int i = 0; i < count; i++) {
179      assertTrue(existsResp.take());
180    }
181    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
182    IntStream.range(0, count)
183      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
184        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
185    for (int i = 0; i < count; i++) {
186      Pair<Integer, Result> pair = getResp.take();
187      assertArrayEquals(concat(VALUE, pair.getFirst()),
188        pair.getSecond().getValue(FAMILY, QUALIFIER));
189    }
190    CountDownLatch deleteLatch = new CountDownLatch(count);
191    IntStream.range(0, count).forEach(
192      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
193    deleteLatch.await();
194    IntStream.range(0, count)
195      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
196        .thenAccept(x -> existsResp.add(x)));
197    for (int i = 0; i < count; i++) {
198      assertFalse(existsResp.take());
199    }
200    IntStream.range(0, count)
201      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
202        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
203    for (int i = 0; i < count; i++) {
204      Pair<Integer, Result> pair = getResp.take();
205      assertTrue(pair.getSecond().isEmpty());
206    }
207  }
208
209  @SuppressWarnings("FutureReturnValueIgnored")
210  @Test
211  public void testIncrement() throws InterruptedException, ExecutionException {
212    AsyncTable<?> table = getTable.get();
213    int count = 100;
214    CountDownLatch latch = new CountDownLatch(count);
215    AtomicLong sum = new AtomicLong(0L);
216    IntStream.range(0, count)
217      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
218        sum.addAndGet(x);
219        latch.countDown();
220      }));
221    latch.await();
222    assertEquals(count, Bytes.toLong(
223      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
224    assertEquals((1 + count) * count / 2, sum.get());
225  }
226
227  @SuppressWarnings("FutureReturnValueIgnored")
228  @Test
229  public void testAppend() throws InterruptedException, ExecutionException {
230    AsyncTable<?> table = getTable.get();
231    int count = 10;
232    CountDownLatch latch = new CountDownLatch(count);
233    char suffix = ':';
234    AtomicLong suffixCount = new AtomicLong(0L);
235    IntStream.range(0, count)
236      .forEachOrdered(i -> table
237        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
238        .thenAccept(r -> {
239          suffixCount.addAndGet(
240            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
241          latch.countDown();
242        }));
243    latch.await();
244    assertEquals((1 + count) * count / 2, suffixCount.get());
245    String value = Bytes.toString(
246      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
247    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
248      .sorted().toArray();
249    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
250  }
251
252  @Test
253  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
254    AsyncTable<?> table = getTable.get();
255    RowMutations mutation = new RowMutations(row);
256    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
257    Result result = table.mutateRow(mutation).get();
258    assertTrue(result.getExists());
259    assertTrue(result.isEmpty());
260
261    result = table.get(new Get(row)).get();
262    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
263
264    mutation = new RowMutations(row);
265    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
266    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
267    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
268    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
269    result = table.mutateRow(mutation).get();
270    assertTrue(result.getExists());
271    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
272    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
273
274    result = table.get(new Get(row)).get();
275    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
276    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
277    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
278    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
279  }
280
281  // Tests for old checkAndMutate API
282
283  @SuppressWarnings("FutureReturnValueIgnored")
284  @Test
285  @Deprecated
286  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
287    AsyncTable<?> table = getTable.get();
288    AtomicInteger successCount = new AtomicInteger(0);
289    AtomicInteger successIndex = new AtomicInteger(-1);
290    int count = 10;
291    CountDownLatch latch = new CountDownLatch(count);
292    IntStream.range(0, count)
293      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
294        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
295          if (x) {
296            successCount.incrementAndGet();
297            successIndex.set(i);
298          }
299          latch.countDown();
300        }));
301    latch.await();
302    assertEquals(1, successCount.get());
303    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
304    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
305  }
306
307  @SuppressWarnings("FutureReturnValueIgnored")
308  @Test
309  @Deprecated
310  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
311    AsyncTable<?> table = getTable.get();
312    int count = 10;
313    CountDownLatch putLatch = new CountDownLatch(count + 1);
314    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
315    IntStream.range(0, count)
316      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
317        .thenRun(() -> putLatch.countDown()));
318    putLatch.await();
319
320    AtomicInteger successCount = new AtomicInteger(0);
321    AtomicInteger successIndex = new AtomicInteger(-1);
322    CountDownLatch deleteLatch = new CountDownLatch(count);
323    IntStream.range(0, count)
324      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
325        .thenDelete(
326          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
327        .thenAccept(x -> {
328          if (x) {
329            successCount.incrementAndGet();
330            successIndex.set(i);
331          }
332          deleteLatch.countDown();
333        }));
334    deleteLatch.await();
335    assertEquals(1, successCount.get());
336    Result result = table.get(new Get(row)).get();
337    IntStream.range(0, count).forEach(i -> {
338      if (i == successIndex.get()) {
339        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
340      } else {
341        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
342      }
343    });
344  }
345
346  @SuppressWarnings("FutureReturnValueIgnored")
347  @Test
348  @Deprecated
349  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
350    AsyncTable<?> table = getTable.get();
351    int count = 10;
352    CountDownLatch putLatch = new CountDownLatch(count + 1);
353    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
354    IntStream.range(0, count)
355      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
356        .thenRun(() -> putLatch.countDown()));
357    putLatch.await();
358
359    AtomicInteger successCount = new AtomicInteger(0);
360    AtomicInteger successIndex = new AtomicInteger(-1);
361    CountDownLatch mutateLatch = new CountDownLatch(count);
362    IntStream.range(0, count).forEach(i -> {
363      RowMutations mutation = new RowMutations(row);
364      try {
365        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
366        mutation
367          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
368      } catch (IOException e) {
369        throw new UncheckedIOException(e);
370      }
371      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
372        .thenAccept(x -> {
373          if (x) {
374            successCount.incrementAndGet();
375            successIndex.set(i);
376          }
377          mutateLatch.countDown();
378        });
379    });
380    mutateLatch.await();
381    assertEquals(1, successCount.get());
382    Result result = table.get(new Get(row)).get();
383    IntStream.range(0, count).forEach(i -> {
384      if (i == successIndex.get()) {
385        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
386      } else {
387        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
388      }
389    });
390  }
391
392  @Test
393  @Deprecated
394  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
395    AsyncTable<?> table = getTable.get();
396    final long ts = EnvironmentEdgeManager.currentTime() / 2;
397    Put put = new Put(row);
398    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
399
400    boolean ok =
401      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
402    assertTrue(ok);
403
404    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
405      .ifEquals(VALUE).thenPut(put).get();
406    assertFalse(ok);
407
408    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
409      .ifEquals(VALUE).thenPut(put).get();
410    assertTrue(ok);
411
412    RowMutations rm = new RowMutations(row).add((Mutation) put);
413
414    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
415      .ifEquals(VALUE).thenMutate(rm).get();
416    assertFalse(ok);
417
418    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
419      .ifEquals(VALUE).thenMutate(rm).get();
420    assertTrue(ok);
421
422    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
423
424    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
425      .ifEquals(VALUE).thenDelete(delete).get();
426    assertFalse(ok);
427
428    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
429      .ifEquals(VALUE).thenDelete(delete).get();
430    assertTrue(ok);
431  }
432
433  @Test
434  @Deprecated
435  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
436    AsyncTable<?> table = getTable.get();
437
438    // Put one row
439    Put put = new Put(row);
440    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
441    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
442    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
443    table.put(put).get();
444
445    // Put with success
446    boolean ok = table
447      .checkAndMutate(row,
448        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
449          Bytes.toBytes("a")))
450      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
451    assertTrue(ok);
452
453    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
454    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
455
456    // Put with failure
457    ok = table
458      .checkAndMutate(row,
459        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
460          Bytes.toBytes("b")))
461      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
462    assertFalse(ok);
463
464    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
465
466    // Delete with success
467    ok = table
468      .checkAndMutate(row,
469        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
470          Bytes.toBytes("a")))
471      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
472    assertTrue(ok);
473
474    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
475
476    // Mutate with success
477    ok = table
478      .checkAndMutate(row,
479        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
480          Bytes.toBytes("b")))
481      .thenMutate(new RowMutations(row)
482        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
483        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
484      .get();
485    assertTrue(ok);
486
487    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
488    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
489
490    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
491  }
492
493  @Test
494  @Deprecated
495  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
496    AsyncTable<?> table = getTable.get();
497
498    // Put one row
499    Put put = new Put(row);
500    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
501    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
502    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
503    table.put(put).get();
504
505    // Put with success
506    boolean ok = table
507      .checkAndMutate(row,
508        new FilterList(
509          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
510            Bytes.toBytes("a")),
511          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
512            Bytes.toBytes("b"))))
513      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
514    assertTrue(ok);
515
516    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
517    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
518
519    // Put with failure
520    ok = table
521      .checkAndMutate(row,
522        new FilterList(
523          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
524            Bytes.toBytes("a")),
525          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
526            Bytes.toBytes("c"))))
527      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
528    assertFalse(ok);
529
530    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
531
532    // Delete with success
533    ok = table
534      .checkAndMutate(row,
535        new FilterList(
536          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
537            Bytes.toBytes("a")),
538          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
539            Bytes.toBytes("b"))))
540      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
541    assertTrue(ok);
542
543    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
544
545    // Mutate with success
546    ok = table
547      .checkAndMutate(row,
548        new FilterList(
549          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
550            Bytes.toBytes("a")),
551          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
552            Bytes.toBytes("b"))))
553      .thenMutate(new RowMutations(row)
554        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
555        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
556      .get();
557    assertTrue(ok);
558
559    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
560    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
561
562    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
563  }
564
565  @Test
566  @Deprecated
567  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
568    AsyncTable<?> table = getTable.get();
569
570    // Put with specifying the timestamp
571    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
572
573    // Put with success
574    boolean ok = table
575      .checkAndMutate(row,
576        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
577          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
578          new TimestampsFilter(Collections.singletonList(100L))))
579      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
580    assertTrue(ok);
581
582    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
583    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
584
585    // Put with failure
586    ok = table
587      .checkAndMutate(row,
588        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
589          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
590          new TimestampsFilter(Collections.singletonList(101L))))
591      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
592    assertFalse(ok);
593
594    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
595  }
596
597  @Test
598  @Deprecated
599  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
600    AsyncTable<?> table = getTable.get();
601
602    // Put with specifying the timestamp
603    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
604
605    // Put with success
606    boolean ok = table
607      .checkAndMutate(row,
608        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
609          Bytes.toBytes("a")))
610      .timeRange(TimeRange.between(0, 101))
611      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
612    assertTrue(ok);
613
614    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
615    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
616
617    // Put with failure
618    ok = table
619      .checkAndMutate(row,
620        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
621          Bytes.toBytes("a")))
622      .timeRange(TimeRange.between(0, 100))
623      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
624    assertFalse(ok);
625
626    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
627  }
628
629  @Test(expected = NullPointerException.class)
630  @Deprecated
631  public void testCheckAndMutateWithoutConditionForOldApi() {
632    getTable.get().checkAndMutate(row, FAMILY)
633      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
634  }
635
636  // Tests for new CheckAndMutate API
637
638  @SuppressWarnings("FutureReturnValueIgnored")
639  @Test
640  public void testCheckAndPut() throws InterruptedException, ExecutionException {
641    AsyncTable<?> table = getTable.get();
642    AtomicInteger successCount = new AtomicInteger(0);
643    AtomicInteger successIndex = new AtomicInteger(-1);
644    int count = 10;
645    CountDownLatch latch = new CountDownLatch(count);
646
647    IntStream.range(0, count).forEach(
648      i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
649        .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> {
650          if (x.isSuccess()) {
651            successCount.incrementAndGet();
652            successIndex.set(i);
653          }
654          assertNull(x.getResult());
655          latch.countDown();
656        }));
657    latch.await();
658    assertEquals(1, successCount.get());
659    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
660    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
661  }
662
663  @SuppressWarnings("FutureReturnValueIgnored")
664  @Test
665  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
666    AsyncTable<?> table = getTable.get();
667    int count = 10;
668    CountDownLatch putLatch = new CountDownLatch(count + 1);
669    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
670    IntStream.range(0, count)
671      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
672        .thenRun(() -> putLatch.countDown()));
673    putLatch.await();
674
675    AtomicInteger successCount = new AtomicInteger(0);
676    AtomicInteger successIndex = new AtomicInteger(-1);
677    CountDownLatch deleteLatch = new CountDownLatch(count);
678
679    IntStream.range(0, count)
680      .forEach(i -> table
681        .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(
682          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
683        .thenAccept(x -> {
684          if (x.isSuccess()) {
685            successCount.incrementAndGet();
686            successIndex.set(i);
687          }
688          assertNull(x.getResult());
689          deleteLatch.countDown();
690        }));
691    deleteLatch.await();
692    assertEquals(1, successCount.get());
693    Result result = table.get(new Get(row)).get();
694    IntStream.range(0, count).forEach(i -> {
695      if (i == successIndex.get()) {
696        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
697      } else {
698        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
699      }
700    });
701  }
702
703  @SuppressWarnings("FutureReturnValueIgnored")
704  @Test
705  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
706    AsyncTable<?> table = getTable.get();
707    int count = 10;
708    CountDownLatch putLatch = new CountDownLatch(count + 1);
709    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
710    IntStream.range(0, count)
711      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
712        .thenRun(() -> putLatch.countDown()));
713    putLatch.await();
714
715    AtomicInteger successCount = new AtomicInteger(0);
716    AtomicInteger successIndex = new AtomicInteger(-1);
717    CountDownLatch mutateLatch = new CountDownLatch(count);
718    IntStream.range(0, count).forEach(i -> {
719      RowMutations mutation = new RowMutations(row);
720      try {
721        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
722        mutation
723          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
724      } catch (IOException e) {
725        throw new UncheckedIOException(e);
726      }
727
728      table
729        .checkAndMutate(
730          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation))
731        .thenAccept(x -> {
732          if (x.isSuccess()) {
733            successCount.incrementAndGet();
734            successIndex.set(i);
735          }
736          assertNull(x.getResult());
737          mutateLatch.countDown();
738        });
739    });
740    mutateLatch.await();
741    assertEquals(1, successCount.get());
742    Result result = table.get(new Get(row)).get();
743    IntStream.range(0, count).forEach(i -> {
744      if (i == successIndex.get()) {
745        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
746      } else {
747        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
748      }
749    });
750  }
751
752  @Test
753  public void testCheckAndMutateWithTimeRange() throws Exception {
754    AsyncTable<?> table = getTable.get();
755    final long ts = EnvironmentEdgeManager.currentTime() / 2;
756    Put put = new Put(row);
757    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
758
759    CheckAndMutateResult result =
760      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put))
761        .get();
762    assertTrue(result.isSuccess());
763    assertNull(result.getResult());
764
765    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
766      .timeRange(TimeRange.at(ts + 10000)).build(put)).get();
767    assertFalse(result.isSuccess());
768    assertNull(result.getResult());
769
770    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
771      .timeRange(TimeRange.at(ts)).build(put)).get();
772    assertTrue(result.isSuccess());
773    assertNull(result.getResult());
774
775    RowMutations rm = new RowMutations(row).add((Mutation) put);
776
777    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
778      .timeRange(TimeRange.at(ts + 10000)).build(rm)).get();
779    assertFalse(result.isSuccess());
780    assertNull(result.getResult());
781
782    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
783      .timeRange(TimeRange.at(ts)).build(rm)).get();
784    assertTrue(result.isSuccess());
785    assertNull(result.getResult());
786
787    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
788
789    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
790      .timeRange(TimeRange.at(ts + 10000)).build(delete)).get();
791    assertFalse(result.isSuccess());
792    assertNull(result.getResult());
793
794    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
795      .timeRange(TimeRange.at(ts)).build(delete)).get();
796    assertTrue(result.isSuccess());
797    assertNull(result.getResult());
798  }
799
800  @Test
801  public void testCheckAndMutateWithSingleFilter() throws Throwable {
802    AsyncTable<?> table = getTable.get();
803
804    // Put one row
805    Put put = new Put(row);
806    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
807    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
808    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
809    table.put(put).get();
810
811    // Put with success
812    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
813      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
814        Bytes.toBytes("a")))
815      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
816    assertTrue(result.isSuccess());
817    assertNull(result.getResult());
818
819    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
820    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
821
822    // Put with failure
823    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
824      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
825        Bytes.toBytes("b")))
826      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
827    assertFalse(result.isSuccess());
828    assertNull(result.getResult());
829
830    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
831
832    // Delete with success
833    result =
834      table
835        .checkAndMutate(CheckAndMutate.newBuilder(row)
836          .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
837            Bytes.toBytes("a")))
838          .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
839        .get();
840    assertTrue(result.isSuccess());
841    assertNull(result.getResult());
842
843    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
844
845    // Mutate with success
846    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
847      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
848        Bytes.toBytes("b")))
849      .build(new RowMutations(row)
850        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
851        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
852      .get();
853    assertTrue(result.isSuccess());
854    assertNull(result.getResult());
855
856    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
857    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
858
859    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
860  }
861
862  @Test
863  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
864    AsyncTable<?> table = getTable.get();
865
866    // Put one row
867    Put put = new Put(row);
868    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
869    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
870    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
871    table.put(put).get();
872
873    // Put with success
874    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
875      .ifMatches(new FilterList(
876        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
877          Bytes.toBytes("a")),
878        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
879          Bytes.toBytes("b"))))
880      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
881    assertTrue(result.isSuccess());
882    assertNull(result.getResult());
883
884    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
885    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
886
887    // Put with failure
888    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
889      .ifMatches(new FilterList(
890        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
891          Bytes.toBytes("a")),
892        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
893          Bytes.toBytes("c"))))
894      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
895    assertFalse(result.isSuccess());
896    assertNull(result.getResult());
897
898    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
899
900    // Delete with success
901    result = table
902      .checkAndMutate(CheckAndMutate.newBuilder(row)
903        .ifMatches(new FilterList(
904          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
905            Bytes.toBytes("a")),
906          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
907            Bytes.toBytes("b"))))
908        .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
909      .get();
910    assertTrue(result.isSuccess());
911    assertNull(result.getResult());
912
913    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
914
915    // Mutate with success
916    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
917      .ifMatches(new FilterList(
918        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
919          Bytes.toBytes("a")),
920        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
921          Bytes.toBytes("b"))))
922      .build(new RowMutations(row)
923        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
924        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
925      .get();
926    assertTrue(result.isSuccess());
927    assertNull(result.getResult());
928
929    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
930    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
931
932    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
933  }
934
935  @Test
936  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
937    AsyncTable<?> table = getTable.get();
938
939    // Put with specifying the timestamp
940    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
941
942    // Put with success
943    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
944      .ifMatches(
945        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
946          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
947          new TimestampsFilter(Collections.singletonList(100L))))
948      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
949    assertTrue(result.isSuccess());
950    assertNull(result.getResult());
951
952    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
953    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
954
955    // Put with failure
956    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
957      .ifMatches(
958        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
959          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
960          new TimestampsFilter(Collections.singletonList(101L))))
961      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
962    assertFalse(result.isSuccess());
963    assertNull(result.getResult());
964
965    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
966  }
967
968  @Test
969  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
970    AsyncTable<?> table = getTable.get();
971
972    // Put with specifying the timestamp
973    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
974
975    // Put with success
976    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
977      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
978        Bytes.toBytes("a")))
979      .timeRange(TimeRange.between(0, 101))
980      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
981    assertTrue(result.isSuccess());
982    assertNull(result.getResult());
983
984    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
985    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
986
987    // Put with failure
988    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
989      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
990        Bytes.toBytes("a")))
991      .timeRange(TimeRange.between(0, 100))
992      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
993    assertFalse(result.isSuccess());
994    assertNull(result.getResult());
995
996    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
997  }
998
999  @Test
1000  public void testCheckAndIncrement() throws Throwable {
1001    AsyncTable<?> table = getTable.get();
1002
1003    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1004
1005    // CheckAndIncrement with correct value
1006    CheckAndMutateResult res = table.checkAndMutate(
1007      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1008        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1009      .get();
1010    assertTrue(res.isSuccess());
1011    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1012
1013    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1014    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1015
1016    // CheckAndIncrement with wrong value
1017    res = table.checkAndMutate(
1018      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1019        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1020      .get();
1021    assertFalse(res.isSuccess());
1022    assertNull(res.getResult());
1023
1024    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1025    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1026
1027    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1028
1029    // CheckAndIncrement with a filter and correct value
1030    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1031      .ifMatches(new FilterList(
1032        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1033          Bytes.toBytes("a")),
1034        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1035          Bytes.toBytes("c"))))
1036      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1037    assertTrue(res.isSuccess());
1038    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1039
1040    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1041    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1042
1043    // CheckAndIncrement with a filter and correct value
1044    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1045      .ifMatches(new FilterList(
1046        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1047          Bytes.toBytes("b")),
1048        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1049          Bytes.toBytes("d"))))
1050      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1051    assertFalse(res.isSuccess());
1052    assertNull(res.getResult());
1053
1054    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1055    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1056  }
1057
1058  @Test
1059  public void testCheckAndAppend() throws Throwable {
1060    AsyncTable<?> table = getTable.get();
1061
1062    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1063
1064    // CheckAndAppend with correct value
1065    CheckAndMutateResult res =
1066      table
1067        .checkAndMutate(
1068          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1069            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1070        .get();
1071    assertTrue(res.isSuccess());
1072    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1073
1074    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1075    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1076
1077    // CheckAndAppend with correct value
1078    res =
1079      table
1080        .checkAndMutate(
1081          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1082            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1083        .get();
1084    assertFalse(res.isSuccess());
1085    assertNull(res.getResult());
1086
1087    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1088    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1089
1090    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1091
1092    // CheckAndAppend with a filter and correct value
1093    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1094      .ifMatches(new FilterList(
1095        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1096          Bytes.toBytes("a")),
1097        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1098          Bytes.toBytes("c"))))
1099      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1100    assertTrue(res.isSuccess());
1101    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1102
1103    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1104    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1105
1106    // CheckAndAppend with a filter and wrong value
1107    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1108      .ifMatches(new FilterList(
1109        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1110          Bytes.toBytes("b")),
1111        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1112          Bytes.toBytes("d"))))
1113      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1114    assertFalse(res.isSuccess());
1115    assertNull(res.getResult());
1116
1117    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1118    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1119  }
1120
1121  @Test
1122  public void testCheckAndRowMutations() throws Throwable {
1123    final byte[] q1 = Bytes.toBytes("q1");
1124    final byte[] q2 = Bytes.toBytes("q2");
1125    final byte[] q3 = Bytes.toBytes("q3");
1126    final byte[] q4 = Bytes.toBytes("q4");
1127    final String v1 = "v1";
1128
1129    AsyncTable<?> table = getTable.get();
1130
1131    // Initial values
1132    table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
1133      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
1134      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
1135
1136    // Do CheckAndRowMutations
1137    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build(
1138      new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
1139        new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1),
1140        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1141
1142    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
1143    assertTrue(result.isSuccess());
1144    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
1145    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
1146
1147    // Verify the value
1148    Result r = table.get(new Get(row)).get();
1149    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1150    assertNull(r.getValue(FAMILY, q2));
1151    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1152    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1153
1154    // Do CheckAndRowMutations again
1155    checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1)
1156      .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1),
1157        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
1158        new Increment(row).addColumn(FAMILY, q3, 1),
1159        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1160
1161    result = table.checkAndMutate(checkAndMutate).get();
1162    assertFalse(result.isSuccess());
1163    assertNull(result.getResult());
1164
1165    // Verify the value
1166    r = table.get(new Get(row)).get();
1167    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1168    assertNull(r.getValue(FAMILY, q2));
1169    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1170    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1171  }
1172
1173  // Tests for batch version of checkAndMutate
1174
1175  @Test
1176  public void testCheckAndMutateBatch() throws Throwable {
1177    AsyncTable<?> table = getTable.get();
1178    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1179    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1180    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1181
1182    table
1183      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1184        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1185        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1186        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))
1187      .get();
1188
1189    // Test for Put
1190    CheckAndMutate checkAndMutate1 =
1191      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1192        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1193
1194    CheckAndMutate checkAndMutate2 =
1195      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1196        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1197
1198    List<CheckAndMutateResult> results =
1199      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1200
1201    assertTrue(results.get(0).isSuccess());
1202    assertNull(results.get(0).getResult());
1203    assertFalse(results.get(1).isSuccess());
1204    assertNull(results.get(1).getResult());
1205
1206    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1207    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1208
1209    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1210    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1211
1212    // Test for Delete
1213    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1214      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row));
1215
1216    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1217      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2));
1218
1219    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1220
1221    assertTrue(results.get(0).isSuccess());
1222    assertNull(results.get(0).getResult());
1223    assertFalse(results.get(1).isSuccess());
1224    assertNull(results.get(1).getResult());
1225
1226    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
1227
1228    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1229    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1230
1231    // Test for RowMutations
1232    checkAndMutate1 =
1233      CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1234        .build(new RowMutations(row3)
1235          .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1236          .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
1237
1238    checkAndMutate2 =
1239      CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
1240        .build(new RowMutations(row4)
1241          .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1242          .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
1243
1244    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1245
1246    assertTrue(results.get(0).isSuccess());
1247    assertNull(results.get(0).getResult());
1248    assertFalse(results.get(1).isSuccess());
1249    assertNull(results.get(1).getResult());
1250
1251    result = table.get(new Get(row3)).get();
1252    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1253    assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
1254
1255    result = table.get(new Get(row4)).get();
1256    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
1257    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1258  }
1259
1260  @Test
1261  public void testCheckAndMutateBatch2() throws Throwable {
1262    AsyncTable<?> table = getTable.get();
1263    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1264    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1265    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1266
1267    table
1268      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1269        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1270        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
1271        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))))
1272      .get();
1273
1274    // Test for ifNotExists()
1275    CheckAndMutate checkAndMutate1 =
1276      CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B"))
1277        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1278
1279    CheckAndMutate checkAndMutate2 =
1280      CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B"))
1281        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1282
1283    List<CheckAndMutateResult> results =
1284      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1285
1286    assertTrue(results.get(0).isSuccess());
1287    assertNull(results.get(0).getResult());
1288    assertFalse(results.get(1).isSuccess());
1289    assertNull(results.get(1).getResult());
1290
1291    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1292    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1293
1294    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1295    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1296
1297    // Test for ifMatches()
1298    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1299      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
1300      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
1301
1302    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1303      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
1304      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1305
1306    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1307
1308    assertTrue(results.get(0).isSuccess());
1309    assertNull(results.get(0).getResult());
1310    assertFalse(results.get(1).isSuccess());
1311    assertNull(results.get(1).getResult());
1312
1313    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1314    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1315
1316    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1317    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1318
1319    // Test for timeRange()
1320    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1321      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101))
1322      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
1323
1324    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1325      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100))
1326      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
1327
1328    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1329
1330    assertTrue(results.get(0).isSuccess());
1331    assertNull(results.get(0).getResult());
1332    assertFalse(results.get(1).isSuccess());
1333    assertNull(results.get(1).getResult());
1334
1335    result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1336    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1337
1338    result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1339    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1340  }
1341
1342  @Test
1343  public void testCheckAndMutateBatchWithFilter() throws Throwable {
1344    AsyncTable<?> table = getTable.get();
1345    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1346
1347    table.putAll(Arrays.asList(
1348      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1349        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1350        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1351      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1352        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
1353        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1354      .get();
1355
1356    // Test for Put
1357    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1358      .ifMatches(new FilterList(
1359        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1360          Bytes.toBytes("a")),
1361        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1362          Bytes.toBytes("b"))))
1363      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1364
1365    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1366      .ifMatches(new FilterList(
1367        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1368          Bytes.toBytes("a")),
1369        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1370          Bytes.toBytes("b"))))
1371      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1372
1373    List<CheckAndMutateResult> results =
1374      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1375
1376    assertTrue(results.get(0).isSuccess());
1377    assertNull(results.get(0).getResult());
1378    assertFalse(results.get(1).isSuccess());
1379    assertNull(results.get(1).getResult());
1380
1381    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1382    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1383
1384    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1385    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1386
1387    // Test for Delete
1388    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1389      .ifMatches(new FilterList(
1390        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1391          Bytes.toBytes("a")),
1392        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1393          Bytes.toBytes("b"))))
1394      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
1395
1396    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1397      .ifMatches(new FilterList(
1398        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1399          Bytes.toBytes("a")),
1400        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1401          Bytes.toBytes("b"))))
1402      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
1403
1404    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1405
1406    assertTrue(results.get(0).isSuccess());
1407    assertNull(results.get(0).getResult());
1408    assertFalse(results.get(1).isSuccess());
1409    assertNull(results.get(1).getResult());
1410
1411    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1412
1413    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1414    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1415
1416    // Test for RowMutations
1417    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1418      .ifMatches(new FilterList(
1419        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1420          Bytes.toBytes("a")),
1421        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1422          Bytes.toBytes("b"))))
1423      .build(new RowMutations(row)
1424        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
1425        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
1426
1427    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1428      .ifMatches(new FilterList(
1429        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1430          Bytes.toBytes("a")),
1431        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1432          Bytes.toBytes("b"))))
1433      .build(new RowMutations(row2)
1434        .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
1435        .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
1436
1437    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1438
1439    assertTrue(results.get(0).isSuccess());
1440    assertNull(results.get(0).getResult());
1441    assertFalse(results.get(1).isSuccess());
1442    assertNull(results.get(1).getResult());
1443
1444    result = table.get(new Get(row)).get();
1445    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
1446    assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1447
1448    result = table.get(new Get(row2)).get();
1449    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1450    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1451  }
1452
1453  @Test
1454  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
1455    AsyncTable<?> table = getTable.get();
1456    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1457
1458    table.putAll(Arrays.asList(
1459      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
1460        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
1461        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1462      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
1463        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
1464        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1465      .get();
1466
1467    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1468      .ifMatches(new FilterList(
1469        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1470          Bytes.toBytes("a")),
1471        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1472          Bytes.toBytes("b"))))
1473      .timeRange(TimeRange.between(0, 101))
1474      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1475
1476    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1477      .ifMatches(new FilterList(
1478        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1479          Bytes.toBytes("d")),
1480        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1481          Bytes.toBytes("e"))))
1482      .timeRange(TimeRange.between(0, 100))
1483      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1484
1485    List<CheckAndMutateResult> results =
1486      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1487
1488    assertTrue(results.get(0).isSuccess());
1489    assertNull(results.get(0).getResult());
1490    assertFalse(results.get(1).isSuccess());
1491    assertNull(results.get(1).getResult());
1492
1493    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1494    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1495
1496    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1497    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1498  }
1499
1500  @Test
1501  public void testCheckAndIncrementBatch() throws Throwable {
1502    AsyncTable<?> table = getTable.get();
1503    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1504
1505    table.putAll(Arrays.asList(
1506      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1507        Bytes.toBytes("B"), Bytes.toBytes(0L)),
1508      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1509        Bytes.toBytes("D"), Bytes.toBytes(0L))))
1510      .get();
1511
1512    // CheckAndIncrement with correct value
1513    CheckAndMutate checkAndMutate1 =
1514      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1515        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
1516
1517    // CheckAndIncrement with wrong value
1518    CheckAndMutate checkAndMutate2 =
1519      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1520        .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
1521
1522    List<CheckAndMutateResult> results =
1523      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1524
1525    assertTrue(results.get(0).isSuccess());
1526    assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1527    assertFalse(results.get(1).isSuccess());
1528    assertNull(results.get(1).getResult());
1529
1530    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1531    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1532
1533    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1534    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
1535  }
1536
1537  @Test
1538  public void testCheckAndAppendBatch() throws Throwable {
1539    AsyncTable<?> table = getTable.get();
1540    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1541
1542    table.putAll(Arrays.asList(
1543      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1544        Bytes.toBytes("B"), Bytes.toBytes("b")),
1545      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1546        Bytes.toBytes("D"), Bytes.toBytes("d"))))
1547      .get();
1548
1549    // CheckAndAppend with correct value
1550    CheckAndMutate checkAndMutate1 =
1551      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1552        .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
1553
1554    // CheckAndAppend with wrong value
1555    CheckAndMutate checkAndMutate2 =
1556      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1557        .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
1558
1559    List<CheckAndMutateResult> results =
1560      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1561
1562    assertTrue(results.get(0).isSuccess());
1563    assertEquals("bb",
1564      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1565    assertFalse(results.get(1).isSuccess());
1566    assertNull(results.get(1).getResult());
1567
1568    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1569    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1570
1571    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1572    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1573  }
1574
1575  @Test
1576  public void testCheckAndRowMutationsBatch() throws Throwable {
1577    AsyncTable<?> table = getTable.get();
1578    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1579
1580    table.putAll(Arrays.asList(
1581      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1582        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
1583        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
1584      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
1585        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
1586        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))
1587      .get();
1588
1589    // CheckAndIncrement with correct value
1590    CheckAndMutate checkAndMutate1 =
1591      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1592        .build(new RowMutations(row)
1593          .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1594            new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
1595            new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
1596            new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))));
1597
1598    // CheckAndIncrement with wrong value
1599    CheckAndMutate checkAndMutate2 =
1600      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
1601        .build(new RowMutations(row2).add(
1602          Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
1603            new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
1604            new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
1605            new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))));
1606
1607    List<CheckAndMutateResult> results =
1608      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1609
1610    assertTrue(results.get(0).isSuccess());
1611    assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C"))));
1612    assertEquals("dd",
1613      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D"))));
1614
1615    assertFalse(results.get(1).isSuccess());
1616    assertNull(results.get(1).getResult());
1617
1618    Result result = table.get(new Get(row)).get();
1619    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1620    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
1621    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
1622    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1623
1624    result = table.get(new Get(row2)).get();
1625    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
1626    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1627    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
1628    assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
1629  }
1630
1631  @Test
1632  public void testDisabled() throws InterruptedException, ExecutionException {
1633    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
1634    try {
1635      getTable.get().get(new Get(row)).get();
1636      fail("Should fail since table has been disabled");
1637    } catch (ExecutionException e) {
1638      Throwable cause = e.getCause();
1639      assertThat(cause, instanceOf(TableNotEnabledException.class));
1640      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
1641    }
1642  }
1643
1644  @Test
1645  public void testInvalidMutation() throws Exception {
1646    Consumer<Mutation> executeMutation = mutation -> {
1647      if (mutation instanceof Put) {
1648        getTable.get().put((Put) mutation);
1649      } else if (mutation instanceof Increment) {
1650        getTable.get().increment((Increment) mutation);
1651      } else if (mutation instanceof Append) {
1652        getTable.get().append((Append) mutation);
1653      }
1654    };
1655
1656    Mutation[] emptyMutations =
1657      { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) };
1658
1659    String[] emptyMessages =
1660      { "No columns to put", "No columns to increment", "No columns to append" };
1661
1662    Mutation[] oversizedMutations =
1663      { new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1664        new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1665        new Append(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1666
1667    for (int i = 0; i < emptyMutations.length; i++) {
1668      // Test empty mutation
1669      try {
1670        executeMutation.accept(emptyMutations[i]);
1671        fail("Should fail since the mutation does not contain any cells");
1672      } catch (IllegalArgumentException e) {
1673        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1674      }
1675
1676      // Test oversized mutation
1677      try {
1678        executeMutation.accept(oversizedMutations[i]);
1679        fail("Should fail since the mutation exceeds the max key value size");
1680      } catch (IllegalArgumentException e) {
1681        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1682      }
1683    }
1684  }
1685
1686  @Test
1687  public void testInvalidMutationInRowMutations() throws IOException {
1688    final byte[] row = Bytes.toBytes(0);
1689
1690    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
1691
1692    String[] emptyMessages =
1693      { "No columns to put", "No columns to increment", "No columns to append" };
1694
1695    Mutation[] oversizedMutations =
1696      { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1697        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1698        new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1699
1700    for (int i = 0; i < emptyMutations.length; i++) {
1701      // Test empty mutation
1702      try {
1703        getTable.get().mutateRow(new RowMutations(row).add(emptyMutations[i]));
1704        fail("Should fail since the mutation does not contain any cells");
1705      } catch (IllegalArgumentException e) {
1706        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1707      }
1708
1709      // Test oversized mutation
1710      try {
1711        getTable.get().mutateRow(new RowMutations(row).add(oversizedMutations[i]));
1712        fail("Should fail since the mutation exceeds the max key value size");
1713      } catch (IllegalArgumentException e) {
1714        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1715      }
1716    }
1717  }
1718
1719  @Test
1720  public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException {
1721    final byte[] row = Bytes.toBytes(0);
1722
1723    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
1724
1725    String[] emptyMessages =
1726      { "No columns to put", "No columns to increment", "No columns to append" };
1727
1728    Mutation[] oversizedMutations =
1729      { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1730        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1731        new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1732
1733    for (int i = 0; i < emptyMutations.length; i++) {
1734      // Test empty mutation
1735      try {
1736        getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
1737          .build(new RowMutations(row).add(emptyMutations[i])));
1738        fail("Should fail since the mutation does not contain any cells");
1739      } catch (IllegalArgumentException e) {
1740        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1741      }
1742
1743      // Test oversized mutation
1744      try {
1745        getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
1746          .build(new RowMutations(row).add(oversizedMutations[i])));
1747        fail("Should fail since the mutation exceeds the max key value size");
1748      } catch (IllegalArgumentException e) {
1749        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1750      }
1751    }
1752  }
1753}