001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertThat;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.List;
034import java.util.concurrent.ArrayBlockingQueue;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.function.Supplier;
042import java.util.stream.IntStream;
043import org.apache.commons.io.IOUtils;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.TableNotEnabledException;
048import org.apache.hadoop.hbase.io.TimeRange;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.Pair;
053import org.junit.AfterClass;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064import org.junit.runners.Parameterized.Parameters;
065
066@RunWith(Parameterized.class)
067@Category({ MediumTests.class, ClientTests.class })
068public class TestAsyncTable {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestAsyncTable.class);
073
074  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075
076  private static TableName TABLE_NAME = TableName.valueOf("async");
077
078  private static byte[] FAMILY = Bytes.toBytes("cf");
079
080  private static byte[] QUALIFIER = Bytes.toBytes("cq");
081
082  private static byte[] VALUE = Bytes.toBytes("value");
083
084  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
085
086  private static AsyncConnection ASYNC_CONN;
087
088  @Rule
089  public TestName testName = new TestName();
090
091  private byte[] row;
092
093  @Parameter
094  public Supplier<AsyncTable<?>> getTable;
095
096  private static AsyncTable<?> getRawTable() {
097    return ASYNC_CONN.getTable(TABLE_NAME);
098  }
099
100  private static AsyncTable<?> getTable() {
101    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
102  }
103
104  @Parameters
105  public static List<Object[]> params() {
106    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
107      new Supplier<?>[] { TestAsyncTable::getTable });
108  }
109
110  @BeforeClass
111  public static void setUpBeforeClass() throws Exception {
112    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
113      MAX_KEY_VALUE_SIZE);
114    TEST_UTIL.startMiniCluster(1);
115    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
116    TEST_UTIL.waitTableAvailable(TABLE_NAME);
117    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
118    assertFalse(ASYNC_CONN.isClosed());
119  }
120
121  @AfterClass
122  public static void tearDownAfterClass() throws Exception {
123    IOUtils.closeQuietly(ASYNC_CONN);
124    assertTrue(ASYNC_CONN.isClosed());
125    TEST_UTIL.shutdownMiniCluster();
126  }
127
128  @Before
129  public void setUp() throws IOException, InterruptedException, ExecutionException {
130    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
131    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
132      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
133    }
134  }
135
136  @Test
137  public void testSimple() throws Exception {
138    AsyncTable<?> table = getTable.get();
139    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
140    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
141    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
142    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
143    table.delete(new Delete(row)).get();
144    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
145    assertTrue(result.isEmpty());
146    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
147  }
148
149  private byte[] concat(byte[] base, int index) {
150    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
151  }
152
153  @SuppressWarnings("FutureReturnValueIgnored")
154  @Test
155  public void testSimpleMultiple() throws Exception {
156    AsyncTable<?> table = getTable.get();
157    int count = 100;
158    CountDownLatch putLatch = new CountDownLatch(count);
159    IntStream.range(0, count).forEach(
160      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
161        .thenAccept(x -> putLatch.countDown()));
162    putLatch.await();
163    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
164    IntStream.range(0, count)
165      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
166        .thenAccept(x -> existsResp.add(x)));
167    for (int i = 0; i < count; i++) {
168      assertTrue(existsResp.take());
169    }
170    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
171    IntStream.range(0, count)
172      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
173        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
174    for (int i = 0; i < count; i++) {
175      Pair<Integer, Result> pair = getResp.take();
176      assertArrayEquals(concat(VALUE, pair.getFirst()),
177        pair.getSecond().getValue(FAMILY, QUALIFIER));
178    }
179    CountDownLatch deleteLatch = new CountDownLatch(count);
180    IntStream.range(0, count).forEach(
181      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
182    deleteLatch.await();
183    IntStream.range(0, count)
184      .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
185        .thenAccept(x -> existsResp.add(x)));
186    for (int i = 0; i < count; i++) {
187      assertFalse(existsResp.take());
188    }
189    IntStream.range(0, count)
190      .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
191        .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
192    for (int i = 0; i < count; i++) {
193      Pair<Integer, Result> pair = getResp.take();
194      assertTrue(pair.getSecond().isEmpty());
195    }
196  }
197
198  @SuppressWarnings("FutureReturnValueIgnored")
199  @Test
200  public void testIncrement() throws InterruptedException, ExecutionException {
201    AsyncTable<?> table = getTable.get();
202    int count = 100;
203    CountDownLatch latch = new CountDownLatch(count);
204    AtomicLong sum = new AtomicLong(0L);
205    IntStream.range(0, count)
206      .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
207        sum.addAndGet(x);
208        latch.countDown();
209      }));
210    latch.await();
211    assertEquals(count, Bytes.toLong(
212      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
213    assertEquals((1 + count) * count / 2, sum.get());
214  }
215
216  @SuppressWarnings("FutureReturnValueIgnored")
217  @Test
218  public void testAppend() throws InterruptedException, ExecutionException {
219    AsyncTable<?> table = getTable.get();
220    int count = 10;
221    CountDownLatch latch = new CountDownLatch(count);
222    char suffix = ':';
223    AtomicLong suffixCount = new AtomicLong(0L);
224    IntStream.range(0, count)
225      .forEachOrdered(i -> table
226        .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
227        .thenAccept(r -> {
228          suffixCount.addAndGet(
229            Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
230          latch.countDown();
231        }));
232    latch.await();
233    assertEquals((1 + count) * count / 2, suffixCount.get());
234    String value = Bytes.toString(
235      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
236    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
237      .sorted().toArray();
238    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
239  }
240
241  @SuppressWarnings("FutureReturnValueIgnored")
242  @Test
243  public void testCheckAndPut() throws InterruptedException, ExecutionException {
244    AsyncTable<?> table = getTable.get();
245    AtomicInteger successCount = new AtomicInteger(0);
246    AtomicInteger successIndex = new AtomicInteger(-1);
247    int count = 10;
248    CountDownLatch latch = new CountDownLatch(count);
249    IntStream.range(0, count)
250      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
251        .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
252          if (x) {
253            successCount.incrementAndGet();
254            successIndex.set(i);
255          }
256          latch.countDown();
257        }));
258    latch.await();
259    assertEquals(1, successCount.get());
260    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
261    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
262  }
263
264  @SuppressWarnings("FutureReturnValueIgnored")
265  @Test
266  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
267    AsyncTable<?> table = getTable.get();
268    int count = 10;
269    CountDownLatch putLatch = new CountDownLatch(count + 1);
270    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
271    IntStream.range(0, count)
272      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
273        .thenRun(() -> putLatch.countDown()));
274    putLatch.await();
275
276    AtomicInteger successCount = new AtomicInteger(0);
277    AtomicInteger successIndex = new AtomicInteger(-1);
278    CountDownLatch deleteLatch = new CountDownLatch(count);
279    IntStream.range(0, count)
280      .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
281        .thenDelete(
282          new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
283        .thenAccept(x -> {
284          if (x) {
285            successCount.incrementAndGet();
286            successIndex.set(i);
287          }
288          deleteLatch.countDown();
289        }));
290    deleteLatch.await();
291    assertEquals(1, successCount.get());
292    Result result = table.get(new Get(row)).get();
293    IntStream.range(0, count).forEach(i -> {
294      if (i == successIndex.get()) {
295        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
296      } else {
297        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
298      }
299    });
300  }
301
302  @Test
303  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
304    AsyncTable<?> table = getTable.get();
305    RowMutations mutation = new RowMutations(row);
306    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
307    table.mutateRow(mutation).get();
308    Result result = table.get(new Get(row)).get();
309    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
310
311    mutation = new RowMutations(row);
312    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
313    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
314    table.mutateRow(mutation).get();
315    result = table.get(new Get(row)).get();
316    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
317    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
318  }
319
320  @SuppressWarnings("FutureReturnValueIgnored")
321  @Test
322  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
323    AsyncTable<?> table = getTable.get();
324    int count = 10;
325    CountDownLatch putLatch = new CountDownLatch(count + 1);
326    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
327    IntStream.range(0, count)
328      .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
329        .thenRun(() -> putLatch.countDown()));
330    putLatch.await();
331
332    AtomicInteger successCount = new AtomicInteger(0);
333    AtomicInteger successIndex = new AtomicInteger(-1);
334    CountDownLatch mutateLatch = new CountDownLatch(count);
335    IntStream.range(0, count).forEach(i -> {
336      RowMutations mutation = new RowMutations(row);
337      try {
338        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
339        mutation
340          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
341      } catch (IOException e) {
342        throw new UncheckedIOException(e);
343      }
344      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
345        .thenAccept(x -> {
346          if (x) {
347            successCount.incrementAndGet();
348            successIndex.set(i);
349          }
350          mutateLatch.countDown();
351        });
352    });
353    mutateLatch.await();
354    assertEquals(1, successCount.get());
355    Result result = table.get(new Get(row)).get();
356    IntStream.range(0, count).forEach(i -> {
357      if (i == successIndex.get()) {
358        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
359      } else {
360        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
361      }
362    });
363  }
364
365  @Test
366  public void testCheckAndMutateWithTimeRange() throws Exception {
367    AsyncTable<?> table = getTable.get();
368    final long ts = System.currentTimeMillis() / 2;
369    Put put = new Put(row);
370    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
371
372    boolean ok =
373      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
374    assertTrue(ok);
375
376    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
377      .ifEquals(VALUE).thenPut(put).get();
378    assertFalse(ok);
379
380    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
381      .ifEquals(VALUE).thenPut(put).get();
382    assertTrue(ok);
383
384    RowMutations rm = new RowMutations(row).add((Mutation) put);
385    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
386      .ifEquals(VALUE).thenMutate(rm).get();
387    assertFalse(ok);
388
389    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
390      .ifEquals(VALUE).thenMutate(rm).get();
391    assertTrue(ok);
392
393    Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
394
395    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
396      .ifEquals(VALUE).thenDelete(delete).get();
397    assertFalse(ok);
398
399    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
400      .ifEquals(VALUE).thenDelete(delete).get();
401    assertTrue(ok);
402  }
403
404  @Test
405  public void testDisabled() throws InterruptedException, ExecutionException {
406    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
407    try {
408      getTable.get().get(new Get(row)).get();
409      fail("Should fail since table has been disabled");
410    } catch (ExecutionException e) {
411      Throwable cause = e.getCause();
412      assertThat(cause, instanceOf(TableNotEnabledException.class));
413      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
414    }
415  }
416
417  @Test
418  public void testInvalidPut() {
419    try {
420      getTable.get().put(new Put(Bytes.toBytes(0)));
421      fail("Should fail since the put does not contain any cells");
422    } catch (IllegalArgumentException e) {
423      assertThat(e.getMessage(), containsString("No columns to insert"));
424    }
425
426    try {
427      getTable.get()
428        .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
429      fail("Should fail since the put exceeds the max key value size");
430    } catch (IllegalArgumentException e) {
431      assertThat(e.getMessage(), containsString("KeyValue size too large"));
432    }
433  }
434}