001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertNull;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.Arrays;
025import java.util.Optional;
026import java.util.concurrent.atomic.AtomicReference;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Get;
036import org.apache.hadoop.hbase.client.Put;
037import org.apache.hadoop.hbase.client.Result;
038import org.apache.hadoop.hbase.client.ResultScanner;
039import org.apache.hadoop.hbase.client.Scan;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.coprocessor.ObserverContext;
042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
044import org.apache.hadoop.hbase.coprocessor.RegionObserver;
045import org.apache.hadoop.hbase.ipc.RpcServer;
046import org.apache.hadoop.hbase.ipc.ServerCall;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.AfterClass;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Rule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.rules.TestName;
057
058import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
059import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
067
068@Category({ RegionServerTests.class, MediumTests.class })
069public class TestShortCircuitGet {
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestShortCircuitGet.class);
073
074  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
075  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
076  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
077  private static final byte[] VALUE = Bytes.toBytes("testValue");
078
079  static final byte[] r0 = Bytes.toBytes("row-0");
080  static final byte[] r1 = Bytes.toBytes("row-1");
081  static final byte[] r2 = Bytes.toBytes("row-2");
082  static final byte[] r3 = Bytes.toBytes("row-3");
083  static final byte[] r4 = Bytes.toBytes("row-4");
084  static final byte[] r5 = Bytes.toBytes("row-5");
085  static final byte[] r6 = Bytes.toBytes("row-6");
086  static final TableName tableName = TableName.valueOf("TestShortCircuitGet");
087
088  @Rule
089  public TestName name = new TestName();
090
091  @BeforeClass
092  public static void setUpBeforeClass() throws Exception {
093    Configuration conf = TEST_UTIL.getConfiguration();
094    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
095    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);
096
097    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
098    conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
099    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
100    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
101    TEST_UTIL.startMiniCluster(1);
102  }
103
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    TEST_UTIL.shutdownMiniCluster();
107  }
108
109  /**
110   * This test is for HBASE-26821,when we initiate get or scan in cp, the {@link RegionScanner} for
111   * get and scan is close when get or scan is completed.
112   */
113  @Test
114  public void testScannerCloseWhenScanAndGetInCP() throws Exception {
115    Table table = null;
116    try {
117      table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1,
118        HConstants.DEFAULT_BLOCKSIZE, MyScanObserver.class.getName());
119      putToTable(table, r0);
120      putToTable(table, r1);
121      putToTable(table, r2);
122      putToTable(table, r3);
123      putToTable(table, r4);
124      putToTable(table, r5);
125      putToTable(table, r6);
126    } finally {
127      if (table != null) {
128        table.close();
129      }
130    }
131
132    final Configuration conf = TEST_UTIL.getConfiguration();
133    ResultScanner resultScanner = null;
134    Connection conn = null;
135    Table clientTable = null;
136    try {
137      conn = ConnectionFactory.createConnection(conf);
138      clientTable = conn.getTable(tableName);
139      Scan scan = new Scan();
140      scan.setCaching(1);
141      scan.withStartRow(r0, true).withStopRow(r1, true);
142      resultScanner = table.getScanner(scan);
143      Result result = resultScanner.next();
144      assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow()));
145      result = resultScanner.next();
146      assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
147      assertNull(resultScanner.next());
148    } finally {
149      if (resultScanner != null) {
150        resultScanner.close();
151      }
152      if (clientTable != null) {
153        clientTable.close();
154      }
155      if (conn != null) {
156        conn.close();
157      }
158    }
159
160    assertTrue(MyRSRpcServices.exceptionRef.get() == null);
161    assertTrue(MyScanObserver.exceptionRef.get() == null);
162  }
163
164  private void putToTable(Table ht, byte[] rowkey) throws IOException {
165    Put put = new Put(rowkey);
166    put.addColumn(FAMILY, QUALIFIER, VALUE);
167    ht.put(put);
168  }
169
170  private static class MyRegionServer extends MiniHBaseClusterRegionServer {
171    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
172      super(conf);
173    }
174
175    @Override
176    protected RSRpcServices createRpcServices() throws IOException {
177      return new MyRSRpcServices(this);
178    }
179  }
180
181  private static class MyRSRpcServices extends RSRpcServices {
182    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
183
184    public MyRSRpcServices(HRegionServer rs) throws IOException {
185      super(rs);
186    }
187
188    @Override
189    public MultiResponse multi(RpcController rpcc, MultiRequest request) throws ServiceException {
190      try {
191        if (!MyScanObserver.inCP) {
192          return super.multi(rpcc, request);
193        }
194
195        assertTrue(!RpcServer.getCurrentCall().isPresent());
196        return super.multi(rpcc, request);
197      } catch (Throwable e) {
198        exceptionRef.set(e);
199        throw new ServiceException(e);
200      }
201    }
202
203    @Override
204    public ScanResponse scan(RpcController controller, ScanRequest request)
205      throws ServiceException {
206      try {
207        if (!MyScanObserver.inCP) {
208          return super.scan(controller, request);
209        }
210
211        HRegion region = null;
212        if (request.hasRegion()) {
213          region = this.getRegion(request.getRegion());
214        }
215
216        if (
217          region != null && TableName.isMetaTableName(region.getTableDescriptor().getTableName())
218        ) {
219          return super.scan(controller, request);
220        }
221
222        assertTrue(!RpcServer.getCurrentCall().isPresent());
223        return super.scan(controller, request);
224      } catch (Throwable e) {
225        exceptionRef.set(e);
226        throw new ServiceException(e);
227      }
228    }
229
230    @Override
231    public GetResponse get(RpcController controller, GetRequest request) throws ServiceException {
232      try {
233        if (!MyScanObserver.inCP) {
234          return super.get(controller, request);
235        }
236
237        HRegion region = null;
238        if (request.hasRegion()) {
239          region = this.getRegion(request.getRegion());
240        }
241        if (
242          region != null && TableName.isMetaTableName(region.getTableDescriptor().getTableName())
243        ) {
244          return super.get(controller, request);
245        }
246
247        assertTrue(!RpcServer.getCurrentCall().isPresent());
248        return super.get(controller, request);
249      } catch (Throwable e) {
250        exceptionRef.set(e);
251        throw new ServiceException(e);
252      }
253    }
254  }
255
256  public static class MyScanObserver implements RegionCoprocessor, RegionObserver {
257
258    private static volatile boolean inCP = false;
259    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
260
261    @Override
262    public Optional<RegionObserver> getRegionObserver() {
263      return Optional.of(this);
264    }
265
266    @SuppressWarnings("rawtypes")
267    @Override
268    public RegionScanner postScannerOpen(
269      final ObserverContext<RegionCoprocessorEnvironment> observerContext, final Scan scan,
270      final RegionScanner regionScanner) throws IOException {
271
272      if (inCP) {
273        return regionScanner;
274      }
275
276      HRegion region = (HRegion) observerContext.getEnvironment().getRegion();
277      int prevScannerCount = region.scannerReadPoints.size();
278      Table table1 = null;
279      Get get2 = new Get(r2);
280      inCP = true;
281      try (Connection connection = observerContext.getEnvironment()
282        .createConnection(observerContext.getEnvironment().getConfiguration())) {
283        try {
284          table1 = connection.getTable(tableName);
285          Result result = table1.get(get2);
286          assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
287
288        } finally {
289          if (table1 != null) {
290            table1.close();
291          }
292          inCP = false;
293        }
294
295        // RegionScanner is closed and there is no rpcCallBack set
296        assertTrue(prevScannerCount == region.scannerReadPoints.size());
297        ServerCall serverCall = (ServerCall) RpcServer.getCurrentCall().get();
298        assertTrue(serverCall.getCallBack() == null);
299
300        Get get3 = new Get(r3);
301        Get get4 = new Get(r4);
302        Table table2 = null;
303        inCP = true;
304        try {
305          table2 = connection.getTable(tableName);
306          Result[] results = table2.get(Arrays.asList(get3, get4));
307          assertTrue("Expected row: row-3", Bytes.equals(r3, results[0].getRow()));
308          assertTrue("Expected row: row-4", Bytes.equals(r4, results[1].getRow()));
309        } finally {
310          if (table2 != null) {
311            table2.close();
312          }
313          inCP = false;
314        }
315
316        // RegionScanner is closed and there is no rpcCallBack set
317        assertTrue(prevScannerCount == region.scannerReadPoints.size());
318        serverCall = (ServerCall) RpcServer.getCurrentCall().get();
319        assertTrue(serverCall.getCallBack() == null);
320
321        Scan newScan = new Scan();
322        newScan.setCaching(1);
323        newScan.withStartRow(r5, true).withStopRow(r6, true);
324        Table table3 = null;
325        ResultScanner resultScanner = null;
326        inCP = true;
327        try {
328          table3 = connection.getTable(tableName);
329          resultScanner = table3.getScanner(newScan);
330          Result result = resultScanner.next();
331          assertTrue("Expected row: row-5", Bytes.equals(r5, result.getRow()));
332          result = resultScanner.next();
333          assertTrue("Expected row: row-6", Bytes.equals(r6, result.getRow()));
334          result = resultScanner.next();
335          assertNull(result);
336        } finally {
337          if (resultScanner != null) {
338            resultScanner.close();
339          }
340          if (table3 != null) {
341            table3.close();
342          }
343          inCP = false;
344        }
345
346        // RegionScanner is closed and there is no rpcCallBack set
347        assertTrue(prevScannerCount == region.scannerReadPoints.size());
348        serverCall = (ServerCall) RpcServer.getCurrentCall().get();
349        assertTrue(serverCall.getCallBack() == null);
350        return regionScanner;
351      } catch (Throwable e) {
352        exceptionRef.set(e);
353        throw e;
354      }
355    }
356  }
357
358}