001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.atomic.AtomicBoolean;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseTestingUtil;
025import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
026import org.apache.hadoop.hbase.StartTestingClusterOption;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
029import org.apache.hadoop.hbase.client.Connection;
030import org.apache.hadoop.hbase.client.ConnectionFactory;
031import org.apache.hadoop.hbase.client.ResultScanner;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
035import org.apache.hadoop.hbase.quotas.OperationQuota;
036import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
037import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
038import org.apache.hadoop.hbase.quotas.TestNoopOperationQuota;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.BeforeAll;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
049
050@Tag(MediumTests.TAG)
051@Tag(ClientTests.TAG)
052public class TestScannerLeaseCount {
053
054  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
055  private static final TableName TABLE_NAME = TableName.valueOf("ScannerLeaseCount");
056  private static final byte[] FAM = Bytes.toBytes("Fam");
057  private static final String SCAN_IDENTIFIER_NAME = "_scan_id_";
058  private static final byte[] SCAN_IDENTIFIER = Bytes.toBytes("_scan_id_");
059  private static final Scan SCAN = new Scan().setAttribute(SCAN_IDENTIFIER_NAME, SCAN_IDENTIFIER);
060
061  private static volatile boolean SHOULD_THROW = false;
062  private static final AtomicBoolean EXCEPTION_THROWN = new AtomicBoolean(false);
063  private static final AtomicBoolean SCAN_SEEN = new AtomicBoolean(false);
064
065  private static Connection CONN;
066  private static Table TABLE;
067
068  @BeforeAll
069  public static void setUp() throws Exception {
070    StartTestingClusterOption option =
071      StartTestingClusterOption.builder().rsClass(MockedQuotaManagerRegionServer.class).build();
072    UTIL.startMiniCluster(option);
073    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
074      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAM)).build());
075    Configuration conf = new Configuration(UTIL.getConfiguration());
076    CONN = ConnectionFactory.createConnection(conf);
077    TABLE = CONN.getTable(TABLE_NAME);
078    UTIL.loadTable(TABLE, FAM);
079  }
080
081  @AfterAll
082  public static void tearDown() throws Exception {
083    try {
084      TABLE.close();
085    } catch (Exception ignore) {
086    }
087    try {
088      CONN.close();
089    } catch (Exception ignore) {
090    }
091    UTIL.shutdownMiniCluster();
092  }
093
094  @BeforeEach
095  public void before() {
096    SHOULD_THROW = false;
097    SCAN_SEEN.set(false);
098    EXCEPTION_THROWN.set(false);
099  }
100
101  @Test
102  public void itIncreasesScannerCount() throws Exception {
103    try (ResultScanner ignore = TABLE.getScanner(SCAN)) {
104      // We need to wait until the scan and lease are created server-side.
105      // Otherwise, our scanner counting will not reflect the new scan that was created
106      UTIL.waitFor(1000, () -> SCAN_SEEN.get() && !EXCEPTION_THROWN.get());
107    }
108  }
109
110  @Test
111  public void itDoesNotIncreaseScannerLeaseCount() throws Exception {
112    SHOULD_THROW = true;
113    try (ResultScanner ignore = TABLE.getScanner(SCAN)) {
114      // We need to wait until the scan and lease are created server-side.
115      // Otherwise, our scanner counting will not reflect the new scan that was created
116      UTIL.waitFor(1000, () -> !SCAN_SEEN.get() && EXCEPTION_THROWN.get());
117    }
118  }
119
120  public static final class MockedQuotaManagerRegionServer
121    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
122    private final MockedRpcQuotaManager rpcQuotaManager;
123
124    public MockedQuotaManagerRegionServer(Configuration conf)
125      throws IOException, InterruptedException {
126      super(conf);
127      this.rpcQuotaManager = new MockedRpcQuotaManager(this);
128    }
129
130    @Override
131    public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
132      return rpcQuotaManager;
133    }
134
135    @Override
136    protected RSRpcServices createRpcServices() throws IOException {
137      return new ScannerTrackingRSRpcServicesForTest(this);
138    }
139  }
140
141  private static class MockedRpcQuotaManager extends RegionServerRpcQuotaManager {
142    private static final RpcThrottlingException EX = new RpcThrottlingException("test_ex");
143
144    public MockedRpcQuotaManager(RegionServerServices rsServices) {
145      super(rsServices);
146    }
147
148    @Override
149    public OperationQuota checkScanQuota(Region region, ClientProtos.ScanRequest scanRequest,
150      long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
151      throws IOException, RpcThrottlingException {
152      if (SHOULD_THROW) {
153        if (isTestScan(scanRequest)) {
154          EXCEPTION_THROWN.set(true);
155        }
156        throw EX;
157      }
158      return TestNoopOperationQuota.INSTANCE;
159    }
160
161    @Override
162    public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
163      throws IOException, RpcThrottlingException {
164      if (SHOULD_THROW) {
165        throw EX;
166      }
167      return TestNoopOperationQuota.INSTANCE;
168    }
169
170    @Override
171    public OperationQuota checkBatchQuota(Region region, List<ClientProtos.Action> actions,
172      boolean hasCondition) throws IOException, RpcThrottlingException {
173      if (SHOULD_THROW) {
174        throw EX;
175      }
176      return TestNoopOperationQuota.INSTANCE;
177    }
178
179    @Override
180    public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads,
181      boolean isAtomic) throws IOException, RpcThrottlingException {
182      if (SHOULD_THROW) {
183        throw EX;
184      }
185      return TestNoopOperationQuota.INSTANCE;
186    }
187  }
188
189  private static class ScannerTrackingRSRpcServicesForTest extends RSRpcServices {
190    public ScannerTrackingRSRpcServicesForTest(HRegionServer rs) throws IOException {
191      super(rs);
192    }
193
194    @Override
195    RegionScannerContext checkQuotaAndGetRegionScannerContext(ClientProtos.ScanRequest request,
196      ClientProtos.ScanResponse.Builder builder) throws IOException {
197      RegionScannerContext rsx = super.checkQuotaAndGetRegionScannerContext(request, builder);
198      if (isTestScan(request)) {
199        SCAN_SEEN.set(true);
200      }
201      return rsx;
202    }
203  }
204
205  private static boolean isTestScan(ClientProtos.ScanRequest request) {
206    ClientProtos.Scan scan = request.getScan();
207    return scan.getAttributeList().stream()
208      .anyMatch(nbp -> nbp.getName().equals(SCAN_IDENTIFIER_NAME)
209        && Bytes.equals(nbp.getValue().toByteArray(), SCAN_IDENTIFIER));
210  }
211}