001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.ForkJoinPool; 024import java.util.concurrent.atomic.AtomicInteger; 025import java.util.stream.Collectors; 026import java.util.stream.IntStream; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 030import org.apache.hadoop.hbase.testclassification.ClientTests; 031import org.apache.hadoop.hbase.testclassification.MediumTests; 032import org.apache.hadoop.hbase.util.Bytes; 033import org.junit.jupiter.api.AfterAll; 034import org.junit.jupiter.api.BeforeAll; 035import org.junit.jupiter.api.Tag; 036import org.junit.jupiter.api.Test; 037 038@Tag(MediumTests.TAG) 039@Tag(ClientTests.TAG) 040public class TestAsyncTableScannerCloseWhileSuspending { 041 042 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 043 044 private static TableName TABLE_NAME = TableName.valueOf("async"); 045 046 private static byte[] FAMILY = Bytes.toBytes("cf"); 047 048 private static byte[] CQ = Bytes.toBytes("cq"); 049 050 private static AsyncConnection CONN; 051 052 private static AsyncTable<?> TABLE; 053 054 @BeforeAll 055 public static void setUp() throws Exception { 056 TEST_UTIL.startMiniCluster(1); 057 TEST_UTIL.createTable(TABLE_NAME, FAMILY); 058 CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 059 TABLE = CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 060 TABLE.putAll(IntStream.range(0, 100).mapToObj( 061 i -> new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i))) 062 .collect(Collectors.toList())).get(); 063 } 064 065 @AfterAll 066 public static void tearDown() throws Exception { 067 CONN.close(); 068 TEST_UTIL.shutdownMiniCluster(); 069 } 070 071 private int getScannersCount() { 072 return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() 073 .map(t -> t.getRegionServer()).mapToInt(rs -> rs.getRSRpcServices().getScannersCount()).sum(); 074 } 075 076 @Test 077 public void testCloseScannerWhileSuspending() throws Exception { 078 final AtomicInteger onNextCounter = new AtomicInteger(0); 079 final CountDownLatch latch = new CountDownLatch(1); 080 final Scan scan = new Scan().setMaxResultSize(1); 081 final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) { 082 @Override 083 public void onNext(Result[] results, ScanController controller) { 084 onNextCounter.incrementAndGet(); 085 super.onNext(results, controller); 086 } 087 088 @Override 089 public void onComplete() { 090 super.onComplete(); 091 latch.countDown(); 092 } 093 }; 094 095 CONN.getTable(TABLE_NAME).scan(scan, scanner); 096 097 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 098 099 @Override 100 public boolean evaluate() throws Exception { 101 return scanner.isSuspended(); 102 } 103 104 @Override 105 public String explainFailure() throws Exception { 106 return "The given scanner has been suspended in time"; 107 } 108 }); 109 assertEquals(1, getScannersCount()); 110 assertEquals(1, onNextCounter.get()); 111 112 scanner.close(); 113 TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() { 114 115 @Override 116 public boolean evaluate() throws Exception { 117 return getScannersCount() == 0; 118 } 119 120 @Override 121 public String explainFailure() throws Exception { 122 return "Still have " + getScannersCount() + " scanners opened"; 123 } 124 }); 125 latch.await(); 126 assertEquals(1, onNextCounter.get()); 127 } 128}