001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertThat;
027import static org.junit.Assert.assertTrue;
028import static org.junit.Assert.fail;
029
030import java.io.IOException;
031import java.io.UncheckedIOException;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.List;
035import java.util.Optional;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ForkJoinPool;
039import java.util.concurrent.Future;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.TimeoutException;
042import java.util.function.Function;
043import java.util.stream.Collectors;
044import java.util.stream.IntStream;
045import org.apache.hadoop.hbase.Cell;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.coprocessor.ObserverContext;
050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
052import org.apache.hadoop.hbase.coprocessor.RegionObserver;
053import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.junit.After;
058import org.junit.AfterClass;
059import org.junit.Before;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.runner.RunWith;
065import org.junit.runners.Parameterized;
066import org.junit.runners.Parameterized.Parameter;
067import org.junit.runners.Parameterized.Parameters;
068
069@RunWith(Parameterized.class)
070@Category({ LargeTests.class, ClientTests.class })
071public class TestAsyncTableBatch {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestAsyncTableBatch.class);
076
077  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078
079  private static TableName TABLE_NAME = TableName.valueOf("async");
080
081  private static byte[] FAMILY = Bytes.toBytes("cf");
082
083  private static byte[] CQ = Bytes.toBytes("cq");
084  private static byte[] CQ1 = Bytes.toBytes("cq1");
085
086  private static int COUNT = 1000;
087
088  private static AsyncConnection CONN;
089
090  private static byte[][] SPLIT_KEYS;
091
092  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
093
094  @Parameter(0)
095  public String tableType;
096
097  @Parameter(1)
098  public Function<TableName, AsyncTable<?>> tableGetter;
099
100  private static AsyncTable<?> getRawTable(TableName tableName) {
101    return CONN.getTable(tableName);
102  }
103
104  private static AsyncTable<?> getTable(TableName tableName) {
105    return CONN.getTable(tableName, ForkJoinPool.commonPool());
106  }
107
108  @Parameters(name = "{index}: type={0}")
109  public static List<Object[]> params() {
110    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
111    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
112    return Arrays.asList(new Object[] { "raw", rawTableGetter },
113      new Object[] { "normal", tableGetter });
114  }
115
116  @BeforeClass
117  public static void setUp() throws Exception {
118    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
119      MAX_KEY_VALUE_SIZE);
120    TEST_UTIL.startMiniCluster(3);
121    SPLIT_KEYS = new byte[8][];
122    for (int i = 111; i < 999; i += 111) {
123      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
124    }
125    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
126  }
127
128  @AfterClass
129  public static void tearDown() throws Exception {
130    CONN.close();
131    TEST_UTIL.shutdownMiniCluster();
132  }
133
134  @Before
135  public void setUpBeforeTest() throws IOException, InterruptedException {
136    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
137    TEST_UTIL.waitTableAvailable(TABLE_NAME);
138  }
139
140  @After
141  public void tearDownAfterTest() throws IOException {
142    Admin admin = TEST_UTIL.getAdmin();
143    if (admin.isTableEnabled(TABLE_NAME)) {
144      admin.disableTable(TABLE_NAME);
145    }
146    admin.deleteTable(TABLE_NAME);
147  }
148
149  private byte[] getRow(int i) {
150    return Bytes.toBytes(String.format("%03d", i));
151  }
152
153  @Test
154  public void test()
155      throws InterruptedException, ExecutionException, IOException, TimeoutException {
156    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
157    table.putAll(IntStream.range(0, COUNT)
158        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
159        .collect(Collectors.toList())).get();
160    List<Result> results = table.getAll(IntStream.range(0, COUNT)
161        .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
162        .flatMap(l -> l.stream()).collect(Collectors.toList())).get();
163    assertEquals(2 * COUNT, results.size());
164    for (int i = 0; i < COUNT; i++) {
165      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
166      assertTrue(results.get(2 * i + 1).isEmpty());
167    }
168    Admin admin = TEST_UTIL.getAdmin();
169    admin.flush(TABLE_NAME);
170    List<Future<?>> splitFutures =
171      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
172        byte[] startKey = r.getRegionInfo().getStartKey();
173        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
174        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
175        try {
176          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
177        } catch (IOException e) {
178          throw new UncheckedIOException(e);
179        }
180      }).collect(Collectors.toList());
181    for (Future<?> future : splitFutures) {
182      future.get(30, TimeUnit.SECONDS);
183    }
184    table.deleteAll(
185      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
186        .get();
187    results = table
188        .getAll(
189          IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
190        .get();
191    assertEquals(COUNT, results.size());
192    results.forEach(r -> assertTrue(r.isEmpty()));
193  }
194
195  @Test
196  public void testWithRegionServerFailover() throws Exception {
197    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
198    table.putAll(IntStream.range(0, COUNT)
199        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
200        .collect(Collectors.toList())).get();
201    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
202    Thread.sleep(100);
203    table.putAll(IntStream.range(COUNT, 2 * COUNT)
204        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
205        .collect(Collectors.toList())).get();
206    List<Result> results = table.getAll(
207      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
208        .get();
209    assertEquals(2 * COUNT, results.size());
210    results.forEach(r -> assertFalse(r.isEmpty()));
211    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
212        .collect(Collectors.toList())).get();
213    results = table.getAll(
214      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
215        .get();
216    assertEquals(2 * COUNT, results.size());
217    results.forEach(r -> assertTrue(r.isEmpty()));
218  }
219
220  @Test
221  public void testMixed() throws InterruptedException, ExecutionException, IOException {
222    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
223    table.putAll(IntStream.range(0, 7)
224        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
225        .collect(Collectors.toList())).get();
226    List<Row> actions = new ArrayList<>();
227    actions.add(new Get(Bytes.toBytes(0)));
228    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
229    actions.add(new Delete(Bytes.toBytes(2)));
230    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
231    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
232    RowMutations rm = new RowMutations(Bytes.toBytes(5));
233    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
234    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
235    actions.add(rm);
236    actions.add(new Get(Bytes.toBytes(6)));
237
238    List<Object> results = table.batchAll(actions).get();
239    assertEquals(7, results.size());
240    Result getResult = (Result) results.get(0);
241    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
242    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
243    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
244    Result incrementResult = (Result) results.get(3);
245    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
246    Result appendResult = (Result) results.get(4);
247    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
248    assertEquals(12, appendValue.length);
249    assertEquals(4, Bytes.toLong(appendValue));
250    assertEquals(4, Bytes.toInt(appendValue, 8));
251    assertEquals(100,
252      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
253    assertEquals(200,
254      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
255    getResult = (Result) results.get(6);
256    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
257  }
258
259  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
260
261    @Override
262    public Optional<RegionObserver> getRegionObserver() {
263      return Optional.of(this);
264    }
265
266    @Override
267    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
268        List<Cell> results) throws IOException {
269      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
270        throw new DoNotRetryRegionException("Inject Error");
271      }
272    }
273  }
274
275  @Test
276  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
277    Admin admin = TEST_UTIL.getAdmin();
278    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
279        .setCoprocessor(ErrorInjectObserver.class.getName()).build();
280    admin.modifyTable(htd);
281    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
282    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
283        .collect(Collectors.toList())).get();
284    List<CompletableFuture<Result>> futures = table
285        .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
286    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
287      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
288    }
289    try {
290      futures.get(SPLIT_KEYS.length - 1).get();
291      fail();
292    } catch (ExecutionException e) {
293      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
294    }
295  }
296
297  @Test
298  public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException {
299    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
300    List<CompletableFuture<Object>> futures = table.batch(Arrays.asList(
301      new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ,
302        Bytes.toBytes("bad")),
303      new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1),
304      new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good"))));
305    try {
306      futures.get(0).get();
307      fail();
308    } catch (ExecutionException e) {
309      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
310      assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class));
311    }
312    assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ)));
313    assertTrue(((Result) futures.get(2).get()).isEmpty());
314    assertEquals("good",
315      Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
316  }
317
318  @Test
319  public void testInvalidPut() {
320    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
321    try {
322      table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0))));
323      fail("Should fail since the put does not contain any cells");
324    } catch (IllegalArgumentException e) {
325      assertThat(e.getMessage(), containsString("No columns to insert"));
326    }
327
328    try {
329      table.batch(
330        Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
331          new Delete(Bytes.toBytes(0))));
332      fail("Should fail since the put exceeds the max key value size");
333    } catch (IllegalArgumentException e) {
334      assertThat(e.getMessage(), containsString("KeyValue size too large"));
335    }
336  }
337
338  @Test
339  public void testWithCheckAndMutate() throws Exception {
340    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
341
342    byte[] row1 = Bytes.toBytes("row1");
343    byte[] row2 = Bytes.toBytes("row2");
344    byte[] row3 = Bytes.toBytes("row3");
345    byte[] row4 = Bytes.toBytes("row4");
346    byte[] row5 = Bytes.toBytes("row5");
347    byte[] row6 = Bytes.toBytes("row6");
348    byte[] row7 = Bytes.toBytes("row7");
349
350    table.putAll(Arrays.asList(
351      new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
352      new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
353      new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
354      new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
355      new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
356      new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
357      new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))).get();
358
359    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
360      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
361      .build(new RowMutations(row1)
362        .add((Mutation) new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
363        .add((Mutation) new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
364        .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
365        .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
366    Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
367    RowMutations mutations = new RowMutations(row3)
368      .add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
369      .add((Mutation) new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
370      .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
371      .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
372    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
373      .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
374      .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
375    Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
376    CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
377      .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
378      .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
379    CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
380      .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
381      .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
382
383    List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
384      checkAndMutate3, checkAndMutate4);
385    List<Object> results = table.batchAll(actions).get();
386
387    CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results.get(0);
388    assertTrue(checkAndMutateResult.isSuccess());
389    assertEquals(3L,
390      Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
391    assertEquals("d",
392      Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
393
394    assertEquals("b",
395      Bytes.toString(((Result) results.get(1)).getValue(FAMILY, Bytes.toBytes("B"))));
396
397    Result result = (Result) results.get(2);
398    assertTrue(result.getExists());
399    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
400    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
401
402    checkAndMutateResult = (CheckAndMutateResult) results.get(3);
403    assertFalse(checkAndMutateResult.isSuccess());
404    assertNull(checkAndMutateResult.getResult());
405
406    assertTrue(((Result) results.get(4)).isEmpty());
407
408    checkAndMutateResult = (CheckAndMutateResult) results.get(5);
409    assertTrue(checkAndMutateResult.isSuccess());
410    assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
411      .getValue(FAMILY, Bytes.toBytes("F"))));
412
413    checkAndMutateResult = (CheckAndMutateResult) results.get(6);
414    assertTrue(checkAndMutateResult.isSuccess());
415    assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
416      .getValue(FAMILY, Bytes.toBytes("G"))));
417
418    result = table.get(new Get(row1)).get();
419    assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
420    assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
421    assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
422    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
423
424    result = table.get(new Get(row3)).get();
425    assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
426    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
427    assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
428    assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
429    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
430
431    result = table.get(new Get(row4)).get();
432    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
433
434    result = table.get(new Get(row5)).get();
435    assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
436
437    result = table.get(new Get(row6)).get();
438    assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
439
440    result = table.get(new Get(row7)).get();
441    assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
442  }
443}