001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.containsString;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertThat;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.io.UncheckedIOException;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.List;
034import java.util.Optional;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ExecutionException;
037import java.util.concurrent.ForkJoinPool;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import java.util.function.Function;
042import java.util.stream.Collectors;
043import java.util.stream.IntStream;
044import org.apache.hadoop.hbase.Cell;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.coprocessor.ObserverContext;
049import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
050import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
051import org.apache.hadoop.hbase.coprocessor.RegionObserver;
052import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
053import org.apache.hadoop.hbase.testclassification.ClientTests;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.junit.After;
057import org.junit.AfterClass;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.runner.RunWith;
064import org.junit.runners.Parameterized;
065import org.junit.runners.Parameterized.Parameter;
066import org.junit.runners.Parameterized.Parameters;
067
068@RunWith(Parameterized.class)
069@Category({ LargeTests.class, ClientTests.class })
070public class TestAsyncTableBatch {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestAsyncTableBatch.class);
075
076  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
077
078  private static TableName TABLE_NAME = TableName.valueOf("async");
079
080  private static byte[] FAMILY = Bytes.toBytes("cf");
081
082  private static byte[] CQ = Bytes.toBytes("cq");
083  private static byte[] CQ1 = Bytes.toBytes("cq1");
084
085  private static int COUNT = 1000;
086
087  private static AsyncConnection CONN;
088
089  private static byte[][] SPLIT_KEYS;
090
091  private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
092
093  @Parameter(0)
094  public String tableType;
095
096  @Parameter(1)
097  public Function<TableName, AsyncTable<?>> tableGetter;
098
099  private static AsyncTable<?> getRawTable(TableName tableName) {
100    return CONN.getTable(tableName);
101  }
102
103  private static AsyncTable<?> getTable(TableName tableName) {
104    return CONN.getTable(tableName, ForkJoinPool.commonPool());
105  }
106
107  @Parameters(name = "{index}: type={0}")
108  public static List<Object[]> params() {
109    Function<TableName, AsyncTable<?>> rawTableGetter = TestAsyncTableBatch::getRawTable;
110    Function<TableName, AsyncTable<?>> tableGetter = TestAsyncTableBatch::getTable;
111    return Arrays.asList(new Object[] { "raw", rawTableGetter },
112      new Object[] { "normal", tableGetter });
113  }
114
115  @BeforeClass
116  public static void setUp() throws Exception {
117    TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
118      MAX_KEY_VALUE_SIZE);
119    TEST_UTIL.startMiniCluster(3);
120    SPLIT_KEYS = new byte[8][];
121    for (int i = 111; i < 999; i += 111) {
122      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
123    }
124    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
125  }
126
127  @AfterClass
128  public static void tearDown() throws Exception {
129    CONN.close();
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  @Before
134  public void setUpBeforeTest() throws IOException, InterruptedException {
135    TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS);
136    TEST_UTIL.waitTableAvailable(TABLE_NAME);
137  }
138
139  @After
140  public void tearDownAfterTest() throws IOException {
141    Admin admin = TEST_UTIL.getAdmin();
142    if (admin.isTableEnabled(TABLE_NAME)) {
143      admin.disableTable(TABLE_NAME);
144    }
145    admin.deleteTable(TABLE_NAME);
146  }
147
148  private byte[] getRow(int i) {
149    return Bytes.toBytes(String.format("%03d", i));
150  }
151
152  @Test
153  public void test()
154      throws InterruptedException, ExecutionException, IOException, TimeoutException {
155    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
156    table.putAll(IntStream.range(0, COUNT)
157        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
158        .collect(Collectors.toList())).get();
159    List<Result> results = table.getAll(IntStream.range(0, COUNT)
160        .mapToObj(i -> Arrays.asList(new Get(getRow(i)), new Get(Arrays.copyOf(getRow(i), 4))))
161        .flatMap(l -> l.stream()).collect(Collectors.toList())).get();
162    assertEquals(2 * COUNT, results.size());
163    for (int i = 0; i < COUNT; i++) {
164      assertEquals(i, Bytes.toInt(results.get(2 * i).getValue(FAMILY, CQ)));
165      assertTrue(results.get(2 * i + 1).isEmpty());
166    }
167    Admin admin = TEST_UTIL.getAdmin();
168    admin.flush(TABLE_NAME);
169    List<Future<?>> splitFutures =
170      TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).stream().map(r -> {
171        byte[] startKey = r.getRegionInfo().getStartKey();
172        int number = startKey.length == 0 ? 55 : Integer.parseInt(Bytes.toString(startKey));
173        byte[] splitPoint = Bytes.toBytes(String.format("%03d", number + 55));
174        try {
175          return admin.splitRegionAsync(r.getRegionInfo().getRegionName(), splitPoint);
176        } catch (IOException e) {
177          throw new UncheckedIOException(e);
178        }
179      }).collect(Collectors.toList());
180    for (Future<?> future : splitFutures) {
181      future.get(30, TimeUnit.SECONDS);
182    }
183    table.deleteAll(
184      IntStream.range(0, COUNT).mapToObj(i -> new Delete(getRow(i))).collect(Collectors.toList()))
185        .get();
186    results = table
187        .getAll(
188          IntStream.range(0, COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
189        .get();
190    assertEquals(COUNT, results.size());
191    results.forEach(r -> assertTrue(r.isEmpty()));
192  }
193
194  @Test
195  public void testWithRegionServerFailover() throws Exception {
196    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
197    table.putAll(IntStream.range(0, COUNT)
198        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
199        .collect(Collectors.toList())).get();
200    TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).abort("Aborting for tests");
201    Thread.sleep(100);
202    table.putAll(IntStream.range(COUNT, 2 * COUNT)
203        .mapToObj(i -> new Put(getRow(i)).addColumn(FAMILY, CQ, Bytes.toBytes(i)))
204        .collect(Collectors.toList())).get();
205    List<Result> results = table.getAll(
206      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
207        .get();
208    assertEquals(2 * COUNT, results.size());
209    results.forEach(r -> assertFalse(r.isEmpty()));
210    table.deleteAll(IntStream.range(0, 2 * COUNT).mapToObj(i -> new Delete(getRow(i)))
211        .collect(Collectors.toList())).get();
212    results = table.getAll(
213      IntStream.range(0, 2 * COUNT).mapToObj(i -> new Get(getRow(i))).collect(Collectors.toList()))
214        .get();
215    assertEquals(2 * COUNT, results.size());
216    results.forEach(r -> assertTrue(r.isEmpty()));
217  }
218
219  @Test
220  public void testMixed() throws InterruptedException, ExecutionException, IOException {
221    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
222    table.putAll(IntStream.range(0, 7)
223        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
224        .collect(Collectors.toList())).get();
225    List<Row> actions = new ArrayList<>();
226    actions.add(new Get(Bytes.toBytes(0)));
227    actions.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, CQ, Bytes.toBytes(2L)));
228    actions.add(new Delete(Bytes.toBytes(2)));
229    actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
230    actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
231    RowMutations rm = new RowMutations(Bytes.toBytes(5));
232    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes(100L)));
233    rm.add((Mutation) new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes(200L)));
234    actions.add(rm);
235    actions.add(new Get(Bytes.toBytes(6)));
236
237    List<Object> results = table.batchAll(actions).get();
238    assertEquals(7, results.size());
239    Result getResult = (Result) results.get(0);
240    assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
241    assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
242    assertTrue(table.get(new Get(Bytes.toBytes(2))).get().isEmpty());
243    Result incrementResult = (Result) results.get(3);
244    assertEquals(4, Bytes.toLong(incrementResult.getValue(FAMILY, CQ)));
245    Result appendResult = (Result) results.get(4);
246    byte[] appendValue = appendResult.getValue(FAMILY, CQ);
247    assertEquals(12, appendValue.length);
248    assertEquals(4, Bytes.toLong(appendValue));
249    assertEquals(4, Bytes.toInt(appendValue, 8));
250    assertEquals(100,
251      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
252    assertEquals(200,
253      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
254    getResult = (Result) results.get(6);
255    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
256  }
257
258  public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {
259
260    @Override
261    public Optional<RegionObserver> getRegionObserver() {
262      return Optional.of(this);
263    }
264
265    @Override
266    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
267        List<Cell> results) throws IOException {
268      if (e.getEnvironment().getRegionInfo().getEndKey().length == 0) {
269        throw new DoNotRetryRegionException("Inject Error");
270      }
271    }
272  }
273
274  @Test
275  public void testPartialSuccess() throws IOException, InterruptedException, ExecutionException {
276    Admin admin = TEST_UTIL.getAdmin();
277    TableDescriptor htd = TableDescriptorBuilder.newBuilder(admin.getDescriptor(TABLE_NAME))
278        .setCoprocessor(ErrorInjectObserver.class.getName()).build();
279    admin.modifyTable(htd);
280    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
281    table.putAll(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Put(k).addColumn(FAMILY, CQ, k))
282        .collect(Collectors.toList())).get();
283    List<CompletableFuture<Result>> futures = table
284        .get(Arrays.asList(SPLIT_KEYS).stream().map(k -> new Get(k)).collect(Collectors.toList()));
285    for (int i = 0; i < SPLIT_KEYS.length - 1; i++) {
286      assertArrayEquals(SPLIT_KEYS[i], futures.get(i).get().getValue(FAMILY, CQ));
287    }
288    try {
289      futures.get(SPLIT_KEYS.length - 1).get();
290      fail();
291    } catch (ExecutionException e) {
292      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
293    }
294  }
295
296  @Test
297  public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException {
298    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
299    List<CompletableFuture<Object>> futures = table.batch(Arrays.asList(
300      new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ,
301        Bytes.toBytes("bad")),
302      new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1),
303      new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good"))));
304    try {
305      futures.get(0).get();
306      fail();
307    } catch (ExecutionException e) {
308      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
309      assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class));
310    }
311    assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ)));
312    assertTrue(((Result) futures.get(2).get()).isEmpty());
313    assertEquals("good",
314      Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
315  }
316
317  @Test
318  public void testInvalidPut() {
319    AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
320    try {
321      table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)), new Put(Bytes.toBytes(0))));
322      fail("Should fail since the put does not contain any cells");
323    } catch (IllegalArgumentException e) {
324      assertThat(e.getMessage(), containsString("No columns to insert"));
325    }
326
327    try {
328      table.batch(
329        Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(FAMILY, CQ, new byte[MAX_KEY_VALUE_SIZE]),
330          new Delete(Bytes.toBytes(0))));
331      fail("Should fail since the put exceeds the max key value size");
332    } catch (IllegalArgumentException e) {
333      assertThat(e.getMessage(), containsString("KeyValue size too large"));
334    }
335  }
336}