001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.get;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022
023import java.lang.reflect.InvocationHandler;
024import java.lang.reflect.Method;
025import java.lang.reflect.Proxy;
026import java.util.List;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.hbase.ClientMetaTableAccessor;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HRegionLocation;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
034import org.apache.hadoop.hbase.testclassification.ClientTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.junit.jupiter.api.AfterAll;
038import org.junit.jupiter.api.BeforeAll;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041
042import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
043
044/**
045 * Asserts the single-RPC promise of the paginated meta-scan path
046 * ({@link ClientMetaTableAccessor#getTableHRegionLocations(AsyncTable, TableName, byte[], int)}) by
047 * wrapping the meta {@link AsyncTable} so we can count {@link AdvancedScanResultConsumer#onNext}
048 * invocations - one per ScannerNext server response.
049 * <p/>
050 * Cluster runs with {@code hbase.meta.scanner.caching = 2} so the {@code limit > caching} branch is
051 * exercised cheaply with a small table (5 user regions).
052 */
053@Tag(MediumTests.TAG)
054@Tag(ClientTests.TAG)
055public class TestRegionLocatorPagedScanRpcCount {
056
057  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
058  private static final TableName TABLE_NAME = TableName.valueOf("LocatorPaged");
059  private static final byte[] FAMILY = Bytes.toBytes("family");
060
061  /** Caching is small enough that {@code limit > META_CACHING} is easy to set up. */
062  private static final int META_CACHING = 2;
063  private static final int NUM_REGIONS = 5;
064
065  private static AsyncConnection CONN;
066
067  @BeforeAll
068  public static void setUp() throws Exception {
069    UTIL.getConfiguration().setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING);
070    UTIL.startMiniCluster(1);
071    byte[][] splitKeys = new byte[NUM_REGIONS - 1][];
072    for (int i = 0; i < splitKeys.length; i++) {
073      splitKeys[i] = Bytes.toBytes(Integer.toString(i + 1));
074    }
075    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
076      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
077    UTIL.getAdmin().createTable(td, splitKeys);
078    UTIL.waitTableAvailable(TABLE_NAME);
079    UTIL.getAdmin().balancerSwitch(false, true);
080    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
081  }
082
083  @AfterAll
084  public static void tearDown() throws Exception {
085    Closeables.close(CONN, true);
086    UTIL.shutdownMiniCluster();
087  }
088
089  @Test
090  public void testSingleRpcWhenLimitWithinCaching() throws Exception {
091    // limit (2) <= caching (2): trivially one ScannerNext. Baseline.
092    int rpcs = runPagedScanAndCountRpcs(2);
093    assertEquals(1, rpcs, "expected exactly one ScannerNext RPC for limit <= caching");
094  }
095
096  @Test
097  public void testSingleRpcWhenLimitExceedsCaching() throws Exception {
098    // limit (5) > caching (2): without the isPagedScan fix this would be ceil(5/2) = 3
099    // ScannerNext RPCs. With the fix, getMetaScan sizes caching to limit -> 1 RPC.
100    int rpcs = runPagedScanAndCountRpcs(NUM_REGIONS);
101    assertEquals(1, rpcs,
102      "expected exactly one ScannerNext RPC for paged scan even when limit > caching");
103  }
104
105  @Test
106  public void testUnboundedPathStillUsesConfiguredCaching() throws Exception {
107    // The unbounded getTableHRegionLocations(metaTable, tableName) overload (no limit) must
108    // continue to use the configured caching (META_CACHING=2). For NUM_REGIONS=5 user regions,
109    // expect ceil(5 / 2) = 3 ScannerNext batches plus possibly a final empty batch when the
110    // server-side scan reaches the stopRow. Assert it is strictly more than one to prove the
111    // isPagedScan flag did not bleed into the unbounded path.
112    AtomicInteger onNextCalls = new AtomicInteger();
113    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
114    List<HRegionLocation> all =
115      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME));
116    assertEquals(NUM_REGIONS, all.size());
117    int rpcs = onNextCalls.get();
118    assertEquals((int) Math.ceil((double) NUM_REGIONS / META_CACHING), rpcs,
119      "unbounded scan should still split across "
120        + "ceil(NUM_REGIONS / caching) ScannerNext batches; got " + rpcs);
121  }
122
123  private int runPagedScanAndCountRpcs(int limit) throws Exception {
124    AtomicInteger onNextCalls = new AtomicInteger();
125    AsyncTable<AdvancedScanResultConsumer> metaTable = wrapMetaTable(onNextCalls);
126    List<HRegionLocation> page =
127      get(ClientMetaTableAccessor.getTableHRegionLocations(metaTable, TABLE_NAME, null, limit));
128    assertEquals(limit, page.size(), "paged call returned wrong number of regions");
129    return onNextCalls.get();
130  }
131
132  /**
133   * Returns a delegating proxy for the meta {@link AsyncTable} that intercepts
134   * {@code scan(Scan, AdvancedScanResultConsumer)} and wraps the supplied consumer so every
135   * {@link AdvancedScanResultConsumer#onNext} invocation increments {@code onNextCalls}.
136   */
137  @SuppressWarnings("unchecked")
138  private static AsyncTable<AdvancedScanResultConsumer> wrapMetaTable(AtomicInteger onNextCalls) {
139    AsyncTable<AdvancedScanResultConsumer> delegate = CONN.getTable(TableName.META_TABLE_NAME);
140    InvocationHandler handler = new InvocationHandler() {
141      @Override
142      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
143        if (
144          "scan".equals(method.getName()) && args != null && args.length == 2
145            && args[1] instanceof AdvancedScanResultConsumer
146        ) {
147          Scan scan = (Scan) args[0];
148          AdvancedScanResultConsumer original = (AdvancedScanResultConsumer) args[1];
149          AdvancedScanResultConsumer counting = new AdvancedScanResultConsumer() {
150            @Override
151            public void onNext(Result[] results, ScanController controller) {
152              onNextCalls.incrementAndGet();
153              original.onNext(results, controller);
154            }
155
156            @Override
157            public void onError(Throwable error) {
158              original.onError(error);
159            }
160
161            @Override
162            public void onComplete() {
163              original.onComplete();
164            }
165
166            @Override
167            public void onHeartbeat(ScanController controller) {
168              original.onHeartbeat(controller);
169            }
170
171            @Override
172            public void onScanMetricsCreated(ScanMetrics scanMetrics) {
173              original.onScanMetricsCreated(scanMetrics);
174            }
175          };
176          return method.invoke(delegate, scan, counting);
177        }
178        return method.invoke(delegate, args);
179      }
180    };
181    return (AsyncTable<AdvancedScanResultConsumer>) Proxy.newProxyInstance(
182      AsyncTable.class.getClassLoader(), new Class<?>[] { AsyncTable.class }, handler);
183  }
184}