001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 024import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertTrue; 027 028import java.io.IOException; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.stream.IntStream; 035import org.apache.commons.io.IOUtils; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.RegionLocations; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.coprocessor.ObserverContext; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 045import org.apache.hadoop.hbase.coprocessor.RegionObserver; 046import org.apache.hadoop.hbase.regionserver.InternalScanner; 047import org.apache.hadoop.hbase.security.User; 048import org.apache.hadoop.hbase.testclassification.ClientTests; 049import org.apache.hadoop.hbase.testclassification.MediumTests; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.util.Threads; 052import org.junit.AfterClass; 053import org.junit.BeforeClass; 054import org.junit.ClassRule; 055import org.junit.Test; 056import org.junit.experimental.categories.Category; 057 058@Category({ MediumTests.class, ClientTests.class }) 059public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { 060 061 @ClassRule 062 public static final HBaseClassTestRule CLASS_RULE = 063 HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); 064 065 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 066 067 private static TableName TABLE_NAME = TableName.valueOf("async"); 068 069 private static byte[] FAMILY = Bytes.toBytes("cf"); 070 071 private static AsyncConnectionImpl CONN; 072 073 private static AsyncNonMetaRegionLocator LOCATOR; 074 075 private static byte[][] SPLIT_KEYS; 076 077 private static int MAX_ALLOWED = 2; 078 079 private static AtomicInteger CONCURRENCY = new AtomicInteger(0); 080 081 private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0); 082 083 public static final class CountingRegionObserver implements RegionCoprocessor, RegionObserver { 084 085 @Override 086 public Optional<RegionObserver> getRegionObserver() { 087 return Optional.of(this); 088 } 089 090 @Override 091 public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, 092 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 093 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 094 int concurrency = CONCURRENCY.incrementAndGet(); 095 for (;;) { 096 int max = MAX_CONCURRENCY.get(); 097 if (concurrency <= max) { 098 break; 099 } 100 if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) { 101 break; 102 } 103 } 104 Threads.sleepWithoutInterrupt(10); 105 } 106 return hasNext; 107 } 108 109 @Override 110 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, 111 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 112 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 113 CONCURRENCY.decrementAndGet(); 114 } 115 return hasNext; 116 } 117 } 118 119 @BeforeClass 120 public static void setUp() throws Exception { 121 Configuration conf = TEST_UTIL.getConfiguration(); 122 conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName()); 123 conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED); 124 TEST_UTIL.startMiniCluster(3); 125 TEST_UTIL.getAdmin().balancerSwitch(false, true); 126 ConnectionRegistry registry = 127 ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); 128 CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, 129 registry.getClusterId().get(), User.getCurrent()); 130 LOCATOR = new AsyncNonMetaRegionLocator(CONN); 131 SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 132 .toArray(byte[][]::new); 133 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 134 TEST_UTIL.waitTableAvailable(TABLE_NAME); 135 } 136 137 @AfterClass 138 public static void tearDown() throws Exception { 139 IOUtils.closeQuietly(CONN); 140 TEST_UTIL.shutdownMiniCluster(); 141 } 142 143 private void assertLocs(List<CompletableFuture<RegionLocations>> futures) 144 throws InterruptedException, ExecutionException { 145 assertEquals(256, futures.size()); 146 for (int i = 0; i < futures.size(); i++) { 147 HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation(); 148 if (i == 0) { 149 assertTrue(isEmptyStartRow(loc.getRegion().getStartKey())); 150 } else { 151 assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegion().getStartKey())); 152 } 153 if (i == futures.size() - 1) { 154 assertTrue(isEmptyStopRow(loc.getRegion().getEndKey())); 155 } else { 156 assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegion().getEndKey())); 157 } 158 } 159 } 160 161 @Test 162 public void test() throws InterruptedException, ExecutionException { 163 List<CompletableFuture<RegionLocations>> futures = 164 IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 165 .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, RegionReplicaUtil.DEFAULT_REPLICA_ID, 166 RegionLocateType.CURRENT, false)) 167 .collect(toList()); 168 assertLocs(futures); 169 assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(), 170 MAX_CONCURRENCY.get() <= MAX_ALLOWED); 171 } 172}