001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 024import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; 025import static org.junit.Assert.assertEquals; 026import static org.junit.Assert.assertTrue; 027 028import java.io.IOException; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.stream.IntStream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HRegionLocation; 039import org.apache.hadoop.hbase.RegionLocations; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.regionserver.InternalScanner; 046import org.apache.hadoop.hbase.security.User; 047import org.apache.hadoop.hbase.testclassification.ClientTests; 048import org.apache.hadoop.hbase.testclassification.MediumTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.Threads; 051import org.junit.AfterClass; 052import org.junit.BeforeClass; 053import org.junit.ClassRule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056 057import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 058 059@Category({ MediumTests.class, ClientTests.class }) 060public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocatorConcurrenyLimit.class); 065 066 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 067 068 private static TableName TABLE_NAME = TableName.valueOf("async"); 069 070 private static byte[] FAMILY = Bytes.toBytes("cf"); 071 072 private static AsyncConnectionImpl CONN; 073 074 private static AsyncNonMetaRegionLocator LOCATOR; 075 076 private static byte[][] SPLIT_KEYS; 077 078 private static int MAX_ALLOWED = 2; 079 080 private static AtomicInteger CONCURRENCY = new AtomicInteger(0); 081 082 private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0); 083 084 public static final class CountingRegionObserver implements RegionCoprocessor, RegionObserver { 085 086 @Override 087 public Optional<RegionObserver> getRegionObserver() { 088 return Optional.of(this); 089 } 090 091 @Override 092 public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 093 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 094 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 095 int concurrency = CONCURRENCY.incrementAndGet(); 096 for (;;) { 097 int max = MAX_CONCURRENCY.get(); 098 if (concurrency <= max) { 099 break; 100 } 101 if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) { 102 break; 103 } 104 } 105 Threads.sleepWithoutInterrupt(10); 106 } 107 return hasNext; 108 } 109 110 @Override 111 public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 112 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 113 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 114 CONCURRENCY.decrementAndGet(); 115 } 116 return hasNext; 117 } 118 } 119 120 @BeforeClass 121 public static void setUp() throws Exception { 122 Configuration conf = TEST_UTIL.getConfiguration(); 123 conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName()); 124 conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED); 125 TEST_UTIL.startMiniCluster(3); 126 TEST_UTIL.getAdmin().balancerSwitch(false, true); 127 ConnectionRegistry registry = 128 ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); 129 CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, 130 registry.getClusterId().get(), null, User.getCurrent()); 131 LOCATOR = new AsyncNonMetaRegionLocator(CONN); 132 SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 133 .toArray(byte[][]::new); 134 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 135 TEST_UTIL.waitTableAvailable(TABLE_NAME); 136 } 137 138 @AfterClass 139 public static void tearDown() throws Exception { 140 Closeables.close(CONN, true); 141 TEST_UTIL.shutdownMiniCluster(); 142 } 143 144 private void assertLocs(List<CompletableFuture<RegionLocations>> futures) 145 throws InterruptedException, ExecutionException { 146 assertEquals(256, futures.size()); 147 for (int i = 0; i < futures.size(); i++) { 148 HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation(); 149 if (i == 0) { 150 assertTrue(isEmptyStartRow(loc.getRegion().getStartKey())); 151 } else { 152 assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegion().getStartKey())); 153 } 154 if (i == futures.size() - 1) { 155 assertTrue(isEmptyStopRow(loc.getRegion().getEndKey())); 156 } else { 157 assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegion().getEndKey())); 158 } 159 } 160 } 161 162 @Test 163 public void test() throws InterruptedException, ExecutionException { 164 List< 165 CompletableFuture< 166 RegionLocations>> futures = 167 IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 168 .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, 169 RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false)) 170 .collect(toList()); 171 assertLocs(futures); 172 assertTrue("max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get(), 173 MAX_CONCURRENCY.get() <= MAX_ALLOWED); 174 } 175}