001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.jupiter.api.Assertions.assertArrayEquals;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertFalse;
026import static org.junit.jupiter.api.Assertions.assertNull;
027import static org.junit.jupiter.api.Assertions.assertThrows;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029import static org.junit.jupiter.api.Assertions.fail;
030
031import java.io.IOException;
032import java.io.UncheckedIOException;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.List;
036import java.util.concurrent.ArrayBlockingQueue;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CountDownLatch;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.ForkJoinPool;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicLong;
043import java.util.function.Consumer;
044import java.util.function.Supplier;
045import java.util.stream.IntStream;
046import java.util.stream.Stream;
047import org.apache.hadoop.hbase.CompareOperator;
048import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.TableNotEnabledException;
052import org.apache.hadoop.hbase.filter.BinaryComparator;
053import org.apache.hadoop.hbase.filter.FamilyFilter;
054import org.apache.hadoop.hbase.filter.FilterList;
055import org.apache.hadoop.hbase.filter.QualifierFilter;
056import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
057import org.apache.hadoop.hbase.filter.TimestampsFilter;
058import org.apache.hadoop.hbase.io.TimeRange;
059import org.apache.hadoop.hbase.testclassification.ClientTests;
060import org.apache.hadoop.hbase.testclassification.MediumTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
063import org.apache.hadoop.hbase.util.Pair;
064import org.junit.jupiter.api.AfterAll;
065import org.junit.jupiter.api.BeforeAll;
066import org.junit.jupiter.api.BeforeEach;
067import org.junit.jupiter.api.Tag;
068import org.junit.jupiter.api.TestTemplate;
069import org.junit.jupiter.params.provider.Arguments;
070
071import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
072
073@Tag(MediumTests.TAG)
074@Tag(ClientTests.TAG)
075@HBaseParameterizedTestTemplate
076public class TestAsyncTable {
077
078  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
079
080  private static TableName TABLE_NAME = TableName.valueOf("async");
081
082  private static byte[] FAMILY = Bytes.toBytes("cf");
083
084  private static byte[] QUALIFIER = Bytes.toBytes("cq");
085
086  private static byte[] VALUE = Bytes.toBytes("value");
087
088  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
089
090  private static AsyncConnection ASYNC_CONN;
091
092  private static final AtomicInteger ROW_COUNTER = new AtomicInteger(0);
093
094  private byte[] row;
095
096  private final Supplier<AsyncTable<?>> getTable;
097
098  public TestAsyncTable(Supplier<AsyncTable<?>> getTable) {
099    this.getTable = getTable;
100  }
101
102  private static AsyncTable<?> getRawTable() {
103    return ASYNC_CONN.getTable(TABLE_NAME);
104  }
105
106  private static AsyncTable<?> getTable() {
107    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
108  }
109
110  public static Stream<Arguments> parameters() {
111    return Stream.of(Arguments.of((Supplier<AsyncTable<?>>) TestAsyncTable::getRawTable),
112      Arguments.of((Supplier<AsyncTable<?>>) TestAsyncTable::getTable));
113  }
114
115  @BeforeAll
116  public static void setUpBeforeClass() throws Exception {
117    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
118      MAX_KEY_VALUE_SIZE);
119    TEST_UTIL.startMiniCluster(1);
120    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
121    TEST_UTIL.waitTableAvailable(TABLE_NAME);
122    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
123    assertFalse(ASYNC_CONN.isClosed());
124  }
125
126  @AfterAll
127  public static void tearDownAfterClass() throws Exception {
128    Closeables.close(ASYNC_CONN, true);
129    assertTrue(ASYNC_CONN.isClosed());
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  @BeforeEach
134  public void setUp() throws IOException, InterruptedException, ExecutionException {
135    row = Bytes.toBytes("row" + ROW_COUNTER.getAndIncrement());
136    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
137      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
138    }
139  }
140
141  @TestTemplate
142  public void testSimple() throws Exception {
143    AsyncTable<?> table = getTable.get();
144    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
145    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
146    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
147    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
148    table.delete(new Delete(row)).get();
149    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
150    assertTrue(result.isEmpty());
151    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
152  }
153
154  private byte[] concat(byte[] base, int index) {
155    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
156  }
157
158  @SuppressWarnings("FutureReturnValueIgnored")
159  @TestTemplate
160  public void testSimpleMultiple() throws Exception {
161    AsyncTable<?> table = getTable.get();
162    int count = 100;
163    CountDownLatch putLatch = new CountDownLatch(count);
164    IntStream.range(0, count).forEach(
165      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
166        .thenAccept(x -> putLatch.countDown()));
167    putLatch.await();
168    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
169    IntStream.range(0, count)
170      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
171        .thenAccept(x -> existsResp.add(x)));
172    for (int i = 0; i < count; i++) {
173      assertTrue(existsResp.take());
174    }
175    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
176    IntStream.range(0, count)
177      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
178        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
179    for (int i = 0; i < count; i++) {
180      Pair<Integer, Result> pair = getResp.take();
181      assertArrayEquals(concat(VALUE, pair.getFirst()),
182        pair.getSecond().getValue(FAMILY, QUALIFIER));
183    }
184    CountDownLatch deleteLatch = new CountDownLatch(count);
185    IntStream.range(0, count).forEach(
186      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
187    deleteLatch.await();
188    IntStream.range(0, count)
189      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
190        .thenAccept(x -> existsResp.add(x)));
191    for (int i = 0; i < count; i++) {
192      assertFalse(existsResp.take());
193    }
194    IntStream.range(0, count)
195      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
196        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
197    for (int i = 0; i < count; i++) {
198      Pair<Integer, Result> pair = getResp.take();
199      assertTrue(pair.getSecond().isEmpty());
200    }
201  }
202
203  @SuppressWarnings("FutureReturnValueIgnored")
204  @TestTemplate
205  public void testIncrement() throws InterruptedException, ExecutionException {
206    AsyncTable<?> table = getTable.get();
207    int count = 100;
208    CountDownLatch latch = new CountDownLatch(count);
209    AtomicLong sum = new AtomicLong(0L);
210    IntStream.range(0, count)
211      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
212        sum.addAndGet(x);
213        latch.countDown();
214      }));
215    latch.await();
216    assertEquals(count, Bytes.toLong(
217      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
218    assertEquals((1 + count) * count / 2, sum.get());
219  }
220
221  @SuppressWarnings("FutureReturnValueIgnored")
222  @TestTemplate
223  public void testAppend() throws InterruptedException, ExecutionException {
224    AsyncTable<?> table = getTable.get();
225    int count = 10;
226    CountDownLatch latch = new CountDownLatch(count);
227    char suffix = ':';
228    AtomicLong suffixCount = new AtomicLong(0L);
229    IntStream.range(0, count)
230      .forEachOrdered(i -> table
231        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
232        .thenAccept(r -> {
233          suffixCount.addAndGet(
234            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
235          latch.countDown();
236        }));
237    latch.await();
238    assertEquals((1 + count) * count / 2, suffixCount.get());
239    String value = Bytes.toString(
240      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
241    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
242      .sorted().toArray();
243    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
244  }
245
246  @TestTemplate
247  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
248    AsyncTable<?> table = getTable.get();
249    RowMutations mutation = new RowMutations(row);
250    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
251    Result result = table.mutateRow(mutation).get();
252    assertTrue(result.getExists());
253    assertTrue(result.isEmpty());
254
255    result = table.get(new Get(row)).get();
256    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
257
258    mutation = new RowMutations(row);
259    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
260    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
261    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
262    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
263    result = table.mutateRow(mutation).get();
264    assertTrue(result.getExists());
265    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
266    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
267
268    result = table.get(new Get(row)).get();
269    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
270    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
271    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
272    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
273  }
274
275  // Tests for old checkAndMutate API
276
277  @SuppressWarnings("FutureReturnValueIgnored")
278  @TestTemplate
279  @Deprecated
280  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
281    AsyncTable<?> table = getTable.get();
282    AtomicInteger successCount = new AtomicInteger(0);
283    AtomicInteger successIndex = new AtomicInteger(-1);
284    int count = 10;
285    CountDownLatch latch = new CountDownLatch(count);
286    IntStream.range(0, count)
287      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
288        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
289          if (x) {
290            successCount.incrementAndGet();
291            successIndex.set(i);
292          }
293          latch.countDown();
294        }));
295    latch.await();
296    assertEquals(1, successCount.get());
297    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
298    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
299  }
300
301  @SuppressWarnings("FutureReturnValueIgnored")
302  @TestTemplate
303  @Deprecated
304  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
305    AsyncTable<?> table = getTable.get();
306    int count = 10;
307    CountDownLatch putLatch = new CountDownLatch(count + 1);
308    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
309    IntStream.range(0, count)
310      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
311        .thenRun(() -> putLatch.countDown()));
312    putLatch.await();
313
314    AtomicInteger successCount = new AtomicInteger(0);
315    AtomicInteger successIndex = new AtomicInteger(-1);
316    CountDownLatch deleteLatch = new CountDownLatch(count);
317    IntStream.range(0, count)
318      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
319        .thenDelete(
320          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
321        .thenAccept(x -> {
322          if (x) {
323            successCount.incrementAndGet();
324            successIndex.set(i);
325          }
326          deleteLatch.countDown();
327        }));
328    deleteLatch.await();
329    assertEquals(1, successCount.get());
330    Result result = table.get(new Get(row)).get();
331    IntStream.range(0, count).forEach(i -> {
332      if (i == successIndex.get()) {
333        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
334      } else {
335        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
336      }
337    });
338  }
339
340  @SuppressWarnings("FutureReturnValueIgnored")
341  @TestTemplate
342  @Deprecated
343  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
344    AsyncTable<?> table = getTable.get();
345    int count = 10;
346    CountDownLatch putLatch = new CountDownLatch(count + 1);
347    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
348    IntStream.range(0, count)
349      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
350        .thenRun(() -> putLatch.countDown()));
351    putLatch.await();
352
353    AtomicInteger successCount = new AtomicInteger(0);
354    AtomicInteger successIndex = new AtomicInteger(-1);
355    CountDownLatch mutateLatch = new CountDownLatch(count);
356    IntStream.range(0, count).forEach(i -> {
357      RowMutations mutation = new RowMutations(row);
358      try {
359        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
360        mutation
361          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
362      } catch (IOException e) {
363        throw new UncheckedIOException(e);
364      }
365      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
366        .thenAccept(x -> {
367          if (x) {
368            successCount.incrementAndGet();
369            successIndex.set(i);
370          }
371          mutateLatch.countDown();
372        });
373    });
374    mutateLatch.await();
375    assertEquals(1, successCount.get());
376    Result result = table.get(new Get(row)).get();
377    IntStream.range(0, count).forEach(i -> {
378      if (i == successIndex.get()) {
379        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
380      } else {
381        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
382      }
383    });
384  }
385
386  @TestTemplate
387  @Deprecated
388  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
389    AsyncTable<?> table = getTable.get();
390    final long ts = EnvironmentEdgeManager.currentTime() / 2;
391    Put put = new Put(row);
392    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
393
394    boolean ok =
395      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
396    assertTrue(ok);
397
398    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
399      .ifEquals(VALUE).thenPut(put).get();
400    assertFalse(ok);
401
402    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
403      .ifEquals(VALUE).thenPut(put).get();
404    assertTrue(ok);
405
406    RowMutations rm = new RowMutations(row).add((Mutation) put);
407
408    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
409      .ifEquals(VALUE).thenMutate(rm).get();
410    assertFalse(ok);
411
412    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
413      .ifEquals(VALUE).thenMutate(rm).get();
414    assertTrue(ok);
415
416    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
417
418    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
419      .ifEquals(VALUE).thenDelete(delete).get();
420    assertFalse(ok);
421
422    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
423      .ifEquals(VALUE).thenDelete(delete).get();
424    assertTrue(ok);
425  }
426
427  @TestTemplate
428  @Deprecated
429  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
430    AsyncTable<?> table = getTable.get();
431
432    // Put one row
433    Put put = new Put(row);
434    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
435    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
436    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
437    table.put(put).get();
438
439    // Put with success
440    boolean ok = table
441      .checkAndMutate(row,
442        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
443          Bytes.toBytes("a")))
444      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
445    assertTrue(ok);
446
447    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
448    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
449
450    // Put with failure
451    ok = table
452      .checkAndMutate(row,
453        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
454          Bytes.toBytes("b")))
455      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
456    assertFalse(ok);
457
458    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
459
460    // Delete with success
461    ok = table
462      .checkAndMutate(row,
463        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
464          Bytes.toBytes("a")))
465      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
466    assertTrue(ok);
467
468    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
469
470    // Mutate with success
471    ok = table
472      .checkAndMutate(row,
473        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
474          Bytes.toBytes("b")))
475      .thenMutate(new RowMutations(row)
476        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
477        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
478      .get();
479    assertTrue(ok);
480
481    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
482    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
483
484    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
485  }
486
487  @TestTemplate
488  @Deprecated
489  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
490    AsyncTable<?> table = getTable.get();
491
492    // Put one row
493    Put put = new Put(row);
494    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
495    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
496    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
497    table.put(put).get();
498
499    // Put with success
500    boolean ok = table
501      .checkAndMutate(row,
502        new FilterList(
503          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
504            Bytes.toBytes("a")),
505          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
506            Bytes.toBytes("b"))))
507      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
508    assertTrue(ok);
509
510    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
511    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
512
513    // Put with failure
514    ok = table
515      .checkAndMutate(row,
516        new FilterList(
517          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
518            Bytes.toBytes("a")),
519          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
520            Bytes.toBytes("c"))))
521      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
522    assertFalse(ok);
523
524    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
525
526    // Delete with success
527    ok = table
528      .checkAndMutate(row,
529        new FilterList(
530          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
531            Bytes.toBytes("a")),
532          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
533            Bytes.toBytes("b"))))
534      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
535    assertTrue(ok);
536
537    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
538
539    // Mutate with success
540    ok = table
541      .checkAndMutate(row,
542        new FilterList(
543          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
544            Bytes.toBytes("a")),
545          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
546            Bytes.toBytes("b"))))
547      .thenMutate(new RowMutations(row)
548        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
549        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
550      .get();
551    assertTrue(ok);
552
553    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
554    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
555
556    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
557  }
558
559  @TestTemplate
560  @Deprecated
561  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
562    AsyncTable<?> table = getTable.get();
563
564    // Put with specifying the timestamp
565    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
566
567    // Put with success
568    boolean ok = table
569      .checkAndMutate(row,
570        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
571          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
572          new TimestampsFilter(Collections.singletonList(100L))))
573      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
574    assertTrue(ok);
575
576    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
577    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
578
579    // Put with failure
580    ok = table
581      .checkAndMutate(row,
582        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
583          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
584          new TimestampsFilter(Collections.singletonList(101L))))
585      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
586    assertFalse(ok);
587
588    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
589  }
590
591  @TestTemplate
592  @Deprecated
593  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
594    AsyncTable<?> table = getTable.get();
595
596    // Put with specifying the timestamp
597    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
598
599    // Put with success
600    boolean ok = table
601      .checkAndMutate(row,
602        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
603          Bytes.toBytes("a")))
604      .timeRange(TimeRange.between(0, 101))
605      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
606    assertTrue(ok);
607
608    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
609    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
610
611    // Put with failure
612    ok = table
613      .checkAndMutate(row,
614        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
615          Bytes.toBytes("a")))
616      .timeRange(TimeRange.between(0, 100))
617      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
618    assertFalse(ok);
619
620    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
621  }
622
623  @TestTemplate
624  @Deprecated
625  public void testCheckAndMutateWithoutConditionForOldApi() {
626    assertThrows(NullPointerException.class, () -> {
627      getTable.get().checkAndMutate(row, FAMILY)
628        .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
629    });
630  }
631
632  // Tests for new CheckAndMutate API
633
634  @SuppressWarnings("FutureReturnValueIgnored")
635  @TestTemplate
636  public void testCheckAndPut() throws InterruptedException, ExecutionException {
637    AsyncTable<?> table = getTable.get();
638    AtomicInteger successCount = new AtomicInteger(0);
639    AtomicInteger successIndex = new AtomicInteger(-1);
640    int count = 10;
641    CountDownLatch latch = new CountDownLatch(count);
642
643    IntStream.range(0, count).forEach(
644      i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
645        .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> {
646          if (x.isSuccess()) {
647            successCount.incrementAndGet();
648            successIndex.set(i);
649          }
650          assertNull(x.getResult());
651          latch.countDown();
652        }));
653    latch.await();
654    assertEquals(1, successCount.get());
655    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
656    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
657  }
658
659  @SuppressWarnings("FutureReturnValueIgnored")
660  @TestTemplate
661  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
662    AsyncTable<?> table = getTable.get();
663    int count = 10;
664    CountDownLatch putLatch = new CountDownLatch(count + 1);
665    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
666    IntStream.range(0, count)
667      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
668        .thenRun(() -> putLatch.countDown()));
669    putLatch.await();
670
671    AtomicInteger successCount = new AtomicInteger(0);
672    AtomicInteger successIndex = new AtomicInteger(-1);
673    CountDownLatch deleteLatch = new CountDownLatch(count);
674
675    IntStream.range(0, count)
676      .forEach(i -> table
677        .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(
678          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
679        .thenAccept(x -> {
680          if (x.isSuccess()) {
681            successCount.incrementAndGet();
682            successIndex.set(i);
683          }
684          assertNull(x.getResult());
685          deleteLatch.countDown();
686        }));
687    deleteLatch.await();
688    assertEquals(1, successCount.get());
689    Result result = table.get(new Get(row)).get();
690    IntStream.range(0, count).forEach(i -> {
691      if (i == successIndex.get()) {
692        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
693      } else {
694        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
695      }
696    });
697  }
698
699  @SuppressWarnings("FutureReturnValueIgnored")
700  @TestTemplate
701  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
702    AsyncTable<?> table = getTable.get();
703    int count = 10;
704    CountDownLatch putLatch = new CountDownLatch(count + 1);
705    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
706    IntStream.range(0, count)
707      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
708        .thenRun(() -> putLatch.countDown()));
709    putLatch.await();
710
711    AtomicInteger successCount = new AtomicInteger(0);
712    AtomicInteger successIndex = new AtomicInteger(-1);
713    CountDownLatch mutateLatch = new CountDownLatch(count);
714    IntStream.range(0, count).forEach(i -> {
715      RowMutations mutation = new RowMutations(row);
716      try {
717        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
718        mutation
719          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
720      } catch (IOException e) {
721        throw new UncheckedIOException(e);
722      }
723
724      table
725        .checkAndMutate(
726          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation))
727        .thenAccept(x -> {
728          if (x.isSuccess()) {
729            successCount.incrementAndGet();
730            successIndex.set(i);
731          }
732          assertNull(x.getResult());
733          mutateLatch.countDown();
734        });
735    });
736    mutateLatch.await();
737    assertEquals(1, successCount.get());
738    Result result = table.get(new Get(row)).get();
739    IntStream.range(0, count).forEach(i -> {
740      if (i == successIndex.get()) {
741        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
742      } else {
743        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
744      }
745    });
746  }
747
748  @TestTemplate
749  public void testCheckAndMutateWithTimeRange() throws Exception {
750    AsyncTable<?> table = getTable.get();
751    final long ts = EnvironmentEdgeManager.currentTime() / 2;
752    Put put = new Put(row);
753    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
754
755    CheckAndMutateResult result =
756      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put))
757        .get();
758    assertTrue(result.isSuccess());
759    assertNull(result.getResult());
760
761    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
762      .timeRange(TimeRange.at(ts + 10000)).build(put)).get();
763    assertFalse(result.isSuccess());
764    assertNull(result.getResult());
765
766    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
767      .timeRange(TimeRange.at(ts)).build(put)).get();
768    assertTrue(result.isSuccess());
769    assertNull(result.getResult());
770
771    RowMutations rm = new RowMutations(row).add((Mutation) put);
772
773    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
774      .timeRange(TimeRange.at(ts + 10000)).build(rm)).get();
775    assertFalse(result.isSuccess());
776    assertNull(result.getResult());
777
778    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
779      .timeRange(TimeRange.at(ts)).build(rm)).get();
780    assertTrue(result.isSuccess());
781    assertNull(result.getResult());
782
783    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
784
785    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
786      .timeRange(TimeRange.at(ts + 10000)).build(delete)).get();
787    assertFalse(result.isSuccess());
788    assertNull(result.getResult());
789
790    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
791      .timeRange(TimeRange.at(ts)).build(delete)).get();
792    assertTrue(result.isSuccess());
793    assertNull(result.getResult());
794  }
795
796  @TestTemplate
797  public void testCheckAndMutateWithSingleFilter() throws Throwable {
798    AsyncTable<?> table = getTable.get();
799
800    // Put one row
801    Put put = new Put(row);
802    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
803    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
804    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
805    table.put(put).get();
806
807    // Put with success
808    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
809      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
810        Bytes.toBytes("a")))
811      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
812    assertTrue(result.isSuccess());
813    assertNull(result.getResult());
814
815    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
816    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
817
818    // Put with failure
819    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
820      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
821        Bytes.toBytes("b")))
822      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
823    assertFalse(result.isSuccess());
824    assertNull(result.getResult());
825
826    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
827
828    // Delete with success
829    result =
830      table
831        .checkAndMutate(CheckAndMutate.newBuilder(row)
832          .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
833            Bytes.toBytes("a")))
834          .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
835        .get();
836    assertTrue(result.isSuccess());
837    assertNull(result.getResult());
838
839    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
840
841    // Mutate with success
842    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
843      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
844        Bytes.toBytes("b")))
845      .build(new RowMutations(row)
846        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
847        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
848      .get();
849    assertTrue(result.isSuccess());
850    assertNull(result.getResult());
851
852    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
853    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
854
855    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
856  }
857
858  @TestTemplate
859  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
860    AsyncTable<?> table = getTable.get();
861
862    // Put one row
863    Put put = new Put(row);
864    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
865    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
866    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
867    table.put(put).get();
868
869    // Put with success
870    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
871      .ifMatches(new FilterList(
872        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
873          Bytes.toBytes("a")),
874        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
875          Bytes.toBytes("b"))))
876      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
877    assertTrue(result.isSuccess());
878    assertNull(result.getResult());
879
880    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
881    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
882
883    // Put with failure
884    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
885      .ifMatches(new FilterList(
886        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
887          Bytes.toBytes("a")),
888        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
889          Bytes.toBytes("c"))))
890      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
891    assertFalse(result.isSuccess());
892    assertNull(result.getResult());
893
894    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
895
896    // Delete with success
897    result = table
898      .checkAndMutate(CheckAndMutate.newBuilder(row)
899        .ifMatches(new FilterList(
900          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
901            Bytes.toBytes("a")),
902          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
903            Bytes.toBytes("b"))))
904        .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
905      .get();
906    assertTrue(result.isSuccess());
907    assertNull(result.getResult());
908
909    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
910
911    // Mutate with success
912    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
913      .ifMatches(new FilterList(
914        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
915          Bytes.toBytes("a")),
916        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
917          Bytes.toBytes("b"))))
918      .build(new RowMutations(row)
919        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
920        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
921      .get();
922    assertTrue(result.isSuccess());
923    assertNull(result.getResult());
924
925    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
926    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
927
928    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
929  }
930
931  @TestTemplate
932  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
933    AsyncTable<?> table = getTable.get();
934
935    // Put with specifying the timestamp
936    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
937
938    // Put with success
939    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
940      .ifMatches(
941        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
942          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
943          new TimestampsFilter(Collections.singletonList(100L))))
944      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
945    assertTrue(result.isSuccess());
946    assertNull(result.getResult());
947
948    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
949    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
950
951    // Put with failure
952    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
953      .ifMatches(
954        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
955          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
956          new TimestampsFilter(Collections.singletonList(101L))))
957      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
958    assertFalse(result.isSuccess());
959    assertNull(result.getResult());
960
961    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
962  }
963
964  @TestTemplate
965  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
966    AsyncTable<?> table = getTable.get();
967
968    // Put with specifying the timestamp
969    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
970
971    // Put with success
972    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
973      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
974        Bytes.toBytes("a")))
975      .timeRange(TimeRange.between(0, 101))
976      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
977    assertTrue(result.isSuccess());
978    assertNull(result.getResult());
979
980    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
981    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
982
983    // Put with failure
984    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
985      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
986        Bytes.toBytes("a")))
987      .timeRange(TimeRange.between(0, 100))
988      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
989    assertFalse(result.isSuccess());
990    assertNull(result.getResult());
991
992    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
993  }
994
995  @TestTemplate
996  public void testCheckAndIncrement() throws Throwable {
997    AsyncTable<?> table = getTable.get();
998
999    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1000
1001    // CheckAndIncrement with correct value
1002    CheckAndMutateResult res = table.checkAndMutate(
1003      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1004        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1005      .get();
1006    assertTrue(res.isSuccess());
1007    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1008
1009    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1010    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1011
1012    // CheckAndIncrement with wrong value
1013    res = table.checkAndMutate(
1014      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1015        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1016      .get();
1017    assertFalse(res.isSuccess());
1018    assertNull(res.getResult());
1019
1020    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1021    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1022
1023    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1024
1025    // CheckAndIncrement with a filter and correct value
1026    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1027      .ifMatches(new FilterList(
1028        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1029          Bytes.toBytes("a")),
1030        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1031          Bytes.toBytes("c"))))
1032      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1033    assertTrue(res.isSuccess());
1034    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1035
1036    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1037    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1038
1039    // CheckAndIncrement with a filter and correct value
1040    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1041      .ifMatches(new FilterList(
1042        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1043          Bytes.toBytes("b")),
1044        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1045          Bytes.toBytes("d"))))
1046      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1047    assertFalse(res.isSuccess());
1048    assertNull(res.getResult());
1049
1050    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1051    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1052  }
1053
1054  @TestTemplate
1055  public void testCheckAndAppend() throws Throwable {
1056    AsyncTable<?> table = getTable.get();
1057
1058    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1059
1060    // CheckAndAppend with correct value
1061    CheckAndMutateResult res =
1062      table
1063        .checkAndMutate(
1064          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1065            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1066        .get();
1067    assertTrue(res.isSuccess());
1068    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1069
1070    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1071    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1072
1073    // CheckAndAppend with correct value
1074    res =
1075      table
1076        .checkAndMutate(
1077          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1078            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1079        .get();
1080    assertFalse(res.isSuccess());
1081    assertNull(res.getResult());
1082
1083    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1084    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1085
1086    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1087
1088    // CheckAndAppend with a filter and correct value
1089    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1090      .ifMatches(new FilterList(
1091        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1092          Bytes.toBytes("a")),
1093        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1094          Bytes.toBytes("c"))))
1095      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1096    assertTrue(res.isSuccess());
1097    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1098
1099    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1100    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1101
1102    // CheckAndAppend with a filter and wrong value
1103    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1104      .ifMatches(new FilterList(
1105        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1106          Bytes.toBytes("b")),
1107        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1108          Bytes.toBytes("d"))))
1109      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1110    assertFalse(res.isSuccess());
1111    assertNull(res.getResult());
1112
1113    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1114    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1115  }
1116
1117  @TestTemplate
1118  public void testCheckAndRowMutations() throws Throwable {
1119    final byte[] q1 = Bytes.toBytes("q1");
1120    final byte[] q2 = Bytes.toBytes("q2");
1121    final byte[] q3 = Bytes.toBytes("q3");
1122    final byte[] q4 = Bytes.toBytes("q4");
1123    final String v1 = "v1";
1124
1125    AsyncTable<?> table = getTable.get();
1126
1127    // Initial values
1128    table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
1129      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
1130      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
1131
1132    // Do CheckAndRowMutations
1133    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build(
1134      new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
1135        new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1),
1136        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1137
1138    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
1139    assertTrue(result.isSuccess());
1140    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
1141    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
1142
1143    // Verify the value
1144    Result r = table.get(new Get(row)).get();
1145    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1146    assertNull(r.getValue(FAMILY, q2));
1147    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1148    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1149
1150    // Do CheckAndRowMutations again
1151    checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1)
1152      .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1),
1153        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
1154        new Increment(row).addColumn(FAMILY, q3, 1),
1155        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1156
1157    result = table.checkAndMutate(checkAndMutate).get();
1158    assertFalse(result.isSuccess());
1159    assertNull(result.getResult());
1160
1161    // Verify the value
1162    r = table.get(new Get(row)).get();
1163    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1164    assertNull(r.getValue(FAMILY, q2));
1165    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1166    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1167  }
1168
1169  // Tests for batch version of checkAndMutate
1170
1171  @TestTemplate
1172  public void testCheckAndMutateBatch() throws Throwable {
1173    AsyncTable<?> table = getTable.get();
1174    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1175    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1176    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1177
1178    table
1179      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1180        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1181        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1182        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))
1183      .get();
1184
1185    // Test for Put
1186    CheckAndMutate checkAndMutate1 =
1187      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1188        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1189
1190    CheckAndMutate checkAndMutate2 =
1191      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1192        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1193
1194    List<CheckAndMutateResult> results =
1195      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1196
1197    assertTrue(results.get(0).isSuccess());
1198    assertNull(results.get(0).getResult());
1199    assertFalse(results.get(1).isSuccess());
1200    assertNull(results.get(1).getResult());
1201
1202    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1203    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1204
1205    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1206    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1207
1208    // Test for Delete
1209    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1210      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row));
1211
1212    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1213      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2));
1214
1215    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1216
1217    assertTrue(results.get(0).isSuccess());
1218    assertNull(results.get(0).getResult());
1219    assertFalse(results.get(1).isSuccess());
1220    assertNull(results.get(1).getResult());
1221
1222    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
1223
1224    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1225    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1226
1227    // Test for RowMutations
1228    checkAndMutate1 =
1229      CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1230        .build(new RowMutations(row3)
1231          .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1232          .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
1233
1234    checkAndMutate2 =
1235      CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
1236        .build(new RowMutations(row4)
1237          .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1238          .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
1239
1240    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1241
1242    assertTrue(results.get(0).isSuccess());
1243    assertNull(results.get(0).getResult());
1244    assertFalse(results.get(1).isSuccess());
1245    assertNull(results.get(1).getResult());
1246
1247    result = table.get(new Get(row3)).get();
1248    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1249    assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
1250
1251    result = table.get(new Get(row4)).get();
1252    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
1253    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1254  }
1255
1256  @TestTemplate
1257  public void testCheckAndMutateBatch2() throws Throwable {
1258    AsyncTable<?> table = getTable.get();
1259    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1260    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1261    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1262
1263    table
1264      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1265        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1266        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
1267        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))))
1268      .get();
1269
1270    // Test for ifNotExists()
1271    CheckAndMutate checkAndMutate1 =
1272      CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B"))
1273        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1274
1275    CheckAndMutate checkAndMutate2 =
1276      CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B"))
1277        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1278
1279    List<CheckAndMutateResult> results =
1280      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1281
1282    assertTrue(results.get(0).isSuccess());
1283    assertNull(results.get(0).getResult());
1284    assertFalse(results.get(1).isSuccess());
1285    assertNull(results.get(1).getResult());
1286
1287    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1288    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1289
1290    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1291    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1292
1293    // Test for ifMatches()
1294    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1295      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
1296      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
1297
1298    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1299      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
1300      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1301
1302    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1303
1304    assertTrue(results.get(0).isSuccess());
1305    assertNull(results.get(0).getResult());
1306    assertFalse(results.get(1).isSuccess());
1307    assertNull(results.get(1).getResult());
1308
1309    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1310    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1311
1312    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1313    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1314
1315    // Test for timeRange()
1316    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1317      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101))
1318      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
1319
1320    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1321      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100))
1322      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
1323
1324    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1325
1326    assertTrue(results.get(0).isSuccess());
1327    assertNull(results.get(0).getResult());
1328    assertFalse(results.get(1).isSuccess());
1329    assertNull(results.get(1).getResult());
1330
1331    result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1332    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1333
1334    result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1335    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1336  }
1337
1338  @TestTemplate
1339  public void testCheckAndMutateBatchWithFilter() throws Throwable {
1340    AsyncTable<?> table = getTable.get();
1341    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1342
1343    table.putAll(Arrays.asList(
1344      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1345        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1346        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1347      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1348        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
1349        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1350      .get();
1351
1352    // Test for Put
1353    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1354      .ifMatches(new FilterList(
1355        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1356          Bytes.toBytes("a")),
1357        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1358          Bytes.toBytes("b"))))
1359      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1360
1361    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1362      .ifMatches(new FilterList(
1363        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1364          Bytes.toBytes("a")),
1365        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1366          Bytes.toBytes("b"))))
1367      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1368
1369    List<CheckAndMutateResult> results =
1370      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1371
1372    assertTrue(results.get(0).isSuccess());
1373    assertNull(results.get(0).getResult());
1374    assertFalse(results.get(1).isSuccess());
1375    assertNull(results.get(1).getResult());
1376
1377    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1378    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1379
1380    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1381    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1382
1383    // Test for Delete
1384    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1385      .ifMatches(new FilterList(
1386        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1387          Bytes.toBytes("a")),
1388        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1389          Bytes.toBytes("b"))))
1390      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
1391
1392    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1393      .ifMatches(new FilterList(
1394        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1395          Bytes.toBytes("a")),
1396        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1397          Bytes.toBytes("b"))))
1398      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
1399
1400    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1401
1402    assertTrue(results.get(0).isSuccess());
1403    assertNull(results.get(0).getResult());
1404    assertFalse(results.get(1).isSuccess());
1405    assertNull(results.get(1).getResult());
1406
1407    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1408
1409    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1410    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1411
1412    // Test for RowMutations
1413    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1414      .ifMatches(new FilterList(
1415        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1416          Bytes.toBytes("a")),
1417        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1418          Bytes.toBytes("b"))))
1419      .build(new RowMutations(row)
1420        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
1421        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
1422
1423    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1424      .ifMatches(new FilterList(
1425        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1426          Bytes.toBytes("a")),
1427        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1428          Bytes.toBytes("b"))))
1429      .build(new RowMutations(row2)
1430        .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
1431        .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
1432
1433    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1434
1435    assertTrue(results.get(0).isSuccess());
1436    assertNull(results.get(0).getResult());
1437    assertFalse(results.get(1).isSuccess());
1438    assertNull(results.get(1).getResult());
1439
1440    result = table.get(new Get(row)).get();
1441    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
1442    assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1443
1444    result = table.get(new Get(row2)).get();
1445    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1446    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1447  }
1448
1449  @TestTemplate
1450  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
1451    AsyncTable<?> table = getTable.get();
1452    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1453
1454    table.putAll(Arrays.asList(
1455      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
1456        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
1457        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1458      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
1459        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
1460        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1461      .get();
1462
1463    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1464      .ifMatches(new FilterList(
1465        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1466          Bytes.toBytes("a")),
1467        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1468          Bytes.toBytes("b"))))
1469      .timeRange(TimeRange.between(0, 101))
1470      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1471
1472    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1473      .ifMatches(new FilterList(
1474        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1475          Bytes.toBytes("d")),
1476        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1477          Bytes.toBytes("e"))))
1478      .timeRange(TimeRange.between(0, 100))
1479      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1480
1481    List<CheckAndMutateResult> results =
1482      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1483
1484    assertTrue(results.get(0).isSuccess());
1485    assertNull(results.get(0).getResult());
1486    assertFalse(results.get(1).isSuccess());
1487    assertNull(results.get(1).getResult());
1488
1489    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1490    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1491
1492    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1493    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1494  }
1495
1496  @TestTemplate
1497  public void testCheckAndIncrementBatch() throws Throwable {
1498    AsyncTable<?> table = getTable.get();
1499    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1500
1501    table.putAll(Arrays.asList(
1502      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1503        Bytes.toBytes("B"), Bytes.toBytes(0L)),
1504      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1505        Bytes.toBytes("D"), Bytes.toBytes(0L))))
1506      .get();
1507
1508    // CheckAndIncrement with correct value
1509    CheckAndMutate checkAndMutate1 =
1510      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1511        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
1512
1513    // CheckAndIncrement with wrong value
1514    CheckAndMutate checkAndMutate2 =
1515      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1516        .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
1517
1518    List<CheckAndMutateResult> results =
1519      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1520
1521    assertTrue(results.get(0).isSuccess());
1522    assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1523    assertFalse(results.get(1).isSuccess());
1524    assertNull(results.get(1).getResult());
1525
1526    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1527    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1528
1529    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1530    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
1531  }
1532
1533  @TestTemplate
1534  public void testCheckAndAppendBatch() throws Throwable {
1535    AsyncTable<?> table = getTable.get();
1536    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1537
1538    table.putAll(Arrays.asList(
1539      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1540        Bytes.toBytes("B"), Bytes.toBytes("b")),
1541      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1542        Bytes.toBytes("D"), Bytes.toBytes("d"))))
1543      .get();
1544
1545    // CheckAndAppend with correct value
1546    CheckAndMutate checkAndMutate1 =
1547      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1548        .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
1549
1550    // CheckAndAppend with wrong value
1551    CheckAndMutate checkAndMutate2 =
1552      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1553        .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
1554
1555    List<CheckAndMutateResult> results =
1556      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1557
1558    assertTrue(results.get(0).isSuccess());
1559    assertEquals("bb",
1560      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1561    assertFalse(results.get(1).isSuccess());
1562    assertNull(results.get(1).getResult());
1563
1564    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1565    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1566
1567    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1568    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1569  }
1570
1571  @TestTemplate
1572  public void testCheckAndRowMutationsBatch() throws Throwable {
1573    AsyncTable<?> table = getTable.get();
1574    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1575
1576    table.putAll(Arrays.asList(
1577      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1578        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
1579        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
1580      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
1581        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
1582        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))
1583      .get();
1584
1585    // CheckAndIncrement with correct value
1586    CheckAndMutate checkAndMutate1 =
1587      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1588        .build(new RowMutations(row)
1589          .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1590            new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
1591            new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
1592            new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))));
1593
1594    // CheckAndIncrement with wrong value
1595    CheckAndMutate checkAndMutate2 =
1596      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
1597        .build(new RowMutations(row2).add(
1598          Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
1599            new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
1600            new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
1601            new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))));
1602
1603    List<CheckAndMutateResult> results =
1604      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1605
1606    assertTrue(results.get(0).isSuccess());
1607    assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C"))));
1608    assertEquals("dd",
1609      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D"))));
1610
1611    assertFalse(results.get(1).isSuccess());
1612    assertNull(results.get(1).getResult());
1613
1614    Result result = table.get(new Get(row)).get();
1615    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1616    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
1617    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
1618    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1619
1620    result = table.get(new Get(row2)).get();
1621    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
1622    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1623    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
1624    assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
1625  }
1626
1627  @TestTemplate
1628  public void testDisabled() throws InterruptedException, ExecutionException {
1629    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
1630    try {
1631      getTable.get().get(new Get(row)).get();
1632      fail("Should fail since table has been disabled");
1633    } catch (ExecutionException e) {
1634      Throwable cause = e.getCause();
1635      assertThat(cause, instanceOf(TableNotEnabledException.class));
1636      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
1637    }
1638  }
1639
1640  @TestTemplate
1641  public void testInvalidMutation() throws Exception {
1642    Consumer<Mutation> executeMutation = mutation -> {
1643      if (mutation instanceof Put) {
1644        getTable.get().put((Put) mutation);
1645      } else if (mutation instanceof Increment) {
1646        getTable.get().increment((Increment) mutation);
1647      } else if (mutation instanceof Append) {
1648        getTable.get().append((Append) mutation);
1649      }
1650    };
1651
1652    Mutation[] emptyMutations =
1653      { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) };
1654
1655    String[] emptyMessages =
1656      { "No columns to put", "No columns to increment", "No columns to append" };
1657
1658    Mutation[] oversizedMutations =
1659      { new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1660        new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1661        new Append(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1662
1663    for (int i = 0; i < emptyMutations.length; i++) {
1664      // Test empty mutation
1665      try {
1666        executeMutation.accept(emptyMutations[i]);
1667        fail("Should fail since the mutation does not contain any cells");
1668      } catch (IllegalArgumentException e) {
1669        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1670      }
1671
1672      // Test oversized mutation
1673      try {
1674        executeMutation.accept(oversizedMutations[i]);
1675        fail("Should fail since the mutation exceeds the max key value size");
1676      } catch (IllegalArgumentException e) {
1677        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1678      }
1679    }
1680  }
1681
1682  @TestTemplate
1683  public void testInvalidMutationInRowMutations() throws IOException {
1684    final byte[] row = Bytes.toBytes(0);
1685
1686    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
1687
1688    String[] emptyMessages =
1689      { "No columns to put", "No columns to increment", "No columns to append" };
1690
1691    Mutation[] oversizedMutations =
1692      { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1693        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1694        new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1695
1696    for (int i = 0; i < emptyMutations.length; i++) {
1697      // Test empty mutation
1698      try {
1699        getTable.get().mutateRow(new RowMutations(row).add(emptyMutations[i]));
1700        fail("Should fail since the mutation does not contain any cells");
1701      } catch (IllegalArgumentException e) {
1702        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1703      }
1704
1705      // Test oversized mutation
1706      try {
1707        getTable.get().mutateRow(new RowMutations(row).add(oversizedMutations[i]));
1708        fail("Should fail since the mutation exceeds the max key value size");
1709      } catch (IllegalArgumentException e) {
1710        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1711      }
1712    }
1713  }
1714
1715  @TestTemplate
1716  public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException {
1717    final byte[] row = Bytes.toBytes(0);
1718
1719    Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) };
1720
1721    String[] emptyMessages =
1722      { "No columns to put", "No columns to increment", "No columns to append" };
1723
1724    Mutation[] oversizedMutations =
1725      { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]),
1726        new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1),
1727        new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) };
1728
1729    for (int i = 0; i < emptyMutations.length; i++) {
1730      // Test empty mutation
1731      try {
1732        getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
1733          .build(new RowMutations(row).add(emptyMutations[i])));
1734        fail("Should fail since the mutation does not contain any cells");
1735      } catch (IllegalArgumentException e) {
1736        assertThat(e.getMessage(), containsString(emptyMessages[i]));
1737      }
1738
1739      // Test oversized mutation
1740      try {
1741        getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
1742          .build(new RowMutations(row).add(oversizedMutations[i])));
1743        fail("Should fail since the mutation exceeds the max key value size");
1744      } catch (IllegalArgumentException e) {
1745        assertThat(e.getMessage(), containsString("KeyValue size too large"));
1746      }
1747    }
1748  }
1749}