001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertNull;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.ResultScanner;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.Table;
037import org.apache.hadoop.hbase.io.ByteBuffAllocator;
038import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
039import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
040import org.apache.hadoop.hbase.ipc.RpcCall;
041import org.apache.hadoop.hbase.ipc.RpcServer;
042import org.apache.hadoop.hbase.testclassification.LargeTests;
043import org.apache.hadoop.hbase.testclassification.RegionServerTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.BeforeAll;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
055
056@Tag(RegionServerTests.TAG)
057@Tag(LargeTests.TAG)
058public class TestRegionServerScan {
059
060  private static final byte[] CF = Bytes.toBytes("CF");
061  private static final byte[] CQ = Bytes.toBytes("CQ");
062  private static final byte[] VALUE = new byte[1200];
063
064  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
065  private static final Configuration conf = TEST_UTIL.getConfiguration();
066  private static Admin admin = null;
067  static final TableName tableName = TableName.valueOf("TestRegionServerScan");
068  static final byte[] r0 = Bytes.toBytes("row-0");
069  static final byte[] r1 = Bytes.toBytes("row-1");
070  static final byte[] r2 = Bytes.toBytes("row-2");
071
072  @BeforeAll
073  public static void setupBeforeClass() throws Exception {
074    /**
075     * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after
076     * released.
077     */
078    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
079      DeallocateRewriteByteBuffAllocator.class.getName());
080    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
081    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
082    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
083    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
084    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
085    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
086    conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
087    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
088    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);
089
090    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
091    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
092    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
093    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024);
094    TEST_UTIL.startMiniCluster(1);
095    admin = TEST_UTIL.getAdmin();
096  }
097
098  @AfterAll
099  public static void tearDownAfterClass() throws Exception {
100    TEST_UTIL.shutdownMiniCluster();
101  }
102
103  @Test
104  public void testScannWhenRpcCallContextNull() throws Exception {
105    ResultScanner resultScanner = null;
106    Table table = null;
107    try {
108      table = TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
109      putToTable(table, r0);
110      putToTable(table, r1);
111      putToTable(table, r2);
112
113      admin.flush(table.getName());
114
115      Scan scan = new Scan();
116      scan.setCaching(2);
117      scan.withStartRow(r0, true).withStopRow(r2, true);
118
119      MyRSRpcServices.inTest = true;
120      resultScanner = table.getScanner(scan);
121      Result result = resultScanner.next();
122      byte[] rowKey = result.getRow();
123      assertTrue(Bytes.equals(r0, rowKey));
124
125      result = resultScanner.next();
126      rowKey = result.getRow();
127      assertTrue(Bytes.equals(r1, rowKey));
128
129      result = resultScanner.next();
130      rowKey = result.getRow();
131      assertTrue(Bytes.equals(r2, rowKey));
132      assertNull(resultScanner.next());
133      assertTrue(MyRSRpcServices.exceptionRef.get() == null);
134    } finally {
135      MyRSRpcServices.inTest = false;
136      if (resultScanner != null) {
137        resultScanner.close();
138      }
139      if (table != null) {
140        table.close();
141      }
142    }
143  }
144
145  private static void putToTable(Table table, byte[] rowkey) throws IOException {
146    Put put = new Put(rowkey);
147    put.addColumn(CF, CQ, VALUE);
148    table.put(put);
149  }
150
151  private static class MyRegionServer extends MiniHBaseClusterRegionServer {
152    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
153      super(conf);
154    }
155
156    @Override
157    protected RSRpcServices createRpcServices() throws IOException {
158      return new MyRSRpcServices(this);
159    }
160  }
161
162  private static class MyRSRpcServices extends RSRpcServices {
163    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
164    private static volatile boolean inTest = false;
165
166    public MyRSRpcServices(HRegionServer rs) throws IOException {
167      super(rs);
168    }
169
170    @Override
171    public ScanResponse scan(RpcController controller, ScanRequest request)
172      throws ServiceException {
173      try {
174        if (!inTest) {
175          return super.scan(controller, request);
176        }
177
178        HRegion region = null;
179        if (request.hasRegion()) {
180          region = this.getRegion(request.getRegion());
181        }
182
183        if (region != null && !tableName.equals(region.getTableDescriptor().getTableName())) {
184          return super.scan(controller, request);
185        }
186
187        ScanResponse result = null;
188        // Simulate RpcCallContext is null for test.
189        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
190        try {
191          result = super.scan(controller, request);
192        } finally {
193          rpcCall.ifPresent(RpcServer::setCurrentCall);
194        }
195        return result;
196      } catch (Throwable e) {
197        exceptionRef.set(e);
198        throw new ServiceException(e);
199      }
200    }
201  }
202
203}