001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.get; 021import static org.junit.Assert.assertEquals; 022 023import java.lang.reflect.InvocationHandler; 024import java.lang.reflect.Method; 025import java.lang.reflect.Proxy; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.hbase.ClientMetaTableAccessor; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.HRegionLocation; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 035import org.apache.hadoop.hbase.testclassification.ClientTests; 036import org.apache.hadoop.hbase.testclassification.MediumTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.junit.AfterClass; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043 044import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 045 046/** 047 * Asserts the single-RPC promise of the paginated meta-scan path 048 * ({@link ClientMetaTableAccessor#getTableHRegionLocations(AsyncTable, TableName, byte[], int)}) by 049 * wrapping the meta {@link AsyncTable} so we can count {@link AdvancedScanResultConsumer#onNext} 050 * invocations - one per ScannerNext server response. 051 * <p/> 052 * Cluster runs with {@code hbase.meta.scanner.caching = 2} so the {@code limit > caching} branch is 053 * exercised cheaply with a small table (5 user regions). 054 */ 055@Category({ MediumTests.class, ClientTests.class }) 056public class TestRegionLocatorPagedScanRpcCount { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestRegionLocatorPagedScanRpcCount.class); 061 062 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 063 private static final TableName TABLE_NAME = TableName.valueOf("LocatorPaged"); 064 private static final byte[] FAMILY = Bytes.toBytes("family"); 065 066 /** Caching is small enough that {@code limit > META_CACHING} is easy to set up. */ 067 private static final int META_CACHING = 2; 068 private static final int NUM_REGIONS = 5; 069 070 private static AsyncConnection CONN; 071 072 @BeforeClass 073 public static void setUp() throws Exception { 074 UTIL.getConfiguration().setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING); 075 UTIL.startMiniCluster(1); 076 byte[][] splitKeys = new byte[NUM_REGIONS - 1][]; 077 for (int i = 0; i < splitKeys.length; i++) { 078 splitKeys[i] = Bytes.toBytes(Integer.toString(i + 1)); 079 } 080 TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME) 081 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 082 UTIL.getAdmin().createTable(td, splitKeys); 083 UTIL.waitTableAvailable(TABLE_NAME); 084 UTIL.getAdmin().balancerSwitch(false, true); 085 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 086 } 087 088 @AfterClass 089 public static void tearDown() throws Exception { 090 Closeables.close(CONN, true); 091 UTIL.shutdownMiniCluster(); 092 } 093 094 @Test 095 public void testSingleRpcWhenLimitWithinCaching() throws Exception { 096 // limit (2) <= caching (2): trivially one ScannerNext. Baseline. 097 int rpcs = runPagedScanAndCountRpcs(2); 098 assertEquals("expected exactly one ScannerNext RPC for limit <= caching", 1, rpcs); 099 } 100 101 @Test 102 public void testSingleRpcWhenLimitExceedsCaching() throws Exception { 103 // limit (5) > caching (2): without the isPagedScan fix this would be ceil(5/2) = 3 104 // ScannerNext RPCs. With the fix, getMetaScan sizes caching to limit -> 1 RPC. 105 int rpcs = runPagedScanAndCountRpcs(NUM_REGIONS); 106 assertEquals("expected exactly one ScannerNext RPC for paged scan even when limit > caching", 1, 107 rpcs); 108 } 109 110 @Test 111 public void testUnboundedPathStillUsesConfiguredCaching() throws Exception { 112 // The unbounded getTableHRegionLocations(metaTable, tableName) overload (no limit) must 113 // continue to use the configured caching (META_CACHING=2). For NUM_REGIONS=5 user regions, 114 // expect ceil(5 / 2) = 3 ScannerNext batches plus possibly a final empty batch when the 115 // server-side scan reaches the stopRow. Assert it is strictly more than one to prove the 116 // isPagedScan flag did not bleed into the unbounded path. 117 AtomicInteger onNextCalls = new AtomicInteger(); 118 AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls); 119 List<HRegionLocation> all = 120 get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME)); 121 assertEquals(NUM_REGIONS, all.size()); 122 int rpcs = onNextCalls.get(); 123 assertEquals( 124 "unbounded scan should still split across " 125 + "ceil(NUM_REGIONS / caching) ScannerNext batches; got " + rpcs, 126 (int) Math.ceil((double) NUM_REGIONS / META_CACHING), rpcs); 127 } 128 129 private int runPagedScanAndCountRpcs(int limit) throws Exception { 130 AtomicInteger onNextCalls = new AtomicInteger(); 131 AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls); 132 List<HRegionLocation> page = 133 get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME, null, limit)); 134 assertEquals("paged call returned wrong number of regions", limit, page.size()); 135 return onNextCalls.get(); 136 } 137 138 /** 139 * Returns a delegating proxy for the meta {@link AsyncTable} that intercepts 140 * {@code scan(Scan, AdvancedScanResultConsumer)} and wraps the supplied consumer so every 141 * {@link AdvancedScanResultConsumer#onNext} invocation increments {@code onNextCalls}. 142 */ 143 @SuppressWarnings("unchecked") 144 private static AsyncTable<AdvancedScanResultConsumer> wrapMetaTable(AtomicInteger onNextCalls) { 145 AsyncTable<AdvancedScanResultConsumer> delegate = CONN.getTable(TableName.META_TABLE_NAME); 146 InvocationHandler handler = new InvocationHandler() { 147 @Override 148 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 149 if ( 150 "scan".equals(method.getName()) && args != null && args.length == 2 151 && args[1] instanceof AdvancedScanResultConsumer 152 ) { 153 Scan scan = (Scan) args[0]; 154 AdvancedScanResultConsumer original = (AdvancedScanResultConsumer) args[1]; 155 AdvancedScanResultConsumer counting = new AdvancedScanResultConsumer() { 156 @Override 157 public void onNext(Result[] results, ScanController controller) { 158 onNextCalls.incrementAndGet(); 159 original.onNext(results, controller); 160 } 161 162 @Override 163 public void onError(Throwable error) { 164 original.onError(error); 165 } 166 167 @Override 168 public void onComplete() { 169 original.onComplete(); 170 } 171 172 @Override 173 public void onHeartbeat(ScanController controller) { 174 original.onHeartbeat(controller); 175 } 176 177 @Override 178 public void onScanMetricsCreated(ScanMetrics scanMetrics) { 179 original.onScanMetricsCreated(scanMetrics); 180 } 181 }; 182 return method.invoke(delegate, scan, counting); 183 } 184 return method.invoke(delegate, args); 185 } 186 }; 187 return (AsyncTable<AdvancedScanResultConsumer>) Proxy.newProxyInstance( 188 AsyncTable.class.getClassLoader(), new Class<?>[] { AsyncTable.class }, handler); 189 } 190}