/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertThat;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.Arrays;
033import java.util.List;
034import java.util.concurrent.ArrayBlockingQueue;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.concurrent.atomic.AtomicLong;
041import java.util.function.Supplier;
042import java.util.stream.IntStream;
043import org.apache.commons.io.IOUtils;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.TableNotEnabledException;
048import org.apache.hadoop.hbase.io.TimeRange;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.MediumTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.Pair;
053import org.junit.AfterClass;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063import org.junit.runners.Parameterized.Parameter;
064import org.junit.runners.Parameterized.Parameters;
065
066@RunWith(Parameterized.class)
067@Category({ MediumTests.class, ClientTests.class })
068public class TestAsyncTable {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestAsyncTable.class);
073
074  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075
076  private static TableName TABLE_NAME = TableName.valueOf("async");
077
078  private static byte[] FAMILY = Bytes.toBytes("cf");
079
080  private static byte[] QUALIFIER = Bytes.toBytes("cq");
081
082  private static byte[] VALUE = Bytes.toBytes("value");
083
084  private static AsyncConnection ASYNC_CONN;
085
086  @Rule
087  public TestName testName = new TestName();
088
089  private byte[] row;
090
091  @Parameter
092  public Supplier<AsyncTable<?>> getTable;
093
094  private static AsyncTable<?> getRawTable() {
095    return ASYNC_CONN.getTable(TABLE_NAME);
096  }
097
098  private static AsyncTable<?> getTable() {
099    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
100  }
101
102  @Parameters
103  public static List<Object[]> params() {
104    return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
105      new Supplier<?>[] { TestAsyncTable::getTable });
106  }
107
108  @BeforeClass
109  public static void setUpBeforeClass() throws Exception {
110    TEST_UTIL.startMiniCluster(1);
111    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
112    TEST_UTIL.waitTableAvailable(TABLE_NAME);
113    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
114  }
115
116  @AfterClass
117  public static void tearDownAfterClass() throws Exception {
118    IOUtils.closeQuietly(ASYNC_CONN);
119    TEST_UTIL.shutdownMiniCluster();
120  }
121
122  @Before
123  public void setUp() throws IOException, InterruptedException, ExecutionException {
124    row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
125    if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
126      ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
127    }
128  }
129
130  @Test
131  public void testSimple() throws Exception {
132    AsyncTable<?> table = getTable.get();
133    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
134    assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
135    Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
136    assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
137    table.delete(new Delete(row)).get();
138    result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
139    assertTrue(result.isEmpty());
140    assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
141  }
142
143  private byte[] concat(byte[] base, int index) {
144    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
145  }
146
147  @Test
148  public void testSimpleMultiple() throws Exception {
149    AsyncTable<?> table = getTable.get();
150    int count = 100;
151    CountDownLatch putLatch = new CountDownLatch(count);
152    IntStream.range(0, count).forEach(
153      i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
154          .thenAccept(x -> putLatch.countDown()));
155    putLatch.await();
156    BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
157    IntStream.range(0, count)
158        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
159            .thenAccept(x -> existsResp.add(x)));
160    for (int i = 0; i < count; i++) {
161      assertTrue(existsResp.take());
162    }
163    BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
164    IntStream.range(0, count)
165        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
166            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
167    for (int i = 0; i < count; i++) {
168      Pair<Integer, Result> pair = getResp.take();
169      assertArrayEquals(concat(VALUE, pair.getFirst()),
170        pair.getSecond().getValue(FAMILY, QUALIFIER));
171    }
172    CountDownLatch deleteLatch = new CountDownLatch(count);
173    IntStream.range(0, count).forEach(
174      i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
175    deleteLatch.await();
176    IntStream.range(0, count)
177        .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
178            .thenAccept(x -> existsResp.add(x)));
179    for (int i = 0; i < count; i++) {
180      assertFalse(existsResp.take());
181    }
182    IntStream.range(0, count)
183        .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
184            .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
185    for (int i = 0; i < count; i++) {
186      Pair<Integer, Result> pair = getResp.take();
187      assertTrue(pair.getSecond().isEmpty());
188    }
189  }
190
191  @Test
192  public void testIncrement() throws InterruptedException, ExecutionException {
193    AsyncTable<?> table = getTable.get();
194    int count = 100;
195    CountDownLatch latch = new CountDownLatch(count);
196    AtomicLong sum = new AtomicLong(0L);
197    IntStream.range(0, count)
198        .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
199          sum.addAndGet(x);
200          latch.countDown();
201        }));
202    latch.await();
203    assertEquals(count, Bytes.toLong(
204      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
205    assertEquals((1 + count) * count / 2, sum.get());
206  }
207
208  @Test
209  public void testAppend() throws InterruptedException, ExecutionException {
210    AsyncTable<?> table = getTable.get();
211    int count = 10;
212    CountDownLatch latch = new CountDownLatch(count);
213    char suffix = ':';
214    AtomicLong suffixCount = new AtomicLong(0L);
215    IntStream.range(0, count).forEachOrdered(
216      i -> table.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
217          .thenAccept(r -> {
218            suffixCount.addAndGet(Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars()
219                .filter(x -> x == suffix).count());
220            latch.countDown();
221          }));
222    latch.await();
223    assertEquals((1 + count) * count / 2, suffixCount.get());
224    String value = Bytes.toString(
225      table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
226    int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
227        .sorted().toArray();
228    assertArrayEquals(IntStream.range(0, count).toArray(), actual);
229  }
230
231  @Test
232  public void testCheckAndPut() throws InterruptedException, ExecutionException {
233    AsyncTable<?> table = getTable.get();
234    AtomicInteger successCount = new AtomicInteger(0);
235    AtomicInteger successIndex = new AtomicInteger(-1);
236    int count = 10;
237    CountDownLatch latch = new CountDownLatch(count);
238    IntStream.range(0, count)
239        .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
240            .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
241              if (x) {
242                successCount.incrementAndGet();
243                successIndex.set(i);
244              }
245              latch.countDown();
246            }));
247    latch.await();
248    assertEquals(1, successCount.get());
249    String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
250    assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
251  }
252
253  @Test
254  public void testCheckAndDelete() throws InterruptedException, ExecutionException {
255    AsyncTable<?> table = getTable.get();
256    int count = 10;
257    CountDownLatch putLatch = new CountDownLatch(count + 1);
258    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
259    IntStream.range(0, count)
260        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
261            .thenRun(() -> putLatch.countDown()));
262    putLatch.await();
263
264    AtomicInteger successCount = new AtomicInteger(0);
265    AtomicInteger successIndex = new AtomicInteger(-1);
266    CountDownLatch deleteLatch = new CountDownLatch(count);
267    IntStream.range(0, count)
268        .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
269            .thenDelete(
270              new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
271            .thenAccept(x -> {
272              if (x) {
273                successCount.incrementAndGet();
274                successIndex.set(i);
275              }
276              deleteLatch.countDown();
277            }));
278    deleteLatch.await();
279    assertEquals(1, successCount.get());
280    Result result = table.get(new Get(row)).get();
281    IntStream.range(0, count).forEach(i -> {
282      if (i == successIndex.get()) {
283        assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
284      } else {
285        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
286      }
287    });
288  }
289
290  @Test
291  public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
292    AsyncTable<?> table = getTable.get();
293    RowMutations mutation = new RowMutations(row);
294    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
295    table.mutateRow(mutation).get();
296    Result result = table.get(new Get(row)).get();
297    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
298
299    mutation = new RowMutations(row);
300    mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
301    mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
302    table.mutateRow(mutation).get();
303    result = table.get(new Get(row)).get();
304    assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
305    assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
306  }
307
308  @Test
309  public void testCheckAndMutate() throws InterruptedException, ExecutionException {
310    AsyncTable<?> table = getTable.get();
311    int count = 10;
312    CountDownLatch putLatch = new CountDownLatch(count + 1);
313    table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
314    IntStream.range(0, count)
315        .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
316            .thenRun(() -> putLatch.countDown()));
317    putLatch.await();
318
319    AtomicInteger successCount = new AtomicInteger(0);
320    AtomicInteger successIndex = new AtomicInteger(-1);
321    CountDownLatch mutateLatch = new CountDownLatch(count);
322    IntStream.range(0, count).forEach(i -> {
323      RowMutations mutation = new RowMutations(row);
324      try {
325        mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
326        mutation
327          .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
328      } catch (IOException e) {
329        throw new UncheckedIOException(e);
330      }
331      table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
332          .thenAccept(x -> {
333            if (x) {
334              successCount.incrementAndGet();
335              successIndex.set(i);
336            }
337            mutateLatch.countDown();
338          });
339    });
340    mutateLatch.await();
341    assertEquals(1, successCount.get());
342    Result result = table.get(new Get(row)).get();
343    IntStream.range(0, count).forEach(i -> {
344      if (i == successIndex.get()) {
345        assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
346      } else {
347        assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
348      }
349    });
350  }
351
352  @Test
353  public void testCheckAndMutateWithTimeRange() throws Exception {
354    AsyncTable<?> table = getTable.get();
355    final long ts = System.currentTimeMillis() / 2;
356    Put put = new Put(row);
357    put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
358
359    boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
360      .ifNotExists()
361      .thenPut(put)
362      .get();
363    assertTrue(ok);
364
365    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
366      .timeRange(TimeRange.at(ts + 10000))
367      .ifEquals(VALUE)
368      .thenPut(put)
369      .get();
370    assertFalse(ok);
371
372    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
373      .timeRange(TimeRange.at(ts))
374      .ifEquals(VALUE)
375      .thenPut(put)
376      .get();
377    assertTrue(ok);
378
379    RowMutations rm = new RowMutations(row)
380      .add((Mutation) put);
381    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
382      .timeRange(TimeRange.at(ts + 10000))
383      .ifEquals(VALUE)
384      .thenMutate(rm)
385      .get();
386    assertFalse(ok);
387
388    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
389      .timeRange(TimeRange.at(ts))
390      .ifEquals(VALUE)
391      .thenMutate(rm)
392      .get();
393    assertTrue(ok);
394
395    Delete delete = new Delete(row)
396      .addColumn(FAMILY, QUALIFIER);
397
398    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
399      .timeRange(TimeRange.at(ts + 10000))
400      .ifEquals(VALUE)
401      .thenDelete(delete)
402      .get();
403    assertFalse(ok);
404
405    ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
406      .timeRange(TimeRange.at(ts))
407      .ifEquals(VALUE)
408      .thenDelete(delete)
409      .get();
410    assertTrue(ok);
411  }
412
413  @Test
414  public void testDisabled() throws InterruptedException, ExecutionException {
415    ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
416    try {
417      getTable.get().get(new Get(row)).get();
418      fail("Should fail since table has been disabled");
419    } catch (ExecutionException e) {
420      Throwable cause = e.getCause();
421      assertThat(cause, instanceOf(TableNotEnabledException.class));
422      assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
423    }
424  }
425}