001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ThreadLocalRandom; 031import java.util.stream.Collectors; 032 033import org.apache.hadoop.hbase.Cell; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseTestingUtility; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.Result; 040import org.apache.hadoop.hbase.client.Scan; 041import org.apache.hadoop.hbase.client.Scan.ReadType; 042import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 043import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.After; 048import org.junit.Assert; 049import org.junit.Before; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054@Category({ RegionServerTests.class, MediumTests.class }) 055public class TestSwitchToStreamRead { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestSwitchToStreamRead.class); 060 061 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 062 063 private static TableName TABLE_NAME = TableName.valueOf("stream"); 064 065 private static byte[] FAMILY = Bytes.toBytes("cf"); 066 067 private static byte[] QUAL = Bytes.toBytes("cq"); 068 069 private static String VALUE_PREFIX; 070 071 private static HRegion REGION; 072 073 @Before 074 public void setUp() throws IOException { 075 UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048); 076 StringBuilder sb = new StringBuilder(256); 077 for (int i = 0; i < 255; i++) { 078 sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1)); 079 } 080 VALUE_PREFIX = sb.append("-").toString(); 081 REGION = UTIL.createLocalHRegion( 082 TableDescriptorBuilder.newBuilder(TABLE_NAME) 083 .setColumnFamily( 084 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build()) 085 .build(), 086 null, null); 087 for (int i = 0; i < 900; i++) { 088 REGION 089 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); 090 } 091 REGION.flush(true); 092 for (int i = 900; i < 1000; i++) { 093 REGION 094 .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); 095 } 096 } 097 098 @After 099 public void tearDown() throws IOException { 100 REGION.close(true); 101 UTIL.cleanupTestDir(); 102 } 103 104 private Set<StoreFileReader> getStreamReaders() { 105 List<HStore> stores = REGION.getStores(); 106 Assert.assertEquals(1, stores.size()); 107 HStore firstStore = stores.get(0); 108 Assert.assertNotNull(firstStore); 109 Collection<HStoreFile> storeFiles = firstStore.getStorefiles(); 110 Assert.assertEquals(1, storeFiles.size()); 111 HStoreFile firstSToreFile = storeFiles.iterator().next(); 112 Assert.assertNotNull(firstSToreFile); 113 return Collections.unmodifiableSet(firstSToreFile.streamReaders); 114 } 115 116 /** 117 * Test Case for HBASE-21551 118 */ 119 @Test 120 public void testStreamReadersCleanup() throws IOException { 121 Set<StoreFileReader> streamReaders = getStreamReaders(); 122 Assert.assertEquals(0, getStreamReaders().size()); 123 try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) { 124 StoreScanner storeScanner = 125 (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting(); 126 List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream() 127 .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs) 128 .collect(Collectors.toList()); 129 Assert.assertEquals(1, sfScanners.size()); 130 StoreFileScanner sfScanner = sfScanners.get(0); 131 Assert.assertFalse(sfScanner.getReader().shared); 132 133 // There should be a stream reader 134 Assert.assertEquals(1, getStreamReaders().size()); 135 } 136 Assert.assertEquals(0, getStreamReaders().size()); 137 138 // The streamsReader should be clear after region close even if there're some opened stream 139 // scanner. 140 RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM)); 141 Assert.assertNotNull(scanner); 142 Assert.assertEquals(1, getStreamReaders().size()); 143 REGION.close(); 144 Assert.assertEquals(0, streamReaders.size()); 145 } 146 147 @Test 148 public void test() throws IOException { 149 try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) { 150 StoreScanner storeScanner = (StoreScanner) (scanner) 151 .getStoreHeapForTesting().getCurrentForTesting(); 152 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 153 if (kvs instanceof StoreFileScanner) { 154 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 155 // starting from pread so we use shared reader here. 156 assertTrue(sfScanner.getReader().shared); 157 } 158 } 159 List<Cell> cells = new ArrayList<>(); 160 for (int i = 0; i < 500; i++) { 161 assertTrue(scanner.next(cells)); 162 Result result = Result.create(cells); 163 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); 164 cells.clear(); 165 scanner.shipped(); 166 } 167 for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { 168 if (kvs instanceof StoreFileScanner) { 169 StoreFileScanner sfScanner = (StoreFileScanner) kvs; 170 // we should have convert to use stream read now. 171 assertFalse(sfScanner.getReader().shared); 172 } 173 } 174 for (int i = 500; i < 1000; i++) { 175 assertEquals(i != 999, scanner.next(cells)); 176 Result result = Result.create(cells); 177 assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); 178 cells.clear(); 179 scanner.shipped(); 180 } 181 } 182 // make sure all scanners are closed. 183 for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) { 184 assertFalse(sf.isReferencedInReads()); 185 } 186 } 187}