001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.util.concurrent.ForkJoinPool; 023import java.util.stream.Collectors; 024import java.util.stream.IntStream; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.HBaseTestingUtility; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 029import org.apache.hadoop.hbase.testclassification.ClientTests; 030import org.apache.hadoop.hbase.testclassification.MediumTests; 031import org.apache.hadoop.hbase.util.Bytes; 032import org.junit.AfterClass; 033import org.junit.BeforeClass; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037 038@Category({ MediumTests.class, ClientTests.class }) 039public class TestAsyncTableScannerCloseWhileSuspending { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestAsyncTableScannerCloseWhileSuspending.class); 044 045 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 046 047 private static TableName TABLE_NAME = TableName.valueOf("async"); 048 049 private static byte[] FAMILY = Bytes.toBytes("cf"); 050 051 private static byte[] CQ = Bytes.toBytes("cq"); 052 053 private static AsyncConnection CONN; 054 055 private static AsyncTable<?> TABLE; 056 057 @BeforeClass 058 public static void setUp() throws Exception { 059 TEST_UTIL.startMiniCluster(1); 060 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 061 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 062 TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 063 TABLE.putAll(IntStream.range(0, 100).mapToObj( 064 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 065 .collect(Collectors.toList())).get(); 066 } 067 068 @AfterClass 069 public static void tearDown() throws Exception { 070 CONN.close(); 071 TEST_UTIL.shutdownMiniCluster(); 072 } 073 074 private int getScannersCount() { 075 return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 076 .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()).sum(); 077 } 078 079 @Test 080 public void testCloseScannerWhileSuspending() throws Exception { 081 try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { 082 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 083 084 @Override 085 public boolean evaluate() throws Exception { 086 return ((AsyncTableResultScanner) scanner).isSuspended(); 087 } 088 089 @Override 090 public String explainFailure() throws Exception { 091 return "The given scanner has been suspended in time"; 092 } 093 }); 094 assertEquals(1, getScannersCount()); 095 } 096 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 097 098 @Override 099 public boolean evaluate() throws Exception { 100 return getScannersCount() == 0; 101 } 102 103 @Override 104 public String explainFailure() throws Exception { 105 return "Still have " + getScannersCount() + " scanners opened"; 106 } 107 }); 108 } 109}