001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.containsString; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertThat; 027import static org.junit.Assert.assertTrue; 028import static org.junit.Assert.fail; 029 030import java.io.IOException; 031import java.io.UncheckedIOException; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.concurrent.ArrayBlockingQueue; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.ForkJoinPool; 040import java.util.concurrent.atomic.AtomicInteger; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.function.Supplier; 043import java.util.stream.IntStream; 044import org.apache.commons.io.IOUtils; 045import org.apache.hadoop.hbase.CompareOperator; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.TableNotEnabledException; 050import org.apache.hadoop.hbase.filter.BinaryComparator; 051import org.apache.hadoop.hbase.filter.FamilyFilter; 052import org.apache.hadoop.hbase.filter.FilterList; 053import org.apache.hadoop.hbase.filter.QualifierFilter; 054import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 055import org.apache.hadoop.hbase.filter.TimestampsFilter; 056import org.apache.hadoop.hbase.io.TimeRange; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.MediumTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.Pair; 061import org.junit.AfterClass; 062import org.junit.Before; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.junit.runner.RunWith; 070import org.junit.runners.Parameterized; 071import org.junit.runners.Parameterized.Parameter; 072import org.junit.runners.Parameterized.Parameters; 073 074@RunWith(Parameterized.class) 075@Category({ MediumTests.class, ClientTests.class }) 076public class TestAsyncTable { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestAsyncTable.class); 081 082 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 084 private static TableName TABLE_NAME = TableName.valueOf("async"); 085 086 private static byte[] FAMILY = Bytes.toBytes("cf"); 087 088 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 089 090 private static byte[] VALUE = Bytes.toBytes("value"); 091 092 private static int MAX_KEY_VALUE_SIZE = 64 * 1024; 093 094 private static AsyncConnection ASYNC_CONN; 095 096 @Rule 097 public TestName testName = new TestName(); 098 099 private byte[] row; 100 101 @Parameter 102 public Supplier<AsyncTable<?>> getTable; 103 104 private static AsyncTable<?> getRawTable() { 105 return ASYNC_CONN.getTable(TABLE_NAME); 106 } 107 108 private static AsyncTable<?> getTable() { 109 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 110 } 111 112 @Parameters 113 public static List<Object[]> params() { 114 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable }, 115 new Supplier<?>[] { TestAsyncTable::getTable }); 116 } 117 118 @BeforeClass 119 public static void setUpBeforeClass() throws Exception { 120 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY, 121 MAX_KEY_VALUE_SIZE); 122 TEST_UTIL.startMiniCluster(1); 123 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 124 TEST_UTIL.waitTableAvailable(TABLE_NAME); 125 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 126 assertFalse(ASYNC_CONN.isClosed()); 127 } 128 129 @AfterClass 130 public static void tearDownAfterClass() throws Exception { 131 IOUtils.closeQuietly(ASYNC_CONN); 132 assertTrue(ASYNC_CONN.isClosed()); 133 TEST_UTIL.shutdownMiniCluster(); 134 } 135 136 @Before 137 public void setUp() throws IOException, InterruptedException, ExecutionException { 138 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_")); 139 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) { 140 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get(); 141 } 142 } 143 144 @Test 145 public void testSimple() throws Exception { 146 AsyncTable<?> table = getTable.get(); 147 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get(); 148 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 149 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 150 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER)); 151 table.delete(new Delete(row)).get(); 152 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get(); 153 assertTrue(result.isEmpty()); 154 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get()); 155 } 156 157 private byte[] concat(byte[] base, int index) { 158 return Bytes.toBytes(Bytes.toString(base) + "-" + index); 159 } 160 161 @SuppressWarnings("FutureReturnValueIgnored") 162 @Test 163 public void testSimpleMultiple() throws Exception { 164 AsyncTable<?> table = getTable.get(); 165 int count = 100; 166 CountDownLatch putLatch = new CountDownLatch(count); 167 IntStream.range(0, count).forEach( 168 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))) 169 .thenAccept(x -> putLatch.countDown())); 170 putLatch.await(); 171 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count); 172 IntStream.range(0, count) 173 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 174 .thenAccept(x -> existsResp.add(x))); 175 for (int i = 0; i < count; i++) { 176 assertTrue(existsResp.take()); 177 } 178 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count); 179 IntStream.range(0, count) 180 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 181 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 182 for (int i = 0; i < count; i++) { 183 Pair<Integer, Result> pair = getResp.take(); 184 assertArrayEquals(concat(VALUE, pair.getFirst()), 185 pair.getSecond().getValue(FAMILY, QUALIFIER)); 186 } 187 CountDownLatch deleteLatch = new CountDownLatch(count); 188 IntStream.range(0, count).forEach( 189 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown())); 190 deleteLatch.await(); 191 IntStream.range(0, count) 192 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 193 .thenAccept(x -> existsResp.add(x))); 194 for (int i = 0; i < count; i++) { 195 assertFalse(existsResp.take()); 196 } 197 IntStream.range(0, count) 198 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER)) 199 .thenAccept(x -> getResp.add(Pair.newPair(i, x)))); 200 for (int i = 0; i < count; i++) { 201 Pair<Integer, Result> pair = getResp.take(); 202 assertTrue(pair.getSecond().isEmpty()); 203 } 204 } 205 206 @SuppressWarnings("FutureReturnValueIgnored") 207 @Test 208 public void testIncrement() throws InterruptedException, ExecutionException { 209 AsyncTable<?> table = getTable.get(); 210 int count = 100; 211 CountDownLatch latch = new CountDownLatch(count); 212 AtomicLong sum = new AtomicLong(0L); 213 IntStream.range(0, count) 214 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> { 215 sum.addAndGet(x); 216 latch.countDown(); 217 })); 218 latch.await(); 219 assertEquals(count, Bytes.toLong( 220 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER))); 221 assertEquals((1 + count) * count / 2, sum.get()); 222 } 223 224 @SuppressWarnings("FutureReturnValueIgnored") 225 @Test 226 public void testAppend() throws InterruptedException, ExecutionException { 227 AsyncTable<?> table = getTable.get(); 228 int count = 10; 229 CountDownLatch latch = new CountDownLatch(count); 230 char suffix = ':'; 231 AtomicLong suffixCount = new AtomicLong(0L); 232 IntStream.range(0, count) 233 .forEachOrdered(i -> table 234 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix))) 235 .thenAccept(r -> { 236 suffixCount.addAndGet( 237 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count()); 238 latch.countDown(); 239 })); 240 latch.await(); 241 assertEquals((1 + count) * count / 2, suffixCount.get()); 242 String value = Bytes.toString( 243 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)); 244 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt) 245 .sorted().toArray(); 246 assertArrayEquals(IntStream.range(0, count).toArray(), actual); 247 } 248 249 @SuppressWarnings("FutureReturnValueIgnored") 250 @Test 251 public void testCheckAndPut() throws InterruptedException, ExecutionException { 252 AsyncTable<?> table = getTable.get(); 253 AtomicInteger successCount = new AtomicInteger(0); 254 AtomicInteger successIndex = new AtomicInteger(-1); 255 int count = 10; 256 CountDownLatch latch = new CountDownLatch(count); 257 IntStream.range(0, count) 258 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists() 259 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> { 260 if (x) { 261 successCount.incrementAndGet(); 262 successIndex.set(i); 263 } 264 latch.countDown(); 265 })); 266 latch.await(); 267 assertEquals(1, successCount.get()); 268 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER)); 269 assertTrue(actual.endsWith(Integer.toString(successIndex.get()))); 270 } 271 272 @SuppressWarnings("FutureReturnValueIgnored") 273 @Test 274 public void testCheckAndDelete() throws InterruptedException, ExecutionException { 275 AsyncTable<?> table = getTable.get(); 276 int count = 10; 277 CountDownLatch putLatch = new CountDownLatch(count + 1); 278 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 279 IntStream.range(0, count) 280 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 281 .thenRun(() -> putLatch.countDown())); 282 putLatch.await(); 283 284 AtomicInteger successCount = new AtomicInteger(0); 285 AtomicInteger successIndex = new AtomicInteger(-1); 286 CountDownLatch deleteLatch = new CountDownLatch(count); 287 IntStream.range(0, count) 288 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE) 289 .thenDelete( 290 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))) 291 .thenAccept(x -> { 292 if (x) { 293 successCount.incrementAndGet(); 294 successIndex.set(i); 295 } 296 deleteLatch.countDown(); 297 })); 298 deleteLatch.await(); 299 assertEquals(1, successCount.get()); 300 Result result = table.get(new Get(row)).get(); 301 IntStream.range(0, count).forEach(i -> { 302 if (i == successIndex.get()) { 303 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i))); 304 } else { 305 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 306 } 307 }); 308 } 309 310 @Test 311 public void testMutateRow() throws InterruptedException, ExecutionException, IOException { 312 AsyncTable<?> table = getTable.get(); 313 RowMutations mutation = new RowMutations(row); 314 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE)); 315 table.mutateRow(mutation).get(); 316 Result result = table.get(new Get(row)).get(); 317 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1))); 318 319 mutation = new RowMutations(row); 320 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1))); 321 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE)); 322 table.mutateRow(mutation).get(); 323 result = table.get(new Get(row)).get(); 324 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1))); 325 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2))); 326 } 327 328 @SuppressWarnings("FutureReturnValueIgnored") 329 @Test 330 public void testCheckAndMutate() throws InterruptedException, ExecutionException { 331 AsyncTable<?> table = getTable.get(); 332 int count = 10; 333 CountDownLatch putLatch = new CountDownLatch(count + 1); 334 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown()); 335 IntStream.range(0, count) 336 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE)) 337 .thenRun(() -> putLatch.countDown())); 338 putLatch.await(); 339 340 AtomicInteger successCount = new AtomicInteger(0); 341 AtomicInteger successIndex = new AtomicInteger(-1); 342 CountDownLatch mutateLatch = new CountDownLatch(count); 343 IntStream.range(0, count).forEach(i -> { 344 RowMutations mutation = new RowMutations(row); 345 try { 346 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER)); 347 mutation 348 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i))); 349 } catch (IOException e) { 350 throw new UncheckedIOException(e); 351 } 352 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation) 353 .thenAccept(x -> { 354 if (x) { 355 successCount.incrementAndGet(); 356 successIndex.set(i); 357 } 358 mutateLatch.countDown(); 359 }); 360 }); 361 mutateLatch.await(); 362 assertEquals(1, successCount.get()); 363 Result result = table.get(new Get(row)).get(); 364 IntStream.range(0, count).forEach(i -> { 365 if (i == successIndex.get()) { 366 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i))); 367 } else { 368 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i))); 369 } 370 }); 371 } 372 373 @Test 374 public void testCheckAndMutateWithTimeRange() throws Exception { 375 AsyncTable<?> table = getTable.get(); 376 final long ts = System.currentTimeMillis() / 2; 377 Put put = new Put(row); 378 put.addColumn(FAMILY, QUALIFIER, ts, VALUE); 379 380 boolean ok = 381 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get(); 382 assertTrue(ok); 383 384 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 385 .ifEquals(VALUE).thenPut(put).get(); 386 assertFalse(ok); 387 388 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 389 .ifEquals(VALUE).thenPut(put).get(); 390 assertTrue(ok); 391 392 RowMutations rm = new RowMutations(row).add((Mutation) put); 393 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 394 .ifEquals(VALUE).thenMutate(rm).get(); 395 assertFalse(ok); 396 397 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 398 .ifEquals(VALUE).thenMutate(rm).get(); 399 assertTrue(ok); 400 401 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER); 402 403 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000)) 404 .ifEquals(VALUE).thenDelete(delete).get(); 405 assertFalse(ok); 406 407 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts)) 408 .ifEquals(VALUE).thenDelete(delete).get(); 409 assertTrue(ok); 410 } 411 412 @Test 413 public void testCheckAndMutateWithSingleFilter() throws Throwable { 414 AsyncTable<?> table = getTable.get(); 415 416 // Put one row 417 Put put = new Put(row); 418 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 419 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 420 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 421 table.put(put).get(); 422 423 // Put with success 424 boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 425 CompareOperator.EQUAL, Bytes.toBytes("a"))) 426 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 427 .get(); 428 assertTrue(ok); 429 430 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 431 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 432 433 // Put with failure 434 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 435 CompareOperator.EQUAL, Bytes.toBytes("b"))) 436 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) 437 .get(); 438 assertFalse(ok); 439 440 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 441 442 // Delete with success 443 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 444 CompareOperator.EQUAL, Bytes.toBytes("a"))) 445 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))) 446 .get(); 447 assertTrue(ok); 448 449 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 450 451 // Mutate with success 452 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), 453 CompareOperator.EQUAL, Bytes.toBytes("b"))) 454 .thenMutate(new RowMutations(row) 455 .add((Mutation) new Put(row) 456 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 457 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 458 .get(); 459 assertTrue(ok); 460 461 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 462 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 463 464 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 465 } 466 467 @Test 468 public void testCheckAndMutateWithMultipleFilters() throws Throwable { 469 AsyncTable<?> table = getTable.get(); 470 471 // Put one row 472 Put put = new Put(row); 473 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")); 474 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")); 475 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")); 476 table.put(put).get(); 477 478 // Put with success 479 boolean ok = table.checkAndMutate(row, new FilterList( 480 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 481 Bytes.toBytes("a")), 482 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 483 Bytes.toBytes("b")) 484 )) 485 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 486 .get(); 487 assertTrue(ok); 488 489 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 490 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 491 492 // Put with failure 493 ok = table.checkAndMutate(row, new FilterList( 494 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 495 Bytes.toBytes("a")), 496 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 497 Bytes.toBytes("c")) 498 )) 499 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))) 500 .get(); 501 assertFalse(ok); 502 503 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get()); 504 505 // Delete with success 506 ok = table.checkAndMutate(row, new FilterList( 507 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 508 Bytes.toBytes("a")), 509 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 510 Bytes.toBytes("b")) 511 )) 512 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D"))) 513 .get(); 514 assertTrue(ok); 515 516 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get()); 517 518 // Mutate with success 519 ok = table.checkAndMutate(row, new FilterList( 520 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL, 521 Bytes.toBytes("a")), 522 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL, 523 Bytes.toBytes("b")) 524 )) 525 .thenMutate(new RowMutations(row) 526 .add((Mutation) new Put(row) 527 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))) 528 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A")))) 529 .get(); 530 assertTrue(ok); 531 532 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get(); 533 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D")))); 534 535 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get()); 536 } 537 538 @Test 539 public void testCheckAndMutateWithTimestampFilter() throws Throwable { 540 AsyncTable<?> table = getTable.get(); 541 542 // Put with specifying the timestamp 543 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get(); 544 545 // Put with success 546 boolean ok = table.checkAndMutate(row, new FilterList( 547 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 548 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 549 new TimestampsFilter(Collections.singletonList(100L)) 550 )) 551 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))) 552 .get(); 553 assertTrue(ok); 554 555 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 556 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 557 558 // Put with failure 559 ok = table.checkAndMutate(row, new FilterList( 560 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)), 561 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))), 562 new TimestampsFilter(Collections.singletonList(101L)) 563 )) 564 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 565 .get(); 566 assertFalse(ok); 567 568 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 569 } 570 571 @Test 572 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable { 573 AsyncTable<?> table = getTable.get(); 574 575 // Put with specifying the timestamp 576 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))) 577 .get(); 578 579 // Put with success 580 boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 581 CompareOperator.EQUAL, Bytes.toBytes("a"))) 582 .timeRange(TimeRange.between(0, 101)) 583 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))) 584 .get(); 585 assertTrue(ok); 586 587 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get(); 588 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B")))); 589 590 // Put with failure 591 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), 592 CompareOperator.EQUAL, Bytes.toBytes("a"))) 593 .timeRange(TimeRange.between(0, 100)) 594 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))) 595 .get(); 596 assertFalse(ok); 597 598 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get()); 599 } 600 601 @Test(expected = NullPointerException.class) 602 public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable { 603 getTable.get().checkAndMutate(row, FAMILY) 604 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))); 605 } 606 607 @Test 608 public void testDisabled() throws InterruptedException, ExecutionException { 609 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get(); 610 try { 611 getTable.get().get(new Get(row)).get(); 612 fail("Should fail since table has been disabled"); 613 } catch (ExecutionException e) { 614 Throwable cause = e.getCause(); 615 assertThat(cause, instanceOf(TableNotEnabledException.class)); 616 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString())); 617 } 618 } 619 620 @Test 621 public void testInvalidPut() { 622 try { 623 getTable.get().put(new Put(Bytes.toBytes(0))); 624 fail("Should fail since the put does not contain any cells"); 625 } catch (IllegalArgumentException e) { 626 assertThat(e.getMessage(), containsString("No columns to insert")); 627 } 628 629 try { 630 getTable.get() 631 .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])); 632 fail("Should fail since the put exceeds the max key value size"); 633 } catch (IllegalArgumentException e) { 634 assertThat(e.getMessage(), containsString("KeyValue size too large")); 635 } 636 } 637}