/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.get;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Asserts the single-RPC promise of the paginated meta-scan path
 * ({@link ClientMetaTableAccessor#getTableHRegionLocations(AsyncTable, TableName, byte[], int)}) by
 * wrapping the meta {@link AsyncTable} so we can count {@link AdvancedScanResultConsumer#onNext}
 * invocations - one per ScannerNext server response.
 * <p/>
 * Cluster runs with {@code hbase.meta.scanner.caching = 2} so the {@code limit > caching} branch is
 * exercised cheaply with a small table (5 user regions).
 */
@Tag(MediumTests.TAG)
@Tag(ClientTests.TAG)
public class TestRegionLocatorPagedScanRpcCount {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static final TableName TABLE_NAME = TableName.valueOf("LocatorPaged");
  private static final byte[] FAMILY = Bytes.toBytes("family");

  /** Caching is small enough that {@code limit > META_CACHING} is easy to set up. */
  private static final int META_CACHING = 2;
  /** Number of user regions the test table is pre-split into. */
  private static final int NUM_REGIONS = 5;

  /** Shared async connection; opened in {@link #setUp()} and closed in {@link #tearDown()}. */
  private static AsyncConnection CONN;

  /**
   * Starts a single-node mini cluster with the reduced meta scanner caching, creates the test
   * table pre-split into {@link #NUM_REGIONS} regions, switches the balancer off and opens the
   * shared async connection.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING);
    UTIL.startMiniCluster(1);
    // N regions need N - 1 split points; the keys are simply "1", "2", ...
    byte[][] splitKeys = new byte[NUM_REGIONS - 1][];
    for (int i = 0; i < splitKeys.length; i++) {
      splitKeys[i] = Bytes.toBytes(Integer.toString(i + 1));
    }
    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
    UTIL.getAdmin().createTable(td, splitKeys);
    UTIL.waitTableAvailable(TABLE_NAME);
    UTIL.getAdmin().balancerSwitch(false, true);
    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
  }

  /** Closes the shared connection (swallowing any IOException) and stops the mini cluster. */
  @AfterAll
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /** Baseline: a page no larger than the configured caching is served by one onNext batch. */
  @Test
  public void testSingleRpcWhenLimitWithinCaching() throws Exception {
    // limit (META_CACHING = 2) <= caching (2): trivially one ScannerNext. Baseline.
    int rpcs = runPagedScanAndCountRpcs(META_CACHING);
    assertEquals(1, rpcs, "expected exactly one ScannerNext RPC for limit <= caching");
  }

  /** A page larger than the configured caching must still be served by one onNext batch. */
  @Test
  public void testSingleRpcWhenLimitExceedsCaching() throws Exception {
    // limit (5) > caching (2): without the isPagedScan fix this would be ceil(5/2) = 3
    // ScannerNext RPCs. With the fix, getMetaScan sizes caching to limit -> 1 RPC.
    int rpcs = runPagedScanAndCountRpcs(NUM_REGIONS);
    assertEquals(1, rpcs,
      "expected exactly one ScannerNext RPC for paged scan even when limit > caching");
  }

  /** The overload without a limit must keep batching by the configured caching. */
  @Test
  public void testUnboundedPathStillUsesConfiguredCaching() throws Exception {
    // The unbounded getTableHRegionLocations(metaTable, tableName) overload (no limit) must
    // continue to use the configured caching (META_CACHING=2). For NUM_REGIONS=5 user regions
    // we assert exactly ceil(5 / 2) = 3 onNext batches. A lower count would mean the
    // isPagedScan flag bled into the unbounded path.
    AtomicInteger onNextCalls = new AtomicInteger();
    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
    List<HRegionLocation> all =
      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME));
    assertEquals(NUM_REGIONS, all.size());
    int rpcs = onNextCalls.get();
    assertEquals((int) Math.ceil((double) NUM_REGIONS / META_CACHING), rpcs,
      "unbounded scan should still split across "
        + "ceil(NUM_REGIONS / caching) ScannerNext batches; got " + rpcs);
  }

  /**
   * Runs the paged region-location lookup with the given limit through a counting meta table and
   * checks that exactly {@code limit} locations come back.
   * @param limit page size passed to the paged lookup
   * @return number of {@link AdvancedScanResultConsumer#onNext} invocations observed
   */
  private int runPagedScanAndCountRpcs(int limit) throws Exception {
    AtomicInteger onNextCalls = new AtomicInteger();
    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
    List<HRegionLocation> page =
      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME, null, limit));
    assertEquals(limit, page.size(), "paged call returned wrong number of regions");
    return onNextCalls.get();
  }

  /**
   * Returns a delegating proxy for the meta {@link AsyncTable} that intercepts
   * {@code scan(Scan, AdvancedScanResultConsumer)} and wraps the supplied consumer so every
   * {@link AdvancedScanResultConsumer#onNext} invocation increments {@code onNextCalls}. All other
   * calls are forwarded to the real table untouched.
   * @param onNextCalls counter incremented once per {@code onNext} batch
   * @return a proxy implementing {@link AsyncTable} backed by the real meta table
   */
  @SuppressWarnings("unchecked")
  private static AsyncTable<AdvancedScanResultConsumer> wrapMetaTable(AtomicInteger onNextCalls) {
    AsyncTable<AdvancedScanResultConsumer> delegate = CONN.getTable(TableName.META_TABLE_NAME);
    InvocationHandler handler = (proxy, method, args) -> {
      boolean isConsumerScan = "scan".equals(method.getName()) && args != null
        && args.length == 2 && args[1] instanceof AdvancedScanResultConsumer;
      if (!isConsumerScan) {
        return invokeUnwrapped(method, delegate, args);
      }
      Scan scan = (Scan) args[0];
      AdvancedScanResultConsumer counting =
        new CountingConsumer((AdvancedScanResultConsumer) args[1], onNextCalls);
      return invokeUnwrapped(method, delegate, new Object[] { scan, counting });
    };
    return (AsyncTable<AdvancedScanResultConsumer>) Proxy.newProxyInstance(
      AsyncTable.class.getClassLoader(), new Class<?>[] { AsyncTable.class }, handler);
  }

  /**
   * Reflectively invokes {@code method} on {@code target} and rethrows the target's own exception
   * instead of the {@link InvocationTargetException} wrapper. Without this, a failure in the real
   * table would reach callers of the proxy as an {@code UndeclaredThrowableException}, hiding the
   * actual error.
   */
  private static Object invokeUnwrapped(Method method, Object target, Object[] args)
    throws Throwable {
    try {
      return method.invoke(target, args);
    } catch (InvocationTargetException e) {
      Throwable cause = e.getCause();
      throw cause != null ? cause : e;
    }
  }

  /**
   * Consumer decorator that bumps a counter on every {@link #onNext} call and otherwise forwards
   * each callback to the wrapped consumer unchanged.
   */
  private static final class CountingConsumer implements AdvancedScanResultConsumer {

    private final AdvancedScanResultConsumer original;
    private final AtomicInteger onNextCalls;

    CountingConsumer(AdvancedScanResultConsumer original, AtomicInteger onNextCalls) {
      this.original = original;
      this.onNextCalls = onNextCalls;
    }

    @Override
    public void onNext(Result[] results, ScanController controller) {
      // Count first so the batch is recorded even if the wrapped consumer throws.
      onNextCalls.incrementAndGet();
      original.onNext(results, controller);
    }

    @Override
    public void onError(Throwable error) {
      original.onError(error);
    }

    @Override
    public void onComplete() {
      original.onComplete();
    }

    @Override
    public void onHeartbeat(ScanController controller) {
      original.onHeartbeat(controller);
    }

    @Override
    public void onScanMetricsCreated(ScanMetrics scanMetrics) {
      original.onScanMetricsCreated(scanMetrics);
    }
  }
}