001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static java.util.stream.Collectors.toList; 021import static org.apache.hadoop.hbase.client.AsyncNonMetaRegionLocator.MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; 024import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027 028import java.io.IOException; 029import java.util.List; 030import java.util.Optional; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.stream.IntStream; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.RegionLocations; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 043import org.apache.hadoop.hbase.coprocessor.RegionObserver; 044import org.apache.hadoop.hbase.regionserver.InternalScanner; 045import org.apache.hadoop.hbase.security.User; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.Threads; 050import org.junit.jupiter.api.AfterAll; 051import org.junit.jupiter.api.BeforeAll; 052import org.junit.jupiter.api.Tag; 053import org.junit.jupiter.api.Test; 054 055import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 056 057@Tag(MediumTests.TAG) 058@Tag(ClientTests.TAG) 059public class TestAsyncNonMetaRegionLocatorConcurrenyLimit { 060 061 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 062 063 private static TableName TABLE_NAME = TableName.valueOf("async"); 064 065 private static byte[] FAMILY = Bytes.toBytes("cf"); 066 067 private static AsyncConnectionImpl CONN; 068 069 private static AsyncNonMetaRegionLocator LOCATOR; 070 071 private static byte[][] SPLIT_KEYS; 072 073 private static int MAX_ALLOWED = 2; 074 075 private static AtomicInteger CONCURRENCY = new AtomicInteger(0); 076 077 private static AtomicInteger MAX_CONCURRENCY = new AtomicInteger(0); 078 079 public static final class CountingRegionObserver implements RegionCoprocessor, RegionObserver { 080 081 @Override 082 public Optional<RegionObserver> getRegionObserver() { 083 return Optional.of(this); 084 } 085 086 @Override 087 public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 088 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 089 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 090 int concurrency = CONCURRENCY.incrementAndGet(); 091 for (;;) { 092 int max = MAX_CONCURRENCY.get(); 093 if (concurrency <= max) { 094 break; 095 } 096 if (MAX_CONCURRENCY.compareAndSet(max, concurrency)) { 097 break; 098 } 099 } 100 Threads.sleepWithoutInterrupt(10); 101 } 102 return hasNext; 103 } 104 105 @Override 106 public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c, 107 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 108 if (c.getEnvironment().getRegionInfo().isMetaRegion()) { 109 CONCURRENCY.decrementAndGet(); 110 } 111 return hasNext; 112 } 113 } 114 115 @BeforeAll 116 public static void setUp() throws Exception { 117 Configuration conf = TEST_UTIL.getConfiguration(); 118 conf.set(REGION_COPROCESSOR_CONF_KEY, CountingRegionObserver.class.getName()); 119 conf.setInt(MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, MAX_ALLOWED); 120 TEST_UTIL.startMiniCluster(3); 121 TEST_UTIL.getAdmin().balancerSwitch(false, true); 122 ConnectionRegistry registry = 123 ConnectionRegistryFactory.create(TEST_UTIL.getConfiguration(), User.getCurrent()); 124 CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, 125 registry.getClusterId().get(), null, User.getCurrent()); 126 LOCATOR = new AsyncNonMetaRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER); 127 SPLIT_KEYS = IntStream.range(1, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 128 .toArray(byte[][]::new); 129 TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLIT_KEYS); 130 TEST_UTIL.waitTableAvailable(TABLE_NAME); 131 } 132 133 @AfterAll 134 public static void tearDown() throws Exception { 135 Closeables.close(CONN, true); 136 TEST_UTIL.shutdownMiniCluster(); 137 } 138 139 private void assertLocs(List<CompletableFuture<RegionLocations>> futures) 140 throws InterruptedException, ExecutionException { 141 assertEquals(256, futures.size()); 142 for (int i = 0; i < futures.size(); i++) { 143 HRegionLocation loc = futures.get(i).get().getDefaultRegionLocation(); 144 if (i == 0) { 145 assertTrue(isEmptyStartRow(loc.getRegion().getStartKey())); 146 } else { 147 assertEquals(String.format("%02x", i), Bytes.toString(loc.getRegion().getStartKey())); 148 } 149 if (i == futures.size() - 1) { 150 assertTrue(isEmptyStopRow(loc.getRegion().getEndKey())); 151 } else { 152 assertEquals(String.format("%02x", i + 1), Bytes.toString(loc.getRegion().getEndKey())); 153 } 154 } 155 } 156 157 @Test 158 public void test() throws InterruptedException, ExecutionException { 159 List< 160 CompletableFuture< 161 RegionLocations>> futures = 162 IntStream.range(0, 256).mapToObj(i -> Bytes.toBytes(String.format("%02x", i))) 163 .map(r -> LOCATOR.getRegionLocations(TABLE_NAME, r, 164 RegionReplicaUtil.DEFAULT_REPLICA_ID, RegionLocateType.CURRENT, false)) 165 .collect(toList()); 166 assertLocs(futures); 167 assertTrue(MAX_CONCURRENCY.get() <= MAX_ALLOWED, 168 "max allowed is " + MAX_ALLOWED + " but actual is " + MAX_CONCURRENCY.get()); 169 } 170}