001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.concurrent.ForkJoinPool; 023import java.util.stream.Collectors; 024import java.util.stream.IntStream; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtility; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 029import org.apache.hadoop.hbase.testclassification.ClientTests; 030import org.apache.hadoop.hbase.testclassification.MediumTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.junit.AfterClass; 033import org.junit.BeforeClass; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037 038@Category({ MediumTests.class, ClientTests.class }) 039public class TestAsyncTableScannerCloseWhileSuspending { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestAsyncTableScannerCloseWhileSuspending.class); 044 045 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 046 047 private static TableName TABLE_NAME = TableName.valueOf("async"); 048 049 private static byte[] FAMILY = Bytes.toBytes("cf"); 050 051 private static byte[] CQ = Bytes.toBytes("cq"); 052 053 private static AsyncConnection CONN; 054 055 private static AsyncTable<?> TABLE; 056 057 @BeforeClass 058 public static void setUp() throws Exception { 059 TEST_UTIL.startMiniCluster(1); 060 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 061 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 062 TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 063 TABLE.putAll(IntStream.range(0, 100).mapToObj( 064 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 065 .collect(Collectors.toList())).get(); 066 } 067 068 @AfterClass 069 public static void tearDown() throws Exception { 070 CONN.close(); 071 TEST_UTIL.shutdownMiniCluster(); 072 } 073 074 private int getScannersCount() { 075 return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 076 .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()) 077 .sum(); 078 } 079 080 @Test 081 public void testCloseScannerWhileSuspending() throws Exception { 082 try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { 083 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 084 085 @Override 086 public boolean evaluate() throws Exception { 087 return ((AsyncTableResultScanner) scanner).isSuspended(); 088 } 089 090 @Override 091 public String explainFailure() throws Exception { 092 return "The given scanner has been suspended in time"; 093 } 094 }); 095 assertEquals(1, getScannersCount()); 096 } 097 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 098 099 @Override 100 public boolean evaluate() throws Exception { 101 return getScannersCount() == 0; 102 } 103 104 @Override 105 public String explainFailure() throws Exception { 106 return "Still have " + getScannersCount() + " scanners opened"; 107 } 108 }); 109 } 110}