001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.MatcherAssert.assertThat; 023import static org.junit.jupiter.api.Assertions.assertArrayEquals; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertFalse; 026import static org.junit.jupiter.api.Assertions.assertNull; 027import static org.junit.jupiter.api.Assertions.assertThrows; 028import static org.junit.jupiter.api.Assertions.assertTrue; 029import static org.junit.jupiter.api.Assertions.fail; 030 031import java.io.IOException; 032import java.io.UncheckedIOException; 033import java.util.Arrays; 034import java.util.Collections; 035import java.util.List; 036import java.util.concurrent.ArrayBlockingQueue; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CountDownLatch; 039import java.util.concurrent.ExecutionException; 040import java.util.concurrent.ForkJoinPool; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicLong; 043import java.util.function.Consumer; 044import java.util.function.Supplier; 045import java.util.stream.IntStream; 046import java.util.stream.Stream; 047import org.apache.hadoop.hbase.CompareOperator; 048import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 049import org.apache.hadoop.hbase.HBaseTestingUtil; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.TableNotEnabledException; 052import org.apache.hadoop.hbase.filter.BinaryComparator; 053import org.apache.hadoop.hbase.filter.FamilyFilter; 054import org.apache.hadoop.hbase.filter.FilterList; 055import org.apache.hadoop.hbase.filter.QualifierFilter; 056import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 057import org.apache.hadoop.hbase.filter.TimestampsFilter; 058import org.apache.hadoop.hbase.io.TimeRange; 059import org.apache.hadoop.hbase.testclassification.ClientTests; 060import org.apache.hadoop.hbase.testclassification.MediumTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 063import org.apache.hadoop.hbase.util.Pair; 064import org.junit.jupiter.api.AfterAll; 065import org.junit.jupiter.api.BeforeAll; 066import org.junit.jupiter.api.BeforeEach; 067import org.junit.jupiter.api.Tag; 068import org.junit.jupiter.api.TestTemplate; 069import org.junit.jupiter.params.provider.Arguments; 070 071import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 072 073@Tag(MediumTests.TAG) 074@Tag(ClientTests.TAG) 075@HBaseParameterizedTestTemplate 076public class TestAsyncTable { 077 078 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 079 080 private static TableName TABLE_NAME = TableName.valueOf("async"); 081 082 private static byte[] FAMILY = Bytes.toBytes("cf"); 083 084 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 085 086 private static byte[] VALUE = Bytes.toBytes("value"); 087 088 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 089 090 private static AsyncConnection ASYNC_CONN; 091 092 private static final AtomicInteger ROW_COUNTER = new AtomicInteger(0); 093 094 private byte[] row; 095 096 private final Supplier<AsyncTable<?>> getTable; 097 098 public TestAsyncTable(Supplier<AsyncTable<?>> getTable) { 099 this.getTable = getTable; 100 } 101 102 private static AsyncTable<?> getRawTable() { 103 return ASYNC_CONN.getTable(TABLE_NAME); 104 } 105 106 private static AsyncTable<?> getTable() { 107 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 108 } 109 110 public static Stream<Arguments> parameters() { 111 return Stream.of(Arguments.of((Supplier<AsyncTable<?>>) TestAsyncTable::getRawTable), 112 Arguments.of((Supplier<AsyncTable<?>>) TestAsyncTable::getTable)); 113 } 114 115 @BeforeAll 116 public static void setUpBeforeClass() throws Exception { 117 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 118 MAX_KEY_VALUE_SIZE); 119 TEST_UTIL.startMiniCluster(1); 120 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 121 TEST_UTIL.waitTableAvailable(TABLE_NAME); 122 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 123 assertFalse(ASYNC_CONN.isClosed()); 124 } 125 126 @AfterAll 127 public static void tearDownAfterClass() throws Exception { 128 Closeables.close(ASYNC_CONN, true); 129 assertTrue(ASYNC_CONN.isClosed()); 130 TEST_UTIL.shutdownMiniCluster(); 131 } 132 133 @BeforeEach 134 public void setUp() throws IOException, InterruptedException, ExecutionException { 135 row = Bytes.toBytes("row" + ROW_COUNTER.getAndIncrement()); 136 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 137 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 138 } 139 } 140 141 @TestTemplate 142 public void testSimple() throws Exception { 143 AsyncTable<?> table = getTable.get(); 144 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 145 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 146 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 147 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 148 table.delete(new Delete(row)).get(); 149 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 150 assertTrue(result.isEmpty()); 151 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 152 } 153 154 private byte[] concat(byte[] base, int index) { 155 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 156 } 157 158 @SuppressWarnings("FutureReturnValueIgnored") 159 @TestTemplate 160 public void testSimpleMultiple() throws Exception { 161 AsyncTable<?> table = getTable.get(); 162 int count = 100; 163 CountDownLatch putLatch = new CountDownLatch(count); 164 IntStream.range(0, count).forEach( 165 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 166 .thenAccept(x -> putLatch.countDown())); 167 putLatch.await(); 168 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 169 IntStream.range(0, count) 170 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 171 .thenAccept(x -> existsResp.add(x))); 172 for (int i = 0; i < count; i++) { 173 assertTrue(existsResp.take()); 174 } 175 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 176 IntStream.range(0, count) 177 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 178 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 179 for (int i = 0; i < count; i++) { 180 Pair<Integer, Result> pair = getResp.take(); 181 assertArrayEquals(concat(VALUE, pair.getFirst()), 182 pair.getSecond().getValue(FAMILY, QUALIFIER)); 183 } 184 CountDownLatch deleteLatch = new CountDownLatch(count); 185 IntStream.range(0, count).forEach( 186 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 187 deleteLatch.await(); 188 IntStream.range(0, count) 189 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 190 .thenAccept(x -> existsResp.add(x))); 191 for (int i = 0; i < count; i++) { 192 assertFalse(existsResp.take()); 193 } 194 IntStream.range(0, count) 195 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 196 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 197 for (int i = 0; i < count; i++) { 198 Pair<Integer, Result> pair = getResp.take(); 199 assertTrue(pair.getSecond().isEmpty()); 200 } 201 } 202 203 @SuppressWarnings("FutureReturnValueIgnored") 204 @TestTemplate 205 public void testIncrement() throws InterruptedException, ExecutionException { 206 AsyncTable<?> table = getTable.get(); 207 int count = 100; 208 CountDownLatch latch = new CountDownLatch(count); 209 AtomicLong sum = new AtomicLong(0L); 210 IntStream.range(0, count) 211 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 212 sum.addAndGet(x); 213 latch.countDown(); 214 })); 215 latch.await(); 216 assertEquals(count, Bytes.toLong( 217 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 218 assertEquals((1 + count) * count / 2, sum.get()); 219 } 220 221 @SuppressWarnings("FutureReturnValueIgnored") 222 @TestTemplate 223 public void testAppend() throws InterruptedException, ExecutionException { 224 AsyncTable<?> table = getTable.get(); 225 int count = 10; 226 CountDownLatch latch = new CountDownLatch(count); 227 char suffix = ':'; 228 AtomicLong suffixCount = new AtomicLong(0L); 229 IntStream.range(0, count) 230 .forEachOrdered(i -> table 231 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 232 .thenAccept(r -> { 233 suffixCount.addAndGet( 234 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); 235 latch.countDown(); 236 })); 237 latch.await(); 238 assertEquals((1 + count) * count / 2, suffixCount.get()); 239 String value = Bytes.toString( 240 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 241 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 242 .sorted().toArray(); 243 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 244 } 245 246 @TestTemplate 247 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 248 AsyncTable<?> table = getTable.get(); 249 RowMutations mutation = new RowMutations(row); 250 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 251 Result result = table.mutateRow(mutation).get(); 252 assertTrue(result.getExists()); 253 assertTrue(result.isEmpty()); 254 255 result = table.get(new Get(row)).get(); 256 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 257 258 mutation = new RowMutations(row); 259 mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 260 mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 261 mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L)); 262 mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc"))); 263 result = table.mutateRow(mutation).get(); 264 assertTrue(result.getExists()); 265 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 266 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 267 268 result = table.get(new Get(row)).get(); 269 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 270 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 271 assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3)))); 272 assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4)))); 273 } 274 275 // Tests for old checkAndMutate API 276 277 @SuppressWarnings("FutureReturnValueIgnored") 278 @TestTemplate 279 @Deprecated 280 public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException { 281 AsyncTable<?> table = getTable.get(); 282 AtomicInteger successCount = new AtomicInteger(0); 283 AtomicInteger successIndex = new AtomicInteger(-1); 284 int count = 10; 285 CountDownLatch latch = new CountDownLatch(count); 286 IntStream.range(0, count) 287 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 288 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 289 if (x) { 290 successCount.incrementAndGet(); 291 successIndex.set(i); 292 } 293 latch.countDown(); 294 })); 295 latch.await(); 296 assertEquals(1, successCount.get()); 297 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 298 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 299 } 300 301 @SuppressWarnings("FutureReturnValueIgnored") 302 @TestTemplate 303 @Deprecated 304 public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException { 305 AsyncTable<?> table = getTable.get(); 306 int count = 10; 307 CountDownLatch putLatch = new CountDownLatch(count + 1); 308 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 309 IntStream.range(0, count) 310 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 311 .thenRun(() -> putLatch.countDown())); 312 putLatch.await(); 313 314 AtomicInteger successCount = new AtomicInteger(0); 315 AtomicInteger successIndex = new AtomicInteger(-1); 316 CountDownLatch deleteLatch = new CountDownLatch(count); 317 IntStream.range(0, count) 318 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 319 .thenDelete( 320 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 321 .thenAccept(x -> { 322 if (x) { 323 successCount.incrementAndGet(); 324 successIndex.set(i); 325 } 326 deleteLatch.countDown(); 327 })); 328 deleteLatch.await(); 329 assertEquals(1, successCount.get()); 330 Result result = table.get(new Get(row)).get(); 331 IntStream.range(0, count).forEach(i -> { 332 if (i == successIndex.get()) { 333 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 334 } else { 335 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 336 } 337 }); 338 } 339 340 @SuppressWarnings("FutureReturnValueIgnored") 341 @TestTemplate 342 @Deprecated 343 public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException { 344 AsyncTable<?> table = getTable.get(); 345 int count = 10; 346 CountDownLatch putLatch = new CountDownLatch(count + 1); 347 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 348 IntStream.range(0, count) 349 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 350 .thenRun(() -> putLatch.countDown())); 351 putLatch.await(); 352 353 AtomicInteger successCount = new AtomicInteger(0); 354 AtomicInteger successIndex = new AtomicInteger(-1); 355 CountDownLatch mutateLatch = new CountDownLatch(count); 356 IntStream.range(0, count).forEach(i -> { 357 RowMutations mutation = new RowMutations(row); 358 try { 359 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 360 mutation 361 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 362 } catch (IOException e) { 363 throw new UncheckedIOException(e); 364 } 365 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 366 .thenAccept(x -> { 367 if (x) { 368 successCount.incrementAndGet(); 369 successIndex.set(i); 370 } 371 mutateLatch.countDown(); 372 }); 373 }); 374 mutateLatch.await(); 375 assertEquals(1, successCount.get()); 376 Result result = table.get(new Get(row)).get(); 377 IntStream.range(0, count).forEach(i -> { 378 if (i == successIndex.get()) { 379 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 380 } else { 381 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 382 } 383 }); 384 } 385 386 @TestTemplate 387 @Deprecated 388 public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception { 389 AsyncTable<?> table = getTable.get(); 390 final long ts = EnvironmentEdgeManager.currentTime() / 2; 391 Put put = new Put(row); 392 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 393 394 boolean ok = 395 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); 396 assertTrue(ok); 397 398 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 399 .ifEquals(VALUE).thenPut(put).get(); 400 assertFalse(ok); 401 402 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 403 .ifEquals(VALUE).thenPut(put).get(); 404 assertTrue(ok); 405 406 RowMutations rm = new RowMutations(row).add((Mutation) put); 407 408 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 409 .ifEquals(VALUE).thenMutate(rm).get(); 410 assertFalse(ok); 411 412 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 413 .ifEquals(VALUE).thenMutate(rm).get(); 414 assertTrue(ok); 415 416 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 417 418 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 419 .ifEquals(VALUE).thenDelete(delete).get(); 420 assertFalse(ok); 421 422 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 423 .ifEquals(VALUE).thenDelete(delete).get(); 424 assertTrue(ok); 425 } 426 427 @TestTemplate 428 @Deprecated 429 public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable { 430 AsyncTable<?> table = getTable.get(); 431 432 // Put one row 433 Put put = new Put(row); 434 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 435 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 436 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 437 table.put(put).get(); 438 439 // Put with success 440 boolean ok = table 441 .checkAndMutate(row, 442 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 443 Bytes.toBytes("a"))) 444 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 445 assertTrue(ok); 446 447 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 448 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 449 450 // Put with failure 451 ok = table 452 .checkAndMutate(row, 453 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 454 Bytes.toBytes("b"))) 455 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 456 assertFalse(ok); 457 458 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 459 460 // Delete with success 461 ok = table 462 .checkAndMutate(row, 463 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 464 Bytes.toBytes("a"))) 465 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 466 assertTrue(ok); 467 468 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 469 470 // Mutate with success 471 ok = table 472 .checkAndMutate(row, 473 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 474 Bytes.toBytes("b"))) 475 .thenMutate(new RowMutations(row) 476 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 477 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 478 .get(); 479 assertTrue(ok); 480 481 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 482 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 483 484 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 485 } 486 487 @TestTemplate 488 @Deprecated 489 public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable { 490 AsyncTable<?> table = getTable.get(); 491 492 // Put one row 493 Put put = new Put(row); 494 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 495 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 496 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 497 table.put(put).get(); 498 499 // Put with success 500 boolean ok = table 501 .checkAndMutate(row, 502 new FilterList( 503 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 504 Bytes.toBytes("a")), 505 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 506 Bytes.toBytes("b")))) 507 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))).get(); 508 assertTrue(ok); 509 510 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 511 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 512 513 // Put with failure 514 ok = table 515 .checkAndMutate(row, 516 new FilterList( 517 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 518 Bytes.toBytes("a")), 519 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 520 Bytes.toBytes("c")))) 521 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))).get(); 522 assertFalse(ok); 523 524 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 525 526 // Delete with success 527 ok = table 528 .checkAndMutate(row, 529 new FilterList( 530 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 531 Bytes.toBytes("a")), 532 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 533 Bytes.toBytes("b")))) 534 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))).get(); 535 assertTrue(ok); 536 537 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 538 539 // Mutate with success 540 ok = table 541 .checkAndMutate(row, 542 new FilterList( 543 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 544 Bytes.toBytes("a")), 545 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 546 Bytes.toBytes("b")))) 547 .thenMutate(new RowMutations(row) 548 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 549 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 550 .get(); 551 assertTrue(ok); 552 553 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 554 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 555 556 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 557 } 558 559 @TestTemplate 560 @Deprecated 561 public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable { 562 AsyncTable<?> table = getTable.get(); 563 564 // Put with specifying the timestamp 565 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 566 567 // Put with success 568 boolean ok = table 569 .checkAndMutate(row, 570 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 571 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 572 new TimestampsFilter(Collections.singletonList(100L)))) 573 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 574 assertTrue(ok); 575 576 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 577 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 578 579 // Put with failure 580 ok = table 581 .checkAndMutate(row, 582 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 583 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 584 new TimestampsFilter(Collections.singletonList(101L)))) 585 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 586 assertFalse(ok); 587 588 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 589 } 590 591 @TestTemplate 592 @Deprecated 593 public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable { 594 AsyncTable<?> table = getTable.get(); 595 596 // Put with specifying the timestamp 597 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 598 599 // Put with success 600 boolean ok = table 601 .checkAndMutate(row, 602 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 603 Bytes.toBytes("a"))) 604 .timeRange(TimeRange.between(0, 101)) 605 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))).get(); 606 assertTrue(ok); 607 608 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 609 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 610 611 // Put with failure 612 ok = table 613 .checkAndMutate(row, 614 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 615 Bytes.toBytes("a"))) 616 .timeRange(TimeRange.between(0, 100)) 617 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))).get(); 618 assertFalse(ok); 619 620 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 621 } 622 623 @TestTemplate 624 @Deprecated 625 public void testCheckAndMutateWithoutConditionForOldApi() { 626 assertThrows(NullPointerException.class, () -> { 627 getTable.get().checkAndMutate(row, FAMILY) 628 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 629 }); 630 } 631 632 // Tests for new CheckAndMutate API 633 634 @SuppressWarnings("FutureReturnValueIgnored") 635 @TestTemplate 636 public void testCheckAndPut() throws InterruptedException, ExecutionException { 637 AsyncTable<?> table = getTable.get(); 638 AtomicInteger successCount = new AtomicInteger(0); 639 AtomicInteger successIndex = new AtomicInteger(-1); 640 int count = 10; 641 CountDownLatch latch = new CountDownLatch(count); 642 643 IntStream.range(0, count).forEach( 644 i -> table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 645 .build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))).thenAccept(x -> { 646 if (x.isSuccess()) { 647 successCount.incrementAndGet(); 648 successIndex.set(i); 649 } 650 assertNull(x.getResult()); 651 latch.countDown(); 652 })); 653 latch.await(); 654 assertEquals(1, successCount.get()); 655 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 656 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 657 } 658 659 @SuppressWarnings("FutureReturnValueIgnored") 660 @TestTemplate 661 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 662 AsyncTable<?> table = getTable.get(); 663 int count = 10; 664 CountDownLatch putLatch = new CountDownLatch(count + 1); 665 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 666 IntStream.range(0, count) 667 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 668 .thenRun(() -> putLatch.countDown())); 669 putLatch.await(); 670 671 AtomicInteger successCount = new AtomicInteger(0); 672 AtomicInteger successIndex = new AtomicInteger(-1); 673 CountDownLatch deleteLatch = new CountDownLatch(count); 674 675 IntStream.range(0, count) 676 .forEach(i -> table 677 .checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build( 678 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))) 679 .thenAccept(x -> { 680 if (x.isSuccess()) { 681 successCount.incrementAndGet(); 682 successIndex.set(i); 683 } 684 assertNull(x.getResult()); 685 deleteLatch.countDown(); 686 })); 687 deleteLatch.await(); 688 assertEquals(1, successCount.get()); 689 Result result = table.get(new Get(row)).get(); 690 IntStream.range(0, count).forEach(i -> { 691 if (i == successIndex.get()) { 692 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 693 } else { 694 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 695 } 696 }); 697 } 698 699 @SuppressWarnings("FutureReturnValueIgnored") 700 @TestTemplate 701 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 702 AsyncTable<?> table = getTable.get(); 703 int count = 10; 704 CountDownLatch putLatch = new CountDownLatch(count + 1); 705 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 706 IntStream.range(0, count) 707 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 708 .thenRun(() -> putLatch.countDown())); 709 putLatch.await(); 710 711 AtomicInteger successCount = new AtomicInteger(0); 712 AtomicInteger successIndex = new AtomicInteger(-1); 713 CountDownLatch mutateLatch = new CountDownLatch(count); 714 IntStream.range(0, count).forEach(i -> { 715 RowMutations mutation = new RowMutations(row); 716 try { 717 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 718 mutation 719 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 720 } catch (IOException e) { 721 throw new UncheckedIOException(e); 722 } 723 724 table 725 .checkAndMutate( 726 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE).build(mutation)) 727 .thenAccept(x -> { 728 if (x.isSuccess()) { 729 successCount.incrementAndGet(); 730 successIndex.set(i); 731 } 732 assertNull(x.getResult()); 733 mutateLatch.countDown(); 734 }); 735 }); 736 mutateLatch.await(); 737 assertEquals(1, successCount.get()); 738 Result result = table.get(new Get(row)).get(); 739 IntStream.range(0, count).forEach(i -> { 740 if (i == successIndex.get()) { 741 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 742 } else { 743 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 744 } 745 }); 746 } 747 748 @TestTemplate 749 public void testCheckAndMutateWithTimeRange() throws Exception { 750 AsyncTable<?> table = getTable.get(); 751 final long ts = EnvironmentEdgeManager.currentTime() / 2; 752 Put put = new Put(row); 753 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 754 755 CheckAndMutateResult result = 756 table.checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER).build(put)) 757 .get(); 758 assertTrue(result.isSuccess()); 759 assertNull(result.getResult()); 760 761 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 762 .timeRange(TimeRange.at(ts + 10000)).build(put)).get(); 763 assertFalse(result.isSuccess()); 764 assertNull(result.getResult()); 765 766 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 767 .timeRange(TimeRange.at(ts)).build(put)).get(); 768 assertTrue(result.isSuccess()); 769 assertNull(result.getResult()); 770 771 RowMutations rm = new RowMutations(row).add((Mutation) put); 772 773 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 774 .timeRange(TimeRange.at(ts + 10000)).build(rm)).get(); 775 assertFalse(result.isSuccess()); 776 assertNull(result.getResult()); 777 778 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 779 .timeRange(TimeRange.at(ts)).build(rm)).get(); 780 assertTrue(result.isSuccess()); 781 assertNull(result.getResult()); 782 783 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 784 785 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 786 .timeRange(TimeRange.at(ts + 10000)).build(delete)).get(); 787 assertFalse(result.isSuccess()); 788 assertNull(result.getResult()); 789 790 result = table.checkAndMutate(CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, VALUE) 791 .timeRange(TimeRange.at(ts)).build(delete)).get(); 792 assertTrue(result.isSuccess()); 793 assertNull(result.getResult()); 794 } 795 796 @TestTemplate 797 public void testCheckAndMutateWithSingleFilter() throws Throwable { 798 AsyncTable<?> table = getTable.get(); 799 800 // Put one row 801 Put put = new Put(row); 802 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 803 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 804 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 805 table.put(put).get(); 806 807 // Put with success 808 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 809 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 810 Bytes.toBytes("a"))) 811 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 812 assertTrue(result.isSuccess()); 813 assertNull(result.getResult()); 814 815 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 816 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 817 818 // Put with failure 819 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 820 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 821 Bytes.toBytes("b"))) 822 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 823 assertFalse(result.isSuccess()); 824 assertNull(result.getResult()); 825 826 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 827 828 // Delete with success 829 result = 830 table 831 .checkAndMutate(CheckAndMutate.newBuilder(row) 832 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 833 Bytes.toBytes("a"))) 834 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 835 .get(); 836 assertTrue(result.isSuccess()); 837 assertNull(result.getResult()); 838 839 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 840 841 // Mutate with success 842 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 843 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 844 Bytes.toBytes("b"))) 845 .build(new RowMutations(row) 846 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 847 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 848 .get(); 849 assertTrue(result.isSuccess()); 850 assertNull(result.getResult()); 851 852 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 853 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 854 855 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 856 } 857 858 @TestTemplate 859 public void testCheckAndMutateWithMultipleFilters() throws Throwable { 860 AsyncTable<?> table = getTable.get(); 861 862 // Put one row 863 Put put = new Put(row); 864 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 865 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 866 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 867 table.put(put).get(); 868 869 // Put with success 870 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 871 .ifMatches(new FilterList( 872 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 873 Bytes.toBytes("a")), 874 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 875 Bytes.toBytes("b")))) 876 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get(); 877 assertTrue(result.isSuccess()); 878 assertNull(result.getResult()); 879 880 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 881 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 882 883 // Put with failure 884 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 885 .ifMatches(new FilterList( 886 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 887 Bytes.toBytes("a")), 888 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 889 Bytes.toBytes("c")))) 890 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get(); 891 assertFalse(result.isSuccess()); 892 assertNull(result.getResult()); 893 894 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 895 896 // Delete with success 897 result = table 898 .checkAndMutate(CheckAndMutate.newBuilder(row) 899 .ifMatches(new FilterList( 900 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 901 Bytes.toBytes("a")), 902 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 903 Bytes.toBytes("b")))) 904 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))) 905 .get(); 906 assertTrue(result.isSuccess()); 907 assertNull(result.getResult()); 908 909 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 910 911 // Mutate with success 912 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 913 .ifMatches(new FilterList( 914 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 915 Bytes.toBytes("a")), 916 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 917 Bytes.toBytes("b")))) 918 .build(new RowMutations(row) 919 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 920 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))) 921 .get(); 922 assertTrue(result.isSuccess()); 923 assertNull(result.getResult()); 924 925 r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 926 assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D")))); 927 928 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 929 } 930 931 @TestTemplate 932 public void testCheckAndMutateWithTimestampFilter() throws Throwable { 933 AsyncTable<?> table = getTable.get(); 934 935 // Put with specifying the timestamp 936 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 937 938 // Put with success 939 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 940 .ifMatches( 941 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 942 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 943 new TimestampsFilter(Collections.singletonList(100L)))) 944 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 945 assertTrue(result.isSuccess()); 946 assertNull(result.getResult()); 947 948 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 949 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 950 951 // Put with failure 952 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 953 .ifMatches( 954 new FilterList(new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 955 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 956 new TimestampsFilter(Collections.singletonList(101L)))) 957 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 958 assertFalse(result.isSuccess()); 959 assertNull(result.getResult()); 960 961 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 962 } 963 964 @TestTemplate 965 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { 966 AsyncTable<?> table = getTable.get(); 967 968 // Put with specifying the timestamp 969 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 970 971 // Put with success 972 CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 973 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 974 Bytes.toBytes("a"))) 975 .timeRange(TimeRange.between(0, 101)) 976 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get(); 977 assertTrue(result.isSuccess()); 978 assertNull(result.getResult()); 979 980 Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 981 assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B")))); 982 983 // Put with failure 984 result = table.checkAndMutate(CheckAndMutate.newBuilder(row) 985 .ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 986 Bytes.toBytes("a"))) 987 .timeRange(TimeRange.between(0, 100)) 988 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get(); 989 assertFalse(result.isSuccess()); 990 assertNull(result.getResult()); 991 992 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 993 } 994 995 @TestTemplate 996 public void testCheckAndIncrement() throws Throwable { 997 AsyncTable<?> table = getTable.get(); 998 999 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1000 1001 // CheckAndIncrement with correct value 1002 CheckAndMutateResult res = table.checkAndMutate( 1003 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1004 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1005 .get(); 1006 assertTrue(res.isSuccess()); 1007 assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1008 1009 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1010 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1011 1012 // CheckAndIncrement with wrong value 1013 res = table.checkAndMutate( 1014 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1015 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))) 1016 .get(); 1017 assertFalse(res.isSuccess()); 1018 assertNull(res.getResult()); 1019 1020 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1021 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1022 1023 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1024 1025 // CheckAndIncrement with a filter and correct value 1026 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1027 .ifMatches(new FilterList( 1028 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1029 Bytes.toBytes("a")), 1030 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1031 Bytes.toBytes("c")))) 1032 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1033 assertTrue(res.isSuccess()); 1034 assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1035 1036 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1037 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1038 1039 // CheckAndIncrement with a filter and correct value 1040 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1041 .ifMatches(new FilterList( 1042 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1043 Bytes.toBytes("b")), 1044 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1045 Bytes.toBytes("d")))) 1046 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get(); 1047 assertFalse(res.isSuccess()); 1048 assertNull(res.getResult()); 1049 1050 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1051 assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1052 } 1053 1054 @TestTemplate 1055 public void testCheckAndAppend() throws Throwable { 1056 AsyncTable<?> table = getTable.get(); 1057 1058 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get(); 1059 1060 // CheckAndAppend with correct value 1061 CheckAndMutateResult res = 1062 table 1063 .checkAndMutate( 1064 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1065 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1066 .get(); 1067 assertTrue(res.isSuccess()); 1068 assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1069 1070 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1071 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1072 1073 // CheckAndAppend with correct value 1074 res = 1075 table 1076 .checkAndMutate( 1077 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b")) 1078 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))) 1079 .get(); 1080 assertFalse(res.isSuccess()); 1081 assertNull(res.getResult()); 1082 1083 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1084 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1085 1086 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))); 1087 1088 // CheckAndAppend with a filter and correct value 1089 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1090 .ifMatches(new FilterList( 1091 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1092 Bytes.toBytes("a")), 1093 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1094 Bytes.toBytes("c")))) 1095 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1096 assertTrue(res.isSuccess()); 1097 assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1098 1099 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1100 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1101 1102 // CheckAndAppend with a filter and wrong value 1103 res = table.checkAndMutate(CheckAndMutate.newBuilder(row) 1104 .ifMatches(new FilterList( 1105 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1106 Bytes.toBytes("b")), 1107 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL, 1108 Bytes.toBytes("d")))) 1109 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get(); 1110 assertFalse(res.isSuccess()); 1111 assertNull(res.getResult()); 1112 1113 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1114 assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1115 } 1116 1117 @TestTemplate 1118 public void testCheckAndRowMutations() throws Throwable { 1119 final byte[] q1 = Bytes.toBytes("q1"); 1120 final byte[] q2 = Bytes.toBytes("q2"); 1121 final byte[] q3 = Bytes.toBytes("q3"); 1122 final byte[] q4 = Bytes.toBytes("q4"); 1123 final String v1 = "v1"; 1124 1125 AsyncTable<?> table = getTable.get(); 1126 1127 // Initial values 1128 table.putAll(Arrays.asList(new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")), 1129 new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)), 1130 new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get(); 1131 1132 // Do CheckAndRowMutations 1133 CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1).build( 1134 new RowMutations(row).add(Arrays.asList(new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)), 1135 new Delete(row).addColumns(FAMILY, q2), new Increment(row).addColumn(FAMILY, q3, 1), 1136 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1137 1138 CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get(); 1139 assertTrue(result.isSuccess()); 1140 assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3))); 1141 assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4))); 1142 1143 // Verify the value 1144 Result r = table.get(new Get(row)).get(); 1145 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1146 assertNull(r.getValue(FAMILY, q2)); 1147 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1148 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1149 1150 // Do CheckAndRowMutations again 1151 checkAndMutate = CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, q1) 1152 .build(new RowMutations(row).add(Arrays.asList(new Delete(row).addColumns(FAMILY, q1), 1153 new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)), 1154 new Increment(row).addColumn(FAMILY, q3, 1), 1155 new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))); 1156 1157 result = table.checkAndMutate(checkAndMutate).get(); 1158 assertFalse(result.isSuccess()); 1159 assertNull(result.getResult()); 1160 1161 // Verify the value 1162 r = table.get(new Get(row)).get(); 1163 assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1))); 1164 assertNull(r.getValue(FAMILY, q2)); 1165 assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3))); 1166 assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4))); 1167 } 1168 1169 // Tests for batch version of checkAndMutate 1170 1171 @TestTemplate 1172 public void testCheckAndMutateBatch() throws Throwable { 1173 AsyncTable<?> table = getTable.get(); 1174 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1175 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1176 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1177 1178 table 1179 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1180 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1181 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1182 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))) 1183 .get(); 1184 1185 // Test for Put 1186 CheckAndMutate checkAndMutate1 = 1187 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1188 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1189 1190 CheckAndMutate checkAndMutate2 = 1191 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")) 1192 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1193 1194 List<CheckAndMutateResult> results = 1195 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1196 1197 assertTrue(results.get(0).isSuccess()); 1198 assertNull(results.get(0).getResult()); 1199 assertFalse(results.get(1).isSuccess()); 1200 assertNull(results.get(1).getResult()); 1201 1202 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1203 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1204 1205 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1206 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1207 1208 // Test for Delete 1209 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1210 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")).build(new Delete(row)); 1211 1212 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1213 .ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a")).build(new Delete(row2)); 1214 1215 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1216 1217 assertTrue(results.get(0).isSuccess()); 1218 assertNull(results.get(0).getResult()); 1219 assertFalse(results.get(1).isSuccess()); 1220 assertNull(results.get(1).getResult()); 1221 1222 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 1223 1224 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1225 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1226 1227 // Test for RowMutations 1228 checkAndMutate1 = 1229 CheckAndMutate.newBuilder(row3).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")) 1230 .build(new RowMutations(row3) 1231 .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1232 .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))); 1233 1234 checkAndMutate2 = 1235 CheckAndMutate.newBuilder(row4).ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")) 1236 .build(new RowMutations(row4) 1237 .add((Mutation) new Put(row4).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))) 1238 .add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D")))); 1239 1240 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1241 1242 assertTrue(results.get(0).isSuccess()); 1243 assertNull(results.get(0).getResult()); 1244 assertFalse(results.get(1).isSuccess()); 1245 assertNull(results.get(1).getResult()); 1246 1247 result = table.get(new Get(row3)).get(); 1248 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1249 assertNull(result.getValue(FAMILY, Bytes.toBytes("D"))); 1250 1251 result = table.get(new Get(row4)).get(); 1252 assertNull(result.getValue(FAMILY, Bytes.toBytes("F"))); 1253 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1254 } 1255 1256 @TestTemplate 1257 public void testCheckAndMutateBatch2() throws Throwable { 1258 AsyncTable<?> table = getTable.get(); 1259 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1260 byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3"); 1261 byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4"); 1262 1263 table 1264 .putAll(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1265 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")), 1266 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")), 1267 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))) 1268 .get(); 1269 1270 // Test for ifNotExists() 1271 CheckAndMutate checkAndMutate1 = 1272 CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, Bytes.toBytes("B")) 1273 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))); 1274 1275 CheckAndMutate checkAndMutate2 = 1276 CheckAndMutate.newBuilder(row2).ifNotExists(FAMILY, Bytes.toBytes("B")) 1277 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1278 1279 List<CheckAndMutateResult> results = 1280 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1281 1282 assertTrue(results.get(0).isSuccess()); 1283 assertNull(results.get(0).getResult()); 1284 assertFalse(results.get(1).isSuccess()); 1285 assertNull(results.get(1).getResult()); 1286 1287 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1288 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1289 1290 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1291 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1292 1293 // Test for ifMatches() 1294 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1295 .ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a")) 1296 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))); 1297 1298 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1299 .ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b")) 1300 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f"))); 1301 1302 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1303 1304 assertTrue(results.get(0).isSuccess()); 1305 assertNull(results.get(0).getResult()); 1306 assertFalse(results.get(1).isSuccess()); 1307 assertNull(results.get(1).getResult()); 1308 1309 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get(); 1310 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1311 1312 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1313 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1314 1315 // Test for timeRange() 1316 checkAndMutate1 = CheckAndMutate.newBuilder(row3) 1317 .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).timeRange(TimeRange.between(0, 101)) 1318 .build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e"))); 1319 1320 checkAndMutate2 = CheckAndMutate.newBuilder(row4) 1321 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")).timeRange(TimeRange.between(0, 100)) 1322 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))); 1323 1324 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1325 1326 assertTrue(results.get(0).isSuccess()); 1327 assertNull(results.get(0).getResult()); 1328 assertFalse(results.get(1).isSuccess()); 1329 assertNull(results.get(1).getResult()); 1330 1331 result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1332 assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1333 1334 result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1335 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1336 } 1337 1338 @TestTemplate 1339 public void testCheckAndMutateBatchWithFilter() throws Throwable { 1340 AsyncTable<?> table = getTable.get(); 1341 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1342 1343 table.putAll(Arrays.asList( 1344 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1345 .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1346 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1347 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")) 1348 .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")) 1349 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1350 .get(); 1351 1352 // Test for Put 1353 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1354 .ifMatches(new FilterList( 1355 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1356 Bytes.toBytes("a")), 1357 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1358 Bytes.toBytes("b")))) 1359 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1360 1361 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1362 .ifMatches(new FilterList( 1363 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1364 Bytes.toBytes("a")), 1365 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1366 Bytes.toBytes("b")))) 1367 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1368 1369 List<CheckAndMutateResult> results = 1370 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1371 1372 assertTrue(results.get(0).isSuccess()); 1373 assertNull(results.get(0).getResult()); 1374 assertFalse(results.get(1).isSuccess()); 1375 assertNull(results.get(1).getResult()); 1376 1377 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1378 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1379 1380 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1381 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1382 1383 // Test for Delete 1384 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1385 .ifMatches(new FilterList( 1386 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1387 Bytes.toBytes("a")), 1388 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1389 Bytes.toBytes("b")))) 1390 .build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C"))); 1391 1392 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1393 .ifMatches(new FilterList( 1394 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1395 Bytes.toBytes("a")), 1396 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1397 Bytes.toBytes("b")))) 1398 .build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F"))); 1399 1400 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1401 1402 assertTrue(results.get(0).isSuccess()); 1403 assertNull(results.get(0).getResult()); 1404 assertFalse(results.get(1).isSuccess()); 1405 assertNull(results.get(1).getResult()); 1406 1407 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 1408 1409 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1410 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1411 1412 // Test for RowMutations 1413 checkAndMutate1 = CheckAndMutate.newBuilder(row) 1414 .ifMatches(new FilterList( 1415 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1416 Bytes.toBytes("a")), 1417 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1418 Bytes.toBytes("b")))) 1419 .build(new RowMutations(row) 1420 .add((Mutation) new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 1421 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))); 1422 1423 checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1424 .ifMatches(new FilterList( 1425 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1426 Bytes.toBytes("a")), 1427 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1428 Bytes.toBytes("b")))) 1429 .build(new RowMutations(row2) 1430 .add((Mutation) new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g"))) 1431 .add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D")))); 1432 1433 results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1434 1435 assertTrue(results.get(0).isSuccess()); 1436 assertNull(results.get(0).getResult()); 1437 assertFalse(results.get(1).isSuccess()); 1438 assertNull(results.get(1).getResult()); 1439 1440 result = table.get(new Get(row)).get(); 1441 assertNull(result.getValue(FAMILY, Bytes.toBytes("A"))); 1442 assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1443 1444 result = table.get(new Get(row2)).get(); 1445 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1446 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1447 } 1448 1449 @TestTemplate 1450 public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable { 1451 AsyncTable<?> table = getTable.get(); 1452 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1453 1454 table.putAll(Arrays.asList( 1455 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")) 1456 .addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b")) 1457 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")), 1458 new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")) 1459 .addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e")) 1460 .addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))) 1461 .get(); 1462 1463 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row) 1464 .ifMatches(new FilterList( 1465 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 1466 Bytes.toBytes("a")), 1467 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 1468 Bytes.toBytes("b")))) 1469 .timeRange(TimeRange.between(0, 101)) 1470 .build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g"))); 1471 1472 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2) 1473 .ifMatches(new FilterList( 1474 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL, 1475 Bytes.toBytes("d")), 1476 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL, 1477 Bytes.toBytes("e")))) 1478 .timeRange(TimeRange.between(0, 100)) 1479 .build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h"))); 1480 1481 List<CheckAndMutateResult> results = 1482 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1483 1484 assertTrue(results.get(0).isSuccess()); 1485 assertNull(results.get(0).getResult()); 1486 assertFalse(results.get(1).isSuccess()); 1487 assertNull(results.get(1).getResult()); 1488 1489 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get(); 1490 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C")))); 1491 1492 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get(); 1493 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1494 } 1495 1496 @TestTemplate 1497 public void testCheckAndIncrementBatch() throws Throwable { 1498 AsyncTable<?> table = getTable.get(); 1499 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1500 1501 table.putAll(Arrays.asList( 1502 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1503 Bytes.toBytes("B"), Bytes.toBytes(0L)), 1504 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1505 Bytes.toBytes("D"), Bytes.toBytes(0L)))) 1506 .get(); 1507 1508 // CheckAndIncrement with correct value 1509 CheckAndMutate checkAndMutate1 = 1510 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1511 .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1)); 1512 1513 // CheckAndIncrement with wrong value 1514 CheckAndMutate checkAndMutate2 = 1515 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1516 .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1)); 1517 1518 List<CheckAndMutateResult> results = 1519 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1520 1521 assertTrue(results.get(0).isSuccess()); 1522 assertEquals(1, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1523 assertFalse(results.get(1).isSuccess()); 1524 assertNull(results.get(1).getResult()); 1525 1526 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1527 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B")))); 1528 1529 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1530 assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D")))); 1531 } 1532 1533 @TestTemplate 1534 public void testCheckAndAppendBatch() throws Throwable { 1535 AsyncTable<?> table = getTable.get(); 1536 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1537 1538 table.putAll(Arrays.asList( 1539 new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")).addColumn(FAMILY, 1540 Bytes.toBytes("B"), Bytes.toBytes("b")), 1541 new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")).addColumn(FAMILY, 1542 Bytes.toBytes("D"), Bytes.toBytes("d")))) 1543 .get(); 1544 1545 // CheckAndAppend with correct value 1546 CheckAndMutate checkAndMutate1 = 1547 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")) 1548 .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))); 1549 1550 // CheckAndAppend with wrong value 1551 CheckAndMutate checkAndMutate2 = 1552 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d")) 1553 .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 1554 1555 List<CheckAndMutateResult> results = 1556 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1557 1558 assertTrue(results.get(0).isSuccess()); 1559 assertEquals("bb", 1560 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("B")))); 1561 assertFalse(results.get(1).isSuccess()); 1562 assertNull(results.get(1).getResult()); 1563 1564 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 1565 assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 1566 1567 result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 1568 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1569 } 1570 1571 @TestTemplate 1572 public void testCheckAndRowMutationsBatch() throws Throwable { 1573 AsyncTable<?> table = getTable.get(); 1574 byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2"); 1575 1576 table.putAll(Arrays.asList( 1577 new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1578 .addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L)) 1579 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")), 1580 new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")) 1581 .addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L)) 1582 .addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))) 1583 .get(); 1584 1585 // CheckAndIncrement with correct value 1586 CheckAndMutate checkAndMutate1 = 1587 CheckAndMutate.newBuilder(row).ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")) 1588 .build(new RowMutations(row) 1589 .add(Arrays.asList(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")), 1590 new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")), 1591 new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L), 1592 new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))))); 1593 1594 // CheckAndIncrement with wrong value 1595 CheckAndMutate checkAndMutate2 = 1596 CheckAndMutate.newBuilder(row2).ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a")) 1597 .build(new RowMutations(row2).add( 1598 Arrays.asList(new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")), 1599 new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")), 1600 new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L), 1601 new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))))); 1602 1603 List<CheckAndMutateResult> results = 1604 table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get(); 1605 1606 assertTrue(results.get(0).isSuccess()); 1607 assertEquals(2, Bytes.toLong(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("C")))); 1608 assertEquals("dd", 1609 Bytes.toString(results.get(0).getResult().getValue(FAMILY, Bytes.toBytes("D")))); 1610 1611 assertFalse(results.get(1).isSuccess()); 1612 assertNull(results.get(1).getResult()); 1613 1614 Result result = table.get(new Get(row)).get(); 1615 assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A")))); 1616 assertNull(result.getValue(FAMILY, Bytes.toBytes("B"))); 1617 assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C")))); 1618 assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 1619 1620 result = table.get(new Get(row2)).get(); 1621 assertNull(result.getValue(FAMILY, Bytes.toBytes("E"))); 1622 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F")))); 1623 assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G")))); 1624 assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H")))); 1625 } 1626 1627 @TestTemplate 1628 public void testDisabled() throws InterruptedException, ExecutionException { 1629 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 1630 try { 1631 getTable.get().get(new Get(row)).get(); 1632 fail("Should fail since table has been disabled"); 1633 } catch (ExecutionException e) { 1634 Throwable cause = e.getCause(); 1635 assertThat(cause, instanceOf(TableNotEnabledException.class)); 1636 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 1637 } 1638 } 1639 1640 @TestTemplate 1641 public void testInvalidMutation() throws Exception { 1642 Consumer<Mutation> executeMutation = mutation -> { 1643 if (mutation instanceof Put) { 1644 getTable.get().put((Put) mutation); 1645 } else if (mutation instanceof Increment) { 1646 getTable.get().increment((Increment) mutation); 1647 } else if (mutation instanceof Append) { 1648 getTable.get().append((Append) mutation); 1649 } 1650 }; 1651 1652 Mutation[] emptyMutations = 1653 { new Put(Bytes.toBytes(0)), new Increment(Bytes.toBytes(0)), new Append(Bytes.toBytes(0)) }; 1654 1655 String[] emptyMessages = 1656 { "No columns to put", "No columns to increment", "No columns to append" }; 1657 1658 Mutation[] oversizedMutations = 1659 { new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1660 new Increment(Bytes.toBytes(0)).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1661 new Append(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1662 1663 for (int i = 0; i < emptyMutations.length; i++) { 1664 // Test empty mutation 1665 try { 1666 executeMutation.accept(emptyMutations[i]); 1667 fail("Should fail since the mutation does not contain any cells"); 1668 } catch (IllegalArgumentException e) { 1669 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1670 } 1671 1672 // Test oversized mutation 1673 try { 1674 executeMutation.accept(oversizedMutations[i]); 1675 fail("Should fail since the mutation exceeds the max key value size"); 1676 } catch (IllegalArgumentException e) { 1677 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1678 } 1679 } 1680 } 1681 1682 @TestTemplate 1683 public void testInvalidMutationInRowMutations() throws IOException { 1684 final byte[] row = Bytes.toBytes(0); 1685 1686 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 1687 1688 String[] emptyMessages = 1689 { "No columns to put", "No columns to increment", "No columns to append" }; 1690 1691 Mutation[] oversizedMutations = 1692 { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1693 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1694 new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1695 1696 for (int i = 0; i < emptyMutations.length; i++) { 1697 // Test empty mutation 1698 try { 1699 getTable.get().mutateRow(new RowMutations(row).add(emptyMutations[i])); 1700 fail("Should fail since the mutation does not contain any cells"); 1701 } catch (IllegalArgumentException e) { 1702 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1703 } 1704 1705 // Test oversized mutation 1706 try { 1707 getTable.get().mutateRow(new RowMutations(row).add(oversizedMutations[i])); 1708 fail("Should fail since the mutation exceeds the max key value size"); 1709 } catch (IllegalArgumentException e) { 1710 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1711 } 1712 } 1713 } 1714 1715 @TestTemplate 1716 public void testInvalidMutationInRowMutationsInCheckAndMutate() throws IOException { 1717 final byte[] row = Bytes.toBytes(0); 1718 1719 Mutation[] emptyMutations = { new Put(row), new Increment(row), new Append(row) }; 1720 1721 String[] emptyMessages = 1722 { "No columns to put", "No columns to increment", "No columns to append" }; 1723 1724 Mutation[] oversizedMutations = 1725 { new Put(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]), 1726 new Increment(row).addColumn(FAMILY, new byte[MAX_KEY_VALUE_SIZE], 1), 1727 new Append(row).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]) }; 1728 1729 for (int i = 0; i < emptyMutations.length; i++) { 1730 // Test empty mutation 1731 try { 1732 getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 1733 .build(new RowMutations(row).add(emptyMutations[i]))); 1734 fail("Should fail since the mutation does not contain any cells"); 1735 } catch (IllegalArgumentException e) { 1736 assertThat(e.getMessage(), containsString(emptyMessages[i])); 1737 } 1738 1739 // Test oversized mutation 1740 try { 1741 getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row).ifNotExists(FAMILY, QUALIFIER) 1742 .build(new RowMutations(row).add(oversizedMutations[i]))); 1743 fail("Should fail since the mutation exceeds the max key value size"); 1744 } catch (IllegalArgumentException e) { 1745 assertThat(e.getMessage(), containsString("KeyValue size too large")); 1746 } 1747 } 1748 } 1749}