001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.MatcherAssert.assertThat;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.IntStream;
044import org.apache.hadoop.hbase.CompareOperator;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.TableNotEnabledException;
049import org.apache.hadoop.hbase.filter.BinaryComparator;
050import org.apache.hadoop.hbase.filter.FamilyFilter;
051import org.apache.hadoop.hbase.filter.FilterList;
052import org.apache.hadoop.hbase.filter.QualifierFilter;
053import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
054import org.apache.hadoop.hbase.filter.TimestampsFilter;
055import org.apache.hadoop.hbase.io.TimeRange;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Pair;
061import org.junit.AfterClass;
062import org.junit.Before;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Rule;
066import org.junit.Test;
067import org.junit.experimental.categories.Category;
068import org.junit.rules.TestName;
069import org.junit.runner.RunWith;
070import org.junit.runners.Parameterized;
071import org.junit.runners.Parameterized.Parameter;
072import org.junit.runners.Parameterized.Parameters;
073
074import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
075
076@RunWith(Parameterized.class)
077@Category({ MediumTests.class, ClientTests.class })
078public class TestAsyncTable {
079
080  @ClassRule
081  public static final HBaseClassTestRule CLASS_RULE =
082    HBaseClassTestRule.forClass(TestAsyncTable.class);
083
084  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
085
086  private static TableName TABLE_NAME = TableName.valueOf("async");
087
088  private static byte[] FAMILY = Bytes.toBytes("cf");
089
090  private static byte[] QUALIFIER = Bytes.toBytes("cq");
091
092  private static byte[] VALUE = Bytes.toBytes("value");
093
094  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
095
096  private static AsyncConnection ASYNC_CONN;
097
098  @Rule
099  public TestName testName = new TestName();
100
101  private byte[] row;
102
103  @Parameter
104  public Supplier<AsyncTable<?>> getTable;
105
106  private static AsyncTable<?> getRawTable() {
107    return ASYNC_CONN.getTable(TABLE_NAME);
108  }
109
110  private static AsyncTable<?> getTable() {
111    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
112  }
113
114  @Parameters
115  public static List<Object[]> params() {
116    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
117      new Supplier<?>[] { TestAsyncTable::getTable });
118  }
119
120  @BeforeClass
121  public static void setUpBeforeClass() throws Exception {
122    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
123      MAX_KEY_VALUE_SIZE);
124    TEST_UTIL.startMiniCluster(1);
125    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
126    TEST_UTIL.waitTableAvailable(TABLE_NAME);
127    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
128    assertFalse(ASYNC_CONN.isClosed());
129  }
130
131  @AfterClass
132  public static void tearDownAfterClass() throws Exception {
133    Closeables.close(ASYNC_CONN, true);
134    assertTrue(ASYNC_CONN.isClosed());
135    TEST_UTIL.shutdownMiniCluster();
136  }
137
138  @Before
139  public void setUp() throws IOException, InterruptedException, ExecutionException {
140    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
141    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
142      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
143    }
144  }
145
146  @Test
147  public void testSimple() throws Exception {
148    AsyncTable<?> table = getTable.get();
149    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
150    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
151    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
152    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
153    table.delete(new Delete(row)).get();
154    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
155    assertTrue(result.isEmpty());
156    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
157  }
158
159  private byte[] concat(byte[] base, int index) {
160    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
161  }
162
163  @SuppressWarnings("FutureReturnValueIgnored")
164  @Test
165  public void testSimpleMultiple() throws Exception {
166    AsyncTable<?> table = getTable.get();
167    int count = 100;
168    CountDownLatch putLatch = new CountDownLatch(count);
169    IntStream.range(0, count).forEach(
170      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
171        .thenAccept(x -> putLatch.countDown()));
172    putLatch.await();
173    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
174    IntStream.range(0, count)
175      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
176        .thenAccept(x -> existsResp.add(x)));
177    for (int i = 0; i < count; i++) {
178      assertTrue(existsResp.take());
179    }
180    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
181    IntStream.range(0, count)
182      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
183        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
184    for (int i = 0; i < count; i++) {
185      Pair<Integer, Result> pair = getResp.take();
186      assertArrayEquals(concat(VALUE, pair.getFirst()),
187        pair.getSecond().getValue(FAMILY, QUALIFIER));
188    }
189    CountDownLatch deleteLatch = new CountDownLatch(count);
190    IntStream.range(0, count).forEach(
191      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
192    deleteLatch.await();
193    IntStream.range(0, count)
194      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
195        .thenAccept(x -> existsResp.add(x)));
196    for (int i = 0; i < count; i++) {
197      assertFalse(existsResp.take());
198    }
199    IntStream.range(0, count)
200      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
201        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
202    for (int i = 0; i < count; i++) {
203      Pair<Integer, Result> pair = getResp.take();
204      assertTrue(pair.getSecond().isEmpty());
205    }
206  }
207
208  @SuppressWarnings("FutureReturnValueIgnored")
209  @Test
210  public void testIncrement() throws InterruptedException, ExecutionException {
211    AsyncTable<?> table = getTable.get();
212    int count = 100;
213    CountDownLatch latch = new CountDownLatch(count);
214    AtomicLong sum = new AtomicLong(0L);
215    IntStream.range(0, count)
216      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
217        sum.addAndGet(x);
218        latch.countDown();
219      }));
220    latch.await();
221    assertEquals(count, Bytes.toLong(
222      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
223    assertEquals((1 + count) * count / 2, sum.get());
224  }
225
226  @SuppressWarnings("FutureReturnValueIgnored")
227  @Test
228  public void testAppend() throws InterruptedException, ExecutionException {
229    AsyncTable<?> table = getTable.get();
230    int count = 10;
231    CountDownLatch latch = new CountDownLatch(count);
232    char suffix = ':';
233    AtomicLong suffixCount = new AtomicLong(0L);
234    IntStream.range(0, count)
235      .forEachOrdered(i -> table
236        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
237        .thenAccept(r -> {
238          suffixCount.addAndGet(
239            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
240          latch.countDown();
241        }));
242    latch.await();
243    assertEquals((1 + count) * count / 2, suffixCount.get());
244    String value = Bytes.toString(
245      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
246    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
247      .sorted().toArray();
248    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
249  }
250
251  @Test
252  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
253    AsyncTable<?> table = getTable.get();
254    RowMutations mutation = new RowMutations(row);
255    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
256    Result result = table.mutateRow(mutation).get();
257    assertTrue(result.getExists());
258    assertTrue(result.isEmpty());
259
260    result = table.get(new Get(row)).get();
261    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
262
263    mutation = new RowMutations(row);
264    mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
265    mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
266    mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
267    mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
268    result = table.mutateRow(mutation).get();
269    assertTrue(result.getExists());
270    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
271    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
272
273    result = table.get(new Get(row)).get();
274    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
275    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
276    assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
277    assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
278  }
279
280  // Tests for old checkAndMutate API
281
282  @SuppressWarnings("FutureReturnValueIgnored")
283  @Test
284  @Deprecated
285  public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
286    AsyncTable<?> table = getTable.get();
287    AtomicInteger successCount = new AtomicInteger(0);
288    AtomicInteger successIndex = new AtomicInteger(-1);
289    int count = 10;
290    CountDownLatch latch = new CountDownLatch(count);
291    IntStream.range(0, count)
292      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
293        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
294          if (x) {
295            successCount.incrementAndGet();
296            successIndex.set(i);
297          }
298          latch.countDown();
299        }));
300    latch.await();
301    assertEquals(1, successCount.get());
302    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
303    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
304  }
305
306  @SuppressWarnings("FutureReturnValueIgnored")
307  @Test
308  @Deprecated
309  public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
310    AsyncTable<?> table = getTable.get();
311    int count = 10;
312    CountDownLatch putLatch = new CountDownLatch(count + 1);
313    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
314    IntStream.range(0, count)
315      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
316        .thenRun(() -> putLatch.countDown()));
317    putLatch.await();
318
319    AtomicInteger successCount = new AtomicInteger(0);
320    AtomicInteger successIndex = new AtomicInteger(-1);
321    CountDownLatch deleteLatch = new CountDownLatch(count);
322    IntStream.range(0, count)
323      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
324        .thenDelete(
325          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
326        .thenAccept(x -> {
327          if (x) {
328            successCount.incrementAndGet();
329            successIndex.set(i);
330          }
331          deleteLatch.countDown();
332        }));
333    deleteLatch.await();
334    assertEquals(1, successCount.get());
335    Result result = table.get(new Get(row)).get();
336    IntStream.range(0, count).forEach(i -> {
337      if (i == successIndex.get()) {
338        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
339      } else {
340        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
341      }
342    });
343  }
344
345  @SuppressWarnings("FutureReturnValueIgnored")
346  @Test
347  @Deprecated
348  public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
349    AsyncTable<?> table = getTable.get();
350    int count = 10;
351    CountDownLatch putLatch = new CountDownLatch(count + 1);
352    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
353    IntStream.range(0, count)
354      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
355        .thenRun(() -> putLatch.countDown()));
356    putLatch.await();
357
358    AtomicInteger successCount = new AtomicInteger(0);
359    AtomicInteger successIndex = new AtomicInteger(-1);
360    CountDownLatch mutateLatch = new CountDownLatch(count);
361    IntStream.range(0, count).forEach(i -> {
362      RowMutations mutation = new RowMutations(row);
363      try {
364        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
365        mutation
366          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
367      } catch (IOException e) {
368        throw new UncheckedIOException(e);
369      }
370      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
371        .thenAccept(x -> {
372          if (x) {
373            successCount.incrementAndGet();
374            successIndex.set(i);
375          }
376          mutateLatch.countDown();
377        });
378    });
379    mutateLatch.await();
380    assertEquals(1, successCount.get());
381    Result result = table.get(new Get(row)).get();
382    IntStream.range(0, count).forEach(i -> {
383      if (i == successIndex.get()) {
384        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
385      } else {
386        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
387      }
388    });
389  }
390
391  @Test
392  @Deprecated
393  public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
394    AsyncTable<?> table = getTable.get();
395    final long ts = EnvironmentEdgeManager.currentTime() / 2;
396    Put put = new Put(row);
397    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
398
399    boolean ok =
400      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
401    assertTrue(ok);
402
403    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
404      .ifEquals(VALUE).thenPut(put).get();
405    assertFalse(ok);
406
407    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
408      .ifEquals(VALUE).thenPut(put).get();
409    assertTrue(ok);
410
411    RowMutations rm = new RowMutations(row).add((Mutation) put);
412
413    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
414      .ifEquals(VALUE).thenMutate(rm).get();
415    assertFalse(ok);
416
417    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
418      .ifEquals(VALUE).thenMutate(rm).get();
419    assertTrue(ok);
420
421    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
422
423    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
424      .ifEquals(VALUE).thenDelete(delete).get();
425    assertFalse(ok);
426
427    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
428      .ifEquals(VALUE).thenDelete(delete).get();
429    assertTrue(ok);
430  }
431
432  @Test
433  @Deprecated
434  public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
435    AsyncTable<?> table = getTable.get();
436
437    // Put one row
438    Put put = new Put(row);
439    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
440    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
441    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
442    table.put(put).get();
443
444    // Put with success
445    boolean ok = table
446      .checkAndMutate(row,
447        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
448          Bytes.toBytes("a")))
449      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
450    assertTrue(ok);
451
452    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
453    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
454
455    // Put with failure
456    ok = table
457      .checkAndMutate(row,
458        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
459          Bytes.toBytes("b")))
460      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
461    assertFalse(ok);
462
463    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
464
465    // Delete with success
466    ok = table
467      .checkAndMutate(row,
468        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
469          Bytes.toBytes("a")))
470      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
471    assertTrue(ok);
472
473    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
474
475    // Mutate with success
476    ok = table
477      .checkAndMutate(row,
478        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
479          Bytes.toBytes("b")))
480      .thenMutate(new RowMutations(row)
481        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
482        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
483      .get();
484    assertTrue(ok);
485
486    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
487    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
488
489    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
490  }
491
492  @Test
493  @Deprecated
494  public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
495    AsyncTable<?> table = getTable.get();
496
497    // Put one row
498    Put put = new Put(row);
499    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
500    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
501    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
502    table.put(put).get();
503
504    // Put with success
505    boolean ok = table
506      .checkAndMutate(row,
507        new FilterList(
508          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
509            Bytes.toBytes("a")),
510          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
511            Bytes.toBytes("b"))))
512      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get();
513    assertTrue(ok);
514
515    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
516    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
517
518    // Put with failure
519    ok = table
520      .checkAndMutate(row,
521        new FilterList(
522          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
523            Bytes.toBytes("a")),
524          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
525            Bytes.toBytes("c"))))
526      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get();
527    assertFalse(ok);
528
529    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
530
531    // Delete with success
532    ok = table
533      .checkAndMutate(row,
534        new FilterList(
535          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
536            Bytes.toBytes("a")),
537          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
538            Bytes.toBytes("b"))))
539      .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get();
540    assertTrue(ok);
541
542    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
543
544    // Mutate with success
545    ok = table
546      .checkAndMutate(row,
547        new FilterList(
548          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
549            Bytes.toBytes("a")),
550          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
551            Bytes.toBytes("b"))))
552      .thenMutate(new RowMutations(row)
553        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
554        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
555      .get();
556    assertTrue(ok);
557
558    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
559    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
560
561    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
562  }
563
564  @Test
565  @Deprecated
566  public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
567    AsyncTable<?> table = getTable.get();
568
569    // Put with specifying the timestamp
570    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
571
572    // Put with success
573    boolean ok = table
574      .checkAndMutate(row,
575        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
576          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
577          new TimestampsFilter(Collections.singletonList(100L))))
578      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
579    assertTrue(ok);
580
581    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
582    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
583
584    // Put with failure
585    ok = table
586      .checkAndMutate(row,
587        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
588          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
589          new TimestampsFilter(Collections.singletonList(101L))))
590      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
591    assertFalse(ok);
592
593    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
594  }
595
596  @Test
597  @Deprecated
598  public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
599    AsyncTable<?> table = getTable.get();
600
601    // Put with specifying the timestamp
602    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
603
604    // Put with success
605    boolean ok = table
606      .checkAndMutate(row,
607        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
608          Bytes.toBytes("a")))
609      .timeRange(TimeRange.between(0, 101))
610      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get();
611    assertTrue(ok);
612
613    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
614    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
615
616    // Put with failure
617    ok = table
618      .checkAndMutate(row,
619        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
620          Bytes.toBytes("a")))
621      .timeRange(TimeRange.between(0, 100))
622      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get();
623    assertFalse(ok);
624
625    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
626  }
627
628  @Test(expected = NullPointerException.class)
629  @Deprecated
630  public void testCheckAndMutateWithoutConditionForOldApi() {
631    getTable.get().checkAndMutate(row, FAMILY)
632      .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
633  }
634
635  // Tests for new CheckAndMutate API
636
637  @SuppressWarnings("FutureReturnValueIgnored")
638  @Test
639  public void testCheckAndPut() throws InterruptedException, ExecutionException {
640    AsyncTable<?> table = getTable.get();
641    AtomicInteger successCount = new AtomicInteger(0);
642    AtomicInteger successIndex = new AtomicInteger(-1);
643    int count = 10;
644    CountDownLatch latch = new CountDownLatch(count);
645
646    IntStream.range(0, count).forEach(
647      i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
648        .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> {
649          if (x.isSuccess()) {
650            successCount.incrementAndGet();
651            successIndex.set(i);
652          }
653          assertNull(x.getResult());
654          latch.countDown();
655        }));
656    latch.await();
657    assertEquals(1, successCount.get());
658    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
659    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
660  }
661
662  @SuppressWarnings("FutureReturnValueIgnored")
663  @Test
664  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
665    AsyncTable<?> table = getTable.get();
666    int count = 10;
667    CountDownLatch putLatch = new CountDownLatch(count + 1);
668    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
669    IntStream.range(0, count)
670      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
671        .thenRun(() -> putLatch.countDown()));
672    putLatch.await();
673
674    AtomicInteger successCount = new AtomicInteger(0);
675    AtomicInteger successIndex = new AtomicInteger(-1);
676    CountDownLatch deleteLatch = new CountDownLatch(count);
677
678    IntStream.range(0, count)
679      .forEach(i -> table
680        .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(
681          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
682        .thenAccept(x -> {
683          if (x.isSuccess()) {
684            successCount.incrementAndGet();
685            successIndex.set(i);
686          }
687          assertNull(x.getResult());
688          deleteLatch.countDown();
689        }));
690    deleteLatch.await();
691    assertEquals(1, successCount.get());
692    Result result = table.get(new Get(row)).get();
693    IntStream.range(0, count).forEach(i -> {
694      if (i == successIndex.get()) {
695        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
696      } else {
697        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
698      }
699    });
700  }
701
702  @SuppressWarnings("FutureReturnValueIgnored")
703  @Test
704  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
705    AsyncTable<?> table = getTable.get();
706    int count = 10;
707    CountDownLatch putLatch = new CountDownLatch(count + 1);
708    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
709    IntStream.range(0, count)
710      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
711        .thenRun(() -> putLatch.countDown()));
712    putLatch.await();
713
714    AtomicInteger successCount = new AtomicInteger(0);
715    AtomicInteger successIndex = new AtomicInteger(-1);
716    CountDownLatch mutateLatch = new CountDownLatch(count);
717    IntStream.range(0, count).forEach(i -> {
718      RowMutations mutation = new RowMutations(row);
719      try {
720        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
721        mutation
722          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
723      } catch (IOException e) {
724        throw new UncheckedIOException(e);
725      }
726
727      table
728        .checkAndMutate(
729          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation))
730        .thenAccept(x -> {
731          if (x.isSuccess()) {
732            successCount.incrementAndGet();
733            successIndex.set(i);
734          }
735          assertNull(x.getResult());
736          mutateLatch.countDown();
737        });
738    });
739    mutateLatch.await();
740    assertEquals(1, successCount.get());
741    Result result = table.get(new Get(row)).get();
742    IntStream.range(0, count).forEach(i -> {
743      if (i == successIndex.get()) {
744        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
745      } else {
746        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
747      }
748    });
749  }
750
751  @Test
752  public void testCheckAndMutateWithTimeRange() throws Exception {
753    AsyncTable<?> table = getTable.get();
754    final long ts = EnvironmentEdgeManager.currentTime() / 2;
755    Put put = new Put(row);
756    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
757
758    CheckAndMutateResult result =
759      table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put))
760        .get();
761    assertTrue(result.isSuccess());
762    assertNull(result.getResult());
763
764    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
765      .timeRange(TimeRange.at(ts + 10000)).build(put)).get();
766    assertFalse(result.isSuccess());
767    assertNull(result.getResult());
768
769    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
770      .timeRange(TimeRange.at(ts)).build(put)).get();
771    assertTrue(result.isSuccess());
772    assertNull(result.getResult());
773
774    RowMutations rm = new RowMutations(row).add((Mutation) put);
775
776    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
777      .timeRange(TimeRange.at(ts + 10000)).build(rm)).get();
778    assertFalse(result.isSuccess());
779    assertNull(result.getResult());
780
781    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
782      .timeRange(TimeRange.at(ts)).build(rm)).get();
783    assertTrue(result.isSuccess());
784    assertNull(result.getResult());
785
786    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
787
788    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
789      .timeRange(TimeRange.at(ts + 10000)).build(delete)).get();
790    assertFalse(result.isSuccess());
791    assertNull(result.getResult());
792
793    result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE)
794      .timeRange(TimeRange.at(ts)).build(delete)).get();
795    assertTrue(result.isSuccess());
796    assertNull(result.getResult());
797  }
798
799  @Test
800  public void testCheckAndMutateWithSingleFilter() throws Throwable {
801    AsyncTable<?> table = getTable.get();
802
803    // Put one row
804    Put put = new Put(row);
805    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
806    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
807    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
808    table.put(put).get();
809
810    // Put with success
811    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
812      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
813        Bytes.toBytes("a")))
814      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
815    assertTrue(result.isSuccess());
816    assertNull(result.getResult());
817
818    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
819    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
820
821    // Put with failure
822    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
823      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
824        Bytes.toBytes("b")))
825      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
826    assertFalse(result.isSuccess());
827    assertNull(result.getResult());
828
829    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
830
831    // Delete with success
832    result =
833      table
834        .checkAndMutate(CheckAndMutate.newBuilder(row)
835          .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
836            Bytes.toBytes("a")))
837          .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
838        .get();
839    assertTrue(result.isSuccess());
840    assertNull(result.getResult());
841
842    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
843
844    // Mutate with success
845    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
846      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
847        Bytes.toBytes("b")))
848      .build(new RowMutations(row)
849        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
850        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
851      .get();
852    assertTrue(result.isSuccess());
853    assertNull(result.getResult());
854
855    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
856    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
857
858    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
859  }
860
861  @Test
862  public void testCheckAndMutateWithMultipleFilters() throws Throwable {
863    AsyncTable<?> table = getTable.get();
864
865    // Put one row
866    Put put = new Put(row);
867    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
868    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
869    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
870    table.put(put).get();
871
872    // Put with success
873    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
874      .ifMatches(new FilterList(
875        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
876          Bytes.toBytes("a")),
877        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
878          Bytes.toBytes("b"))))
879      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
880    assertTrue(result.isSuccess());
881    assertNull(result.getResult());
882
883    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
884    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
885
886    // Put with failure
887    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
888      .ifMatches(new FilterList(
889        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
890          Bytes.toBytes("a")),
891        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
892          Bytes.toBytes("c"))))
893      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
894    assertFalse(result.isSuccess());
895    assertNull(result.getResult());
896
897    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
898
899    // Delete with success
900    result = table
901      .checkAndMutate(CheckAndMutate.newBuilder(row)
902        .ifMatches(new FilterList(
903          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
904            Bytes.toBytes("a")),
905          new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
906            Bytes.toBytes("b"))))
907        .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))))
908      .get();
909    assertTrue(result.isSuccess());
910    assertNull(result.getResult());
911
912    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
913
914    // Mutate with success
915    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
916      .ifMatches(new FilterList(
917        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
918          Bytes.toBytes("a")),
919        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
920          Bytes.toBytes("b"))))
921      .build(new RowMutations(row)
922        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
923        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))))
924      .get();
925    assertTrue(result.isSuccess());
926    assertNull(result.getResult());
927
928    r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
929    assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
930
931    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
932  }
933
934  @Test
935  public void testCheckAndMutateWithTimestampFilter() throws Throwable {
936    AsyncTable<?> table = getTable.get();
937
938    // Put with specifying the timestamp
939    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
940
941    // Put with success
942    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
943      .ifMatches(
944        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
945          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
946          new TimestampsFilter(Collections.singletonList(100L))))
947      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
948    assertTrue(result.isSuccess());
949    assertNull(result.getResult());
950
951    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
952    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
953
954    // Put with failure
955    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
956      .ifMatches(
957        new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
958          new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
959          new TimestampsFilter(Collections.singletonList(101L))))
960      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
961    assertFalse(result.isSuccess());
962    assertNull(result.getResult());
963
964    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
965  }
966
967  @Test
968  public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
969    AsyncTable<?> table = getTable.get();
970
971    // Put with specifying the timestamp
972    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
973
974    // Put with success
975    CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
976      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
977        Bytes.toBytes("a")))
978      .timeRange(TimeRange.between(0, 101))
979      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
980    assertTrue(result.isSuccess());
981    assertNull(result.getResult());
982
983    Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
984    assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
985
986    // Put with failure
987    result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
988      .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
989        Bytes.toBytes("a")))
990      .timeRange(TimeRange.between(0, 100))
991      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
992    assertFalse(result.isSuccess());
993    assertNull(result.getResult());
994
995    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
996  }
997
998  @Test
999  public void testCheckAndIncrement() throws Throwable {
1000    AsyncTable<?> table = getTable.get();
1001
1002    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1003
1004    // CheckAndIncrement with correct value
1005    CheckAndMutateResult res = table.checkAndMutate(
1006      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1007        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1008      .get();
1009    assertTrue(res.isSuccess());
1010    assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1011
1012    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1013    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1014
1015    // CheckAndIncrement with wrong value
1016    res = table.checkAndMutate(
1017      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1018        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)))
1019      .get();
1020    assertFalse(res.isSuccess());
1021    assertNull(res.getResult());
1022
1023    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1024    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1025
1026    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1027
1028    // CheckAndIncrement with a filter and correct value
1029    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1030      .ifMatches(new FilterList(
1031        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1032          Bytes.toBytes("a")),
1033        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1034          Bytes.toBytes("c"))))
1035      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1036    assertTrue(res.isSuccess());
1037    assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1038
1039    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1040    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1041
1042    // CheckAndIncrement with a filter and correct value
1043    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1044      .ifMatches(new FilterList(
1045        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1046          Bytes.toBytes("b")),
1047        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1048          Bytes.toBytes("d"))))
1049      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
1050    assertFalse(res.isSuccess());
1051    assertNull(res.getResult());
1052
1053    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1054    assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1055  }
1056
1057  @Test
1058  public void testCheckAndAppend() throws Throwable {
1059    AsyncTable<?> table = getTable.get();
1060
1061    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
1062
1063    // CheckAndAppend with correct value
1064    CheckAndMutateResult res =
1065      table
1066        .checkAndMutate(
1067          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1068            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1069        .get();
1070    assertTrue(res.isSuccess());
1071    assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1072
1073    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1074    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1075
1076    // CheckAndAppend with correct value
1077    res =
1078      table
1079        .checkAndMutate(
1080          CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
1081            .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))))
1082        .get();
1083    assertFalse(res.isSuccess());
1084    assertNull(res.getResult());
1085
1086    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1087    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1088
1089    table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
1090
1091    // CheckAndAppend with a filter and correct value
1092    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1093      .ifMatches(new FilterList(
1094        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1095          Bytes.toBytes("a")),
1096        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1097          Bytes.toBytes("c"))))
1098      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1099    assertTrue(res.isSuccess());
1100    assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1101
1102    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1103    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1104
1105    // CheckAndAppend with a filter and wrong value
1106    res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
1107      .ifMatches(new FilterList(
1108        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1109          Bytes.toBytes("b")),
1110        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
1111          Bytes.toBytes("d"))))
1112      .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
1113    assertFalse(res.isSuccess());
1114    assertNull(res.getResult());
1115
1116    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1117    assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1118  }
1119
1120  @Test
1121  public void testCheckAndRowMutations() throws Throwable {
1122    final byte[] q1 = Bytes.toBytes("q1");
1123    final byte[] q2 = Bytes.toBytes("q2");
1124    final byte[] q3 = Bytes.toBytes("q3");
1125    final byte[] q4 = Bytes.toBytes("q4");
1126    final String v1 = "v1";
1127
1128    AsyncTable<?> table = getTable.get();
1129
1130    // Initial values
1131    table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
1132      new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
1133      new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
1134
1135    // Do CheckAndRowMutations
1136    CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build(
1137      new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
1138        new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1),
1139        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1140
1141    CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
1142    assertTrue(result.isSuccess());
1143    assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
1144    assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
1145
1146    // Verify the value
1147    Result r = table.get(new Get(row)).get();
1148    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1149    assertNull(r.getValue(FAMILY, q2));
1150    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1151    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1152
1153    // Do CheckAndRowMutations again
1154    checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1)
1155      .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1),
1156        new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
1157        new Increment(row).addColumn(FAMILY, q3, 1),
1158        new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b")))));
1159
1160    result = table.checkAndMutate(checkAndMutate).get();
1161    assertFalse(result.isSuccess());
1162    assertNull(result.getResult());
1163
1164    // Verify the value
1165    r = table.get(new Get(row)).get();
1166    assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
1167    assertNull(r.getValue(FAMILY, q2));
1168    assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
1169    assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
1170  }
1171
  // Tests for the batch version of checkAndMutate (checkAndMutateAll)
1173
1174  @Test
1175  public void testCheckAndMutateBatch() throws Throwable {
1176    AsyncTable<?> table = getTable.get();
1177    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1178    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1179    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1180
1181    table
1182      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1183        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1184        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1185        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))
1186      .get();
1187
1188    // Test for Put
1189    CheckAndMutate checkAndMutate1 =
1190      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1191        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1192
1193    CheckAndMutate checkAndMutate2 =
1194      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
1195        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1196
1197    List<CheckAndMutateResult> results =
1198      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1199
1200    assertTrue(results.get(0).isSuccess());
1201    assertNull(results.get(0).getResult());
1202    assertFalse(results.get(1).isSuccess());
1203    assertNull(results.get(1).getResult());
1204
1205    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1206    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1207
1208    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1209    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1210
1211    // Test for Delete
1212    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1213      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row));
1214
1215    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1216      .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2));
1217
1218    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1219
1220    assertTrue(results.get(0).isSuccess());
1221    assertNull(results.get(0).getResult());
1222    assertFalse(results.get(1).isSuccess());
1223    assertNull(results.get(1).getResult());
1224
1225    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
1226
1227    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1228    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1229
1230    // Test for RowMutations
1231    checkAndMutate1 =
1232      CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
1233        .build(new RowMutations(row3)
1234          .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1235          .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
1236
1237    checkAndMutate2 =
1238      CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
1239        .build(new RowMutations(row4)
1240          .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
1241          .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
1242
1243    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1244
1245    assertTrue(results.get(0).isSuccess());
1246    assertNull(results.get(0).getResult());
1247    assertFalse(results.get(1).isSuccess());
1248    assertNull(results.get(1).getResult());
1249
1250    result = table.get(new Get(row3)).get();
1251    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1252    assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
1253
1254    result = table.get(new Get(row4)).get();
1255    assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
1256    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1257  }
1258
1259  @Test
1260  public void testCheckAndMutateBatch2() throws Throwable {
1261    AsyncTable<?> table = getTable.get();
1262    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1263    byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
1264    byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
1265
1266    table
1267      .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1268        new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
1269        new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
1270        new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))))
1271      .get();
1272
1273    // Test for ifNotExists()
1274    CheckAndMutate checkAndMutate1 =
1275      CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B"))
1276        .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
1277
1278    CheckAndMutate checkAndMutate2 =
1279      CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B"))
1280        .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1281
1282    List<CheckAndMutateResult> results =
1283      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1284
1285    assertTrue(results.get(0).isSuccess());
1286    assertNull(results.get(0).getResult());
1287    assertFalse(results.get(1).isSuccess());
1288    assertNull(results.get(1).getResult());
1289
1290    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1291    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1292
1293    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1294    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1295
1296    // Test for ifMatches()
1297    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1298      .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
1299      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
1300
1301    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1302      .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
1303      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
1304
1305    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1306
1307    assertTrue(results.get(0).isSuccess());
1308    assertNull(results.get(0).getResult());
1309    assertFalse(results.get(1).isSuccess());
1310    assertNull(results.get(1).getResult());
1311
1312    result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
1313    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1314
1315    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1316    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1317
1318    // Test for timeRange()
1319    checkAndMutate1 = CheckAndMutate.newBuilder(row3)
1320      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101))
1321      .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
1322
1323    checkAndMutate2 = CheckAndMutate.newBuilder(row4)
1324      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100))
1325      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
1326
1327    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1328
1329    assertTrue(results.get(0).isSuccess());
1330    assertNull(results.get(0).getResult());
1331    assertFalse(results.get(1).isSuccess());
1332    assertNull(results.get(1).getResult());
1333
1334    result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1335    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1336
1337    result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1338    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1339  }
1340
1341  @Test
1342  public void testCheckAndMutateBatchWithFilter() throws Throwable {
1343    AsyncTable<?> table = getTable.get();
1344    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1345
1346    table.putAll(Arrays.asList(
1347      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1348        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1349        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1350      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
1351        .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
1352        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1353      .get();
1354
1355    // Test for Put
1356    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1357      .ifMatches(new FilterList(
1358        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1359          Bytes.toBytes("a")),
1360        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1361          Bytes.toBytes("b"))))
1362      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1363
1364    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1365      .ifMatches(new FilterList(
1366        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1367          Bytes.toBytes("a")),
1368        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1369          Bytes.toBytes("b"))))
1370      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1371
1372    List<CheckAndMutateResult> results =
1373      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1374
1375    assertTrue(results.get(0).isSuccess());
1376    assertNull(results.get(0).getResult());
1377    assertFalse(results.get(1).isSuccess());
1378    assertNull(results.get(1).getResult());
1379
1380    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1381    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1382
1383    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1384    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1385
1386    // Test for Delete
1387    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1388      .ifMatches(new FilterList(
1389        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1390          Bytes.toBytes("a")),
1391        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1392          Bytes.toBytes("b"))))
1393      .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
1394
1395    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1396      .ifMatches(new FilterList(
1397        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1398          Bytes.toBytes("a")),
1399        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1400          Bytes.toBytes("b"))))
1401      .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
1402
1403    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1404
1405    assertTrue(results.get(0).isSuccess());
1406    assertNull(results.get(0).getResult());
1407    assertFalse(results.get(1).isSuccess());
1408    assertNull(results.get(1).getResult());
1409
1410    assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
1411
1412    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1413    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1414
1415    // Test for RowMutations
1416    checkAndMutate1 = CheckAndMutate.newBuilder(row)
1417      .ifMatches(new FilterList(
1418        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1419          Bytes.toBytes("a")),
1420        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1421          Bytes.toBytes("b"))))
1422      .build(new RowMutations(row)
1423        .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
1424        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
1425
1426    checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1427      .ifMatches(new FilterList(
1428        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1429          Bytes.toBytes("a")),
1430        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1431          Bytes.toBytes("b"))))
1432      .build(new RowMutations(row2)
1433        .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
1434        .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
1435
1436    results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1437
1438    assertTrue(results.get(0).isSuccess());
1439    assertNull(results.get(0).getResult());
1440    assertFalse(results.get(1).isSuccess());
1441    assertNull(results.get(1).getResult());
1442
1443    result = table.get(new Get(row)).get();
1444    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
1445    assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1446
1447    result = table.get(new Get(row2)).get();
1448    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1449    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1450  }
1451
1452  @Test
1453  public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
1454    AsyncTable<?> table = getTable.get();
1455    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1456
1457    table.putAll(Arrays.asList(
1458      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
1459        .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
1460        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
1461      new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
1462        .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
1463        .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))))
1464      .get();
1465
1466    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
1467      .ifMatches(new FilterList(
1468        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
1469          Bytes.toBytes("a")),
1470        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
1471          Bytes.toBytes("b"))))
1472      .timeRange(TimeRange.between(0, 101))
1473      .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
1474
1475    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
1476      .ifMatches(new FilterList(
1477        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
1478          Bytes.toBytes("d")),
1479        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
1480          Bytes.toBytes("e"))))
1481      .timeRange(TimeRange.between(0, 100))
1482      .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
1483
1484    List<CheckAndMutateResult> results =
1485      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1486
1487    assertTrue(results.get(0).isSuccess());
1488    assertNull(results.get(0).getResult());
1489    assertFalse(results.get(1).isSuccess());
1490    assertNull(results.get(1).getResult());
1491
1492    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
1493    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
1494
1495    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
1496    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1497  }
1498
1499  @Test
1500  public void testCheckAndIncrementBatch() throws Throwable {
1501    AsyncTable<?> table = getTable.get();
1502    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1503
1504    table.putAll(Arrays.asList(
1505      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1506        Bytes.toBytes("B"), Bytes.toBytes(0L)),
1507      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1508        Bytes.toBytes("D"), Bytes.toBytes(0L))))
1509      .get();
1510
1511    // CheckAndIncrement with correct value
1512    CheckAndMutate checkAndMutate1 =
1513      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1514        .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
1515
1516    // CheckAndIncrement with wrong value
1517    CheckAndMutate checkAndMutate2 =
1518      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1519        .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
1520
1521    List<CheckAndMutateResult> results =
1522      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1523
1524    assertTrue(results.get(0).isSuccess());
1525    assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1526    assertFalse(results.get(1).isSuccess());
1527    assertNull(results.get(1).getResult());
1528
1529    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1530    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
1531
1532    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1533    assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
1534  }
1535
1536  @Test
1537  public void testCheckAndAppendBatch() throws Throwable {
1538    AsyncTable<?> table = getTable.get();
1539    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1540
1541    table.putAll(Arrays.asList(
1542      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY,
1543        Bytes.toBytes("B"), Bytes.toBytes("b")),
1544      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY,
1545        Bytes.toBytes("D"), Bytes.toBytes("d"))))
1546      .get();
1547
1548    // CheckAndAppend with correct value
1549    CheckAndMutate checkAndMutate1 =
1550      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
1551        .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
1552
1553    // CheckAndAppend with wrong value
1554    CheckAndMutate checkAndMutate2 =
1555      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
1556        .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
1557
1558    List<CheckAndMutateResult> results =
1559      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1560
1561    assertTrue(results.get(0).isSuccess());
1562    assertEquals("bb",
1563      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B"))));
1564    assertFalse(results.get(1).isSuccess());
1565    assertNull(results.get(1).getResult());
1566
1567    Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
1568    assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
1569
1570    result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
1571    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1572  }
1573
1574  @Test
1575  public void testCheckAndRowMutationsBatch() throws Throwable {
1576    AsyncTable<?> table = getTable.get();
1577    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
1578
1579    table.putAll(Arrays.asList(
1580      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1581        .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
1582        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
1583      new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
1584        .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
1585        .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))
1586      .get();
1587
1588    // CheckAndIncrement with correct value
1589    CheckAndMutate checkAndMutate1 =
1590      CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
1591        .build(new RowMutations(row)
1592          .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
1593            new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
1594            new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
1595            new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))));
1596
1597    // CheckAndIncrement with wrong value
1598    CheckAndMutate checkAndMutate2 =
1599      CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
1600        .build(new RowMutations(row2).add(
1601          Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
1602            new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
1603            new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
1604            new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))));
1605
1606    List<CheckAndMutateResult> results =
1607      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
1608
1609    assertTrue(results.get(0).isSuccess());
1610    assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C"))));
1611    assertEquals("dd",
1612      Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D"))));
1613
1614    assertFalse(results.get(1).isSuccess());
1615    assertNull(results.get(1).getResult());
1616
1617    Result result = table.get(new Get(row)).get();
1618    assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
1619    assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
1620    assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
1621    assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
1622
1623    result = table.get(new Get(row2)).get();
1624    assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
1625    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
1626    assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
1627    assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
1628  }
1629
1630  @Test
1631  public void testDisabled() throws InterruptedException, ExecutionException {
1632    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
1633    try {
1634      getTable.get().get(new Get(row)).get();
1635      fail("Should fail since table has been disabled");
1636    } catch (ExecutionException e) {
1637      Throwable cause = e.getCause();
1638      assertThat(cause, instanceOf(TableNotEnabledException.class));
1639      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
1640    }
1641  }
1642
1643  @Test
1644  public void testInvalidPut() {
1645    try {
1646      getTable.get().put(new Put(Bytes.toBytes(0)));
1647      fail("Should fail since the put does not contain any cells");
1648    } catch (IllegalArgumentException e) {
1649      assertThat(e.getMessage(), containsString("No columns to insert"));
1650    }
1651
1652    try {
1653      getTable.get()
1654        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
1655      fail("Should fail since the put exceeds the max key value size");
1656    } catch (IllegalArgumentException e) {
1657      assertThat(e.getMessage(), containsString("KeyValue size too large"));
1658    }
1659  }
1660
1661  @Test
1662  public void testInvalidPutInRowMutations() throws IOException {
1663    final byte[] row = Bytes.toBytes(0);
1664    try {
1665      getTable.get().mutateRow(new RowMutations(row).add(new Put(row)));
1666      fail("Should fail since the put does not contain any cells");
1667    } catch (IllegalArgumentException e) {
1668      assertThat(e.getMessage(), containsString("No columns to insert"));
1669    }
1670
1671    try {
1672      getTable.get().mutateRow(new RowMutations(row)
1673        .add(new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])));
1674      fail("Should fail since the put exceeds the max key value size");
1675    } catch (IllegalArgumentException e) {
1676      assertThat(e.getMessage(), containsString("KeyValue size too large"));
1677    }
1678  }
1679
1680  @Test
1681  public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
1682    final byte[] row = Bytes.toBytes(0);
1683    try {
1684      getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER)
1685        .build(new RowMutations(row).add(new Put(row))));
1686      fail("Should fail since the put does not contain any cells");
1687    } catch (IllegalArgumentException e) {
1688      assertThat(e.getMessage(), containsString("No columns to insert"));
1689    }
1690
1691    try {
1692      getTable.get().checkAndMutate(
1693        CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(new RowMutations(row)
1694          .add(new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))));
1695      fail("Should fail since the put exceeds the max key value size");
1696    } catch (IllegalArgumentException e) {
1697      assertThat(e.getMessage(), containsString("KeyValue size too large"));
1698    }
1699  }
1700}