001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.util.FutureUtils.get;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
045
046/**
047 * Asserts the single-RPC promise of the paginated meta-scan path
048 * ({@link ClientMetaTableAccessor#getTableHRegionLocations(AsyncTable, TableName, byte[], int)}) by
049 * wrapping the meta {@link AsyncTable} so we can count {@link AdvancedScanResultConsumer#onNext}
050 * invocations - one per ScannerNext server response.
051 * <p/>
052 * Cluster runs with {@code hbase.meta.scanner.caching = 2} so the {@code limit > caching} branch is
053 * exercised cheaply with a small table (5 user regions).
054 */
055@Category({ MediumTests.class, ClientTests.class })
056public class TestRegionLocatorPagedScanRpcCount {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060    HBaseClassTestRule.forClass(TestRegionLocatorPagedScanRpcCount.class);
061
062  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
063  private static final TableName TABLE_NAME = TableName.valueOf("LocatorPaged");
064  private static final byte[] FAMILY = Bytes.toBytes("family");
065
066  /** Caching is small enough that {@code limit > META_CACHING} is easy to set up. */
067  private static final int META_CACHING = 2;
068  private static final int NUM_REGIONS = 5;
069
070  private static AsyncConnection CONN;
071
072  @BeforeClass
073  public static void setUp() throws Exception {
074    UTIL.getConfiguration().setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING);
075    UTIL.startMiniCluster(1);
076    byte[][] splitKeys = new byte[NUM_REGIONS - 1][];
077    for (int i = 0; i < splitKeys.length; i++) {
078      splitKeys[i] = Bytes.toBytes(Integer.toString(i + 1));
079    }
080    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
081      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
082    UTIL.getAdmin().createTable(td, splitKeys);
083    UTIL.waitTableAvailable(TABLE_NAME);
084    UTIL.getAdmin().balancerSwitch(false, true);
085    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
086  }
087
088  @AfterClass
089  public static void tearDown() throws Exception {
090    Closeables.close(CONN, true);
091    UTIL.shutdownMiniCluster();
092  }
093
094  @Test
095  public void testSingleRpcWhenLimitWithinCaching() throws Exception {
096    // limit (2) <= caching (2): trivially one ScannerNext. Baseline.
097    int rpcs = runPagedScanAndCountRpcs(2);
098    assertEquals("expected exactly one ScannerNext RPC for limit <= caching", 1, rpcs);
099  }
100
101  @Test
102  public void testSingleRpcWhenLimitExceedsCaching() throws Exception {
103    // limit (5) > caching (2): without the isPagedScan fix this would be ceil(5/2) = 3
104    // ScannerNext RPCs. With the fix, getMetaScan sizes caching to limit -> 1 RPC.
105    int rpcs = runPagedScanAndCountRpcs(NUM_REGIONS);
106    assertEquals("expected exactly one ScannerNext RPC for paged scan even when limit > caching", 1,
107      rpcs);
108  }
109
110  @Test
111  public void testUnboundedPathStillUsesConfiguredCaching() throws Exception {
112    // The unbounded getTableHRegionLocations(metaTable, tableName) overload (no limit) must
113    // continue to use the configured caching (META_CACHING=2). For NUM_REGIONS=5 user regions,
114    // expect ceil(5 / 2) = 3 ScannerNext batches plus possibly a final empty batch when the
115    // server-side scan reaches the stopRow. Assert it is strictly more than one to prove the
116    // isPagedScan flag did not bleed into the unbounded path.
117    AtomicInteger onNextCalls = new AtomicInteger();
118    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
119    List<HRegionLocation> all =
120      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME));
121    assertEquals(NUM_REGIONS, all.size());
122    int rpcs = onNextCalls.get();
123    assertEquals(
124      "unbounded scan should still split across "
125        + "ceil(NUM_REGIONS / caching) ScannerNext batches; got " + rpcs,
126      (int) Math.ceil((double) NUM_REGIONS / META_CACHING), rpcs);
127  }
128
129  private int runPagedScanAndCountRpcs(int limit) throws Exception {
130    AtomicInteger onNextCalls = new AtomicInteger();
131    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
132    List<HRegionLocation> page =
133      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME, null, limit));
134    assertEquals("paged call returned wrong number of regions", limit, page.size());
135    return onNextCalls.get();
136  }
137
138  /**
139   * Returns a delegating proxy for the meta {@link AsyncTable} that intercepts
140   * {@code scan(Scan, AdvancedScanResultConsumer)} and wraps the supplied consumer so every
141   * {@link AdvancedScanResultConsumer#onNext} invocation increments {@code onNextCalls}.
142   */
143  @SuppressWarnings("unchecked")
144  private static AsyncTable<AdvancedScanResultConsumer> wrapMetaTable(AtomicInteger onNextCalls) {
145    AsyncTable<AdvancedScanResultConsumer> delegate = CONN.getTable(TableName.META_TABLE_NAME);
146    InvocationHandler handler = new InvocationHandler() {
147      @Override
148      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
149        if (
150          "scan".equals(method.getName()) && args != null && args.length == 2
151            && args[1] instanceof AdvancedScanResultConsumer
152        ) {
153          Scan scan = (Scan) args[0];
154          AdvancedScanResultConsumer original = (AdvancedScanResultConsumer) args[1];
155          AdvancedScanResultConsumer counting = new AdvancedScanResultConsumer() {
156            @Override
157            public void onNext(Result[] results, ScanController controller) {
158              onNextCalls.incrementAndGet();
159              original.onNext(results, controller);
160            }
161
162            @Override
163            public void onError(Throwable error) {
164              original.onError(error);
165            }
166
167            @Override
168            public void onComplete() {
169              original.onComplete();
170            }
171
172            @Override
173            public void onHeartbeat(ScanController controller) {
174              original.onHeartbeat(controller);
175            }
176
177            @Override
178            public void onScanMetricsCreated(ScanMetrics scanMetrics) {
179              original.onScanMetricsCreated(scanMetrics);
180            }
181          };
182          return method.invoke(delegate, scan, counting);
183        }
184        return method.invoke(delegate, args);
185      }
186    };
187    return (AsyncTable<AdvancedScanResultConsumer>) Proxy.newProxyInstance(
188      AsyncTable.class.getClassLoader(), new Class<?>[] { AsyncTable.class }, handler);
189  }
190}