001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; 021import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER; 022import static org.junit.Assert.assertEquals; 023 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collections; 027import java.util.List; 028import java.util.Random; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.stream.Collectors; 036import java.util.stream.IntStream; 037 038import org.apache.commons.io.IOUtils; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.MemoryCompactionPolicy; 042import org.apache.hadoop.hbase.ServerName; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 045import org.apache.hadoop.hbase.io.ByteBufferPool; 046import org.apache.hadoop.hbase.regionserver.CompactingMemStore; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.testclassification.ClientTests; 049import org.apache.hadoop.hbase.testclassification.LargeTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.RetryCounter; 052import org.apache.hadoop.hbase.util.Threads; 053import org.junit.AfterClass; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058 059/** 060 * Will split the table, and move region randomly when testing. 061 */ 062@Category({ LargeTests.class, ClientTests.class }) 063public class TestAsyncTableGetMultiThreaded { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestAsyncTableGetMultiThreaded.class); 068 069 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 070 071 private static TableName TABLE_NAME = TableName.valueOf("async"); 072 073 private static byte[] FAMILY = Bytes.toBytes("cf"); 074 075 private static byte[] QUALIFIER = Bytes.toBytes("cq"); 076 077 private static int COUNT = 1000; 078 079 private static AsyncConnection CONN; 080 081 private static AsyncTable<?> TABLE; 082 083 private static byte[][] SPLIT_KEYS; 084 085 @BeforeClass 086 public static void setUp() throws Exception { 087 setUp(MemoryCompactionPolicy.NONE); 088 } 089 090 protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception { 091 TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); 092 TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); 093 TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); 094 TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, 095 String.valueOf(memoryCompaction)); 096 097 TEST_UTIL.startMiniCluster(5); 098 SPLIT_KEYS = new byte[8][]; 099 for (int i = 111; i < 999; i += 111) { 100 SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 101 } 102 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 103 TEST_UTIL.waitTableAvailable(TABLE_NAME); 104 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 105 TABLE = CONN.getTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS) 106 .setMaxRetries(1000).build(); 107 TABLE.putAll( 108 IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 109 .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) 110 .get(); 111 } 112 113 @AfterClass 114 public static void tearDown() throws Exception { 115 IOUtils.closeQuietly(CONN); 116 TEST_UTIL.shutdownMiniCluster(); 117 } 118 119 private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { 120 while (!stop.get()) { 121 for (int i = 0; i < COUNT; i++) { 122 assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i)))) 123 .get().getValue(FAMILY, QUALIFIER))); 124 } 125 } 126 } 127 128 @Test 129 public void test() throws Exception { 130 int numThreads = 20; 131 AtomicBoolean stop = new AtomicBoolean(false); 132 ExecutorService executor = 133 Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-")); 134 List<Future<?>> futures = new ArrayList<>(); 135 IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { 136 run(stop); 137 return null; 138 }))); 139 Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123)); 140 Admin admin = TEST_UTIL.getAdmin(); 141 for (byte[] splitPoint : SPLIT_KEYS) { 142 int oldRegionCount = admin.getRegions(TABLE_NAME).size(); 143 admin.split(TABLE_NAME, splitPoint); 144 TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() { 145 @Override 146 public boolean evaluate() throws Exception { 147 return TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() > oldRegionCount; 148 } 149 150 @Override 151 public String explainFailure() throws Exception { 152 return "Split has not finished yet"; 153 } 154 }); 155 156 for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) { 157 region.compact(true); 158 159 //Waiting for compaction to complete and references are cleaned up 160 RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS); 161 while (CompactionState.NONE != admin 162 .getCompactionStateForRegion(region.getRegionInfo().getRegionName()) 163 && retrier.shouldRetry()) { 164 retrier.sleepUntilNextRetry(); 165 } 166 region.getStores().get(0).closeAndArchiveCompactedFiles(); 167 } 168 Thread.sleep(5000); 169 admin.balance(true); 170 Thread.sleep(5000); 171 ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); 172 ServerName newMetaServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 173 .map(t -> t.getRegionServer().getServerName()).filter(s -> !s.equals(metaServer)) 174 .findAny().get(); 175 admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), 176 Bytes.toBytes(newMetaServer.getServerName())); 177 Thread.sleep(5000); 178 } 179 stop.set(true); 180 executor.shutdown(); 181 for (Future<?> future : futures) { 182 future.get(); 183 } 184 } 185}