001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicReference;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.io.ByteBuffAllocator;
039import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
040import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
041import org.apache.hadoop.hbase.ipc.RpcCall;
042import org.apache.hadoop.hbase.ipc.RpcServer;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.testclassification.RegionServerTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.AfterClass;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Rule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.rules.TestName;
053
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
059
060@Category({ RegionServerTests.class, LargeTests.class })
061public class TestRegionServerScan {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestRegionServerScan.class);
065
066  @Rule
067  public TestName name = new TestName();
068
069  private static final byte[] CF = Bytes.toBytes("CF");
070  private static final byte[] CQ = Bytes.toBytes("CQ");
071  private static final byte[] VALUE = new byte[1200];
072
073  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
074  private static final Configuration conf = TEST_UTIL.getConfiguration();
075  private static Admin admin = null;
076  static final TableName tableName = TableName.valueOf("TestRegionServerScan");
077  static final byte[] r0 = Bytes.toBytes("row-0");
078  static final byte[] r1 = Bytes.toBytes("row-1");
079  static final byte[] r2 = Bytes.toBytes("row-2");
080
081  @BeforeClass
082  public static void setupBeforeClass() throws Exception {
083    /**
084     * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after
085     * released.
086     */
087    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
088      DeallocateRewriteByteBuffAllocator.class.getName());
089    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
090    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
091    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
092    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
093    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
094    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
095    conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
096    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
097    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);
098
099    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
100    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
101    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
102    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024);
103    TEST_UTIL.startMiniCluster(1);
104    admin = TEST_UTIL.getAdmin();
105  }
106
107  @AfterClass
108  public static void tearDownAfterClass() throws Exception {
109    TEST_UTIL.shutdownMiniCluster();
110  }
111
112  @Test
113  public void testScannWhenRpcCallContextNull() throws Exception {
114    ResultScanner resultScanner = null;
115    Table table = null;
116    try {
117      table = TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
118      putToTable(table, r0);
119      putToTable(table, r1);
120      putToTable(table, r2);
121
122      admin.flush(table.getName());
123
124      Scan scan = new Scan();
125      scan.setCaching(2);
126      scan.withStartRow(r0, true).withStopRow(r2, true);
127
128      MyRSRpcServices.inTest = true;
129      resultScanner = table.getScanner(scan);
130      Result result = resultScanner.next();
131      byte[] rowKey = result.getRow();
132      assertTrue(Bytes.equals(r0, rowKey));
133
134      result = resultScanner.next();
135      rowKey = result.getRow();
136      assertTrue(Bytes.equals(r1, rowKey));
137
138      result = resultScanner.next();
139      rowKey = result.getRow();
140      assertTrue(Bytes.equals(r2, rowKey));
141      assertNull(resultScanner.next());
142      assertTrue(MyRSRpcServices.exceptionRef.get() == null);
143    } finally {
144      MyRSRpcServices.inTest = false;
145      if (resultScanner != null) {
146        resultScanner.close();
147      }
148      if (table != null) {
149        table.close();
150      }
151    }
152  }
153
154  private static void putToTable(Table table, byte[] rowkey) throws IOException {
155    Put put = new Put(rowkey);
156    put.addColumn(CF, CQ, VALUE);
157    table.put(put);
158  }
159
160  private static class MyRegionServer extends MiniHBaseClusterRegionServer {
161    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
162      super(conf);
163    }
164
165    @Override
166    protected RSRpcServices createRpcServices() throws IOException {
167      return new MyRSRpcServices(this);
168    }
169  }
170
171  private static class MyRSRpcServices extends RSRpcServices {
172    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
173    private static volatile boolean inTest = false;
174
175    public MyRSRpcServices(HRegionServer rs) throws IOException {
176      super(rs);
177    }
178
179    @Override
180    public ScanResponse scan(RpcController controller, ScanRequest request)
181      throws ServiceException {
182      try {
183        if (!inTest) {
184          return super.scan(controller, request);
185        }
186
187        HRegion region = null;
188        if (request.hasRegion()) {
189          region = this.getRegion(request.getRegion());
190        }
191
192        if (region != null && !tableName.equals(region.getTableDescriptor().getTableName())) {
193          return super.scan(controller, request);
194        }
195
196        ScanResponse result = null;
197        // Simulate RpcCallContext is null for test.
198        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
199        try {
200          result = super.scan(controller, request);
201        } finally {
202          rpcCall.ifPresent(RpcServer::setCurrentCall);
203        }
204        return result;
205      } catch (Throwable e) {
206        exceptionRef.set(e);
207        throw new ServiceException(e);
208      }
209    }
210  }
211
212}