001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
021import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
022import static org.junit.Assert.assertEquals;
023
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.List;
028import java.util.Random;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035import java.util.stream.Collectors;
036import java.util.stream.IntStream;
037
038import org.apache.commons.io.IOUtils;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.MemoryCompactionPolicy;
042import org.apache.hadoop.hbase.ServerName;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
045import org.apache.hadoop.hbase.io.ByteBufferPool;
046import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.testclassification.ClientTests;
049import org.apache.hadoop.hbase.testclassification.LargeTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.RetryCounter;
052import org.apache.hadoop.hbase.util.Threads;
053import org.junit.AfterClass;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058
059/**
060 * Will split the table, and move region randomly when testing.
061 */
062@Category({ LargeTests.class, ClientTests.class })
063public class TestAsyncTableGetMultiThreaded {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestAsyncTableGetMultiThreaded.class);
068
069  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
070
071  private static TableName TABLE_NAME = TableName.valueOf("async");
072
073  private static byte[] FAMILY = Bytes.toBytes("cf");
074
075  private static byte[] QUALIFIER = Bytes.toBytes("cq");
076
077  private static int COUNT = 1000;
078
079  private static AsyncConnection CONN;
080
081  private static AsyncTable<?> TABLE;
082
083  private static byte[][] SPLIT_KEYS;
084
085  @BeforeClass
086  public static void setUp() throws Exception {
087    setUp(MemoryCompactionPolicy.NONE);
088  }
089
090  protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception {
091    TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
092    TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
093    TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
094    TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
095      String.valueOf(memoryCompaction));
096
097    TEST_UTIL.startMiniCluster(5);
098    SPLIT_KEYS = new byte[8][];
099    for (int i = 111; i < 999; i += 111) {
100      SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
101    }
102    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
103    TEST_UTIL.waitTableAvailable(TABLE_NAME);
104    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
105    TABLE = CONN.getTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
106        .setMaxRetries(1000).build();
107    TABLE.putAll(
108      IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
109          .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
110        .get();
111  }
112
113  @AfterClass
114  public static void tearDown() throws Exception {
115    IOUtils.closeQuietly(CONN);
116    TEST_UTIL.shutdownMiniCluster();
117  }
118
119  private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
120    while (!stop.get()) {
121      for (int i = 0; i < COUNT; i++) {
122        assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i))))
123            .get().getValue(FAMILY, QUALIFIER)));
124      }
125    }
126  }
127
128  @Test
129  public void test() throws Exception {
130    int numThreads = 20;
131    AtomicBoolean stop = new AtomicBoolean(false);
132    ExecutorService executor =
133      Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
134    List<Future<?>> futures = new ArrayList<>();
135    IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
136      run(stop);
137      return null;
138    })));
139    Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
140    Admin admin = TEST_UTIL.getAdmin();
141    for (byte[] splitPoint : SPLIT_KEYS) {
142      int oldRegionCount = admin.getRegions(TABLE_NAME).size();
143      admin.split(TABLE_NAME, splitPoint);
144      TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
145        @Override
146        public boolean evaluate() throws Exception {
147          return TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() > oldRegionCount;
148        }
149
150        @Override
151        public String explainFailure() throws Exception {
152          return "Split has not finished yet";
153        }
154      });
155
156      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
157        region.compact(true);
158
159        //Waiting for compaction to complete and references are cleaned up
160        RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
161        while (CompactionState.NONE != admin
162            .getCompactionStateForRegion(region.getRegionInfo().getRegionName())
163            && retrier.shouldRetry()) {
164          retrier.sleepUntilNextRetry();
165        }
166        region.getStores().get(0).closeAndArchiveCompactedFiles();
167      }
168      Thread.sleep(5000);
169      admin.balance(true);
170      Thread.sleep(5000);
171      ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
172      ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
173          .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer))
174          .findAny().get();
175      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
176        Bytes.toBytes(newMetaServer.getServerName()));
177      Thread.sleep(5000);
178    }
179    stop.set(true);
180    executor.shutdown();
181    for (Future<?> future : futures) {
182      future.get();
183    }
184  }
185}