/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests that a scanner which starts in pread mode switches to stream read once it has
 * read more than {@link StoreScanner#STORESCANNER_PREAD_MAX_BYTES} bytes, and that the
 * stream readers opened for such scanners are cleaned up properly (HBASE-21551).
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSwitchToStreamRead {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("stream");

  private static final byte[] FAMILY = Bytes.toBytes("cf");

  private static final byte[] QUAL = Bytes.toBytes("cq");

  // Assigned per test in setUp, so these two cannot be final.
  private static String VALUE_PREFIX;

  private static HRegion REGION;

  /**
   * Creates a local region with a 1 KB block size and a 2 KB pread limit, then loads
   * 1000 rows: rows 0-899 are flushed into a single store file, rows 900-999 stay in
   * the memstore. With ~256-byte values, scanning a handful of rows already exceeds
   * the pread limit and forces the switch to stream read.
   */
  @Before
  public void setUp() throws IOException {
    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
    // Build a random 255-char prefix plus a '-' separator, so each value is ~256 bytes.
    StringBuilder sb = new StringBuilder(256);
    for (int i = 0; i < 255; i++) {
      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
    }
    VALUE_PREFIX = sb.append("-").toString();
    REGION = UTIL.createLocalHRegion(
      TableDescriptorBuilder.newBuilder(TABLE_NAME)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
        .build(),
      null, null);
    for (int i = 0; i < 900; i++) {
      REGION
        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
    // Flush so rows 0-899 land in exactly one store file.
    REGION.flush(true);
    for (int i = 900; i < 1000; i++) {
      REGION
        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
  }

  @After
  public void tearDown() throws IOException {
    REGION.close(true);
    UTIL.cleanupTestDir();
  }

  /**
   * Returns an unmodifiable view of the stream readers currently open on the single
   * store file created in {@link #setUp()}. Asserts the expected single-store,
   * single-file layout along the way.
   */
  private Set<StoreFileReader> getStreamReaders() {
    List<HStore> stores = REGION.getStores();
    assertEquals(1, stores.size());
    HStore firstStore = stores.get(0);
    assertNotNull(firstStore);
    Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
    assertEquals(1, storeFiles.size());
    HStoreFile firstStoreFile = storeFiles.iterator().next();
    assertNotNull(firstStoreFile);
    return Collections.unmodifiableSet(firstStoreFile.streamReaders);
  }

  /**
   * Test Case for HBASE-21551: stream readers must be tracked while a stream scanner
   * is open, released on scanner close, and cleared on region close even if a stream
   * scanner is still open.
   */
  @Test
  public void testStreamReadersCleanup() throws IOException {
    Set<StoreFileReader> streamReaders = getStreamReaders();
    assertEquals(0, getStreamReaders().size());
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
      StoreScanner storeScanner =
        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
      List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
        .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
        .collect(Collectors.toList());
      assertEquals(1, sfScanners.size());
      StoreFileScanner sfScanner = sfScanners.get(0);
      // A STREAM scan must not use the shared (pread) reader.
      assertFalse(sfScanner.getReader().shared);

      // There should be a stream reader
      assertEquals(1, getStreamReaders().size());
    }
    // Closing the scanner releases its stream reader.
    assertEquals(0, getStreamReaders().size());

    // The streamsReader should be clear after region close even if there're some opened stream
    // scanner.
    RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
    assertNotNull(scanner);
    assertEquals(1, getStreamReaders().size());
    REGION.close();
    // Use the captured set view: the store is gone after close.
    assertEquals(0, streamReaders.size());
  }

  /**
   * Scans all 1000 rows with a default (pread) scan and verifies that the store file
   * scanner starts on the shared pread reader and has switched to a private stream
   * reader by the time 500 rows (well past the 2 KB pread limit) have been read.
   */
  @Test
  public void test() throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
      StoreScanner storeScanner =
        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // starting from pread so we use shared reader here.
          assertTrue(sfScanner.getReader().shared);
        }
      }
      List<Cell> cells = new ArrayList<>();
      for (int i = 0; i < 500; i++) {
        assertTrue(scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // we should have convert to use stream read now.
          assertFalse(sfScanner.getReader().shared);
        }
      }
      for (int i = 500; i < 1000; i++) {
        // next() returns false only on the very last row (999).
        assertEquals(i != 999, scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
    }
    // make sure all scanners are closed.
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }

  /** Filter that only keeps the last row (row key 999), skipping all others at row-key level. */
  public static final class MatchLastRowKeyFilter extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) throws IOException {
      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
    }
  }

  /**
   * Runs a full-table scan with the given filter, which is expected to exclude every
   * row but the last one (999). Verifies the pread-to-stream switch happens while the
   * filter is skipping rows, and that the single matching row is returned correctly.
   */
  private void testFilter(Filter filter) throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
      StoreScanner storeScanner =
        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // starting from pread so we use shared reader here.
          assertTrue(sfScanner.getReader().shared);
        }
      }
      List<Cell> cells = new ArrayList<>();
      // should return before finishing the scan as we want to switch from pread to stream
      assertTrue(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      assertTrue(cells.isEmpty());
      scanner.shipped();

      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // we should have convert to use stream read now.
          assertFalse(sfScanner.getReader().shared);
        }
      }
      assertFalse(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      Result result = Result.create(cells);
      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
      cells.clear();
      scanner.shipped();
    }
    // make sure all scanners are closed.
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }

  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
  // an infinite loop. Need to dig more, the code are way too complicated...
  @Ignore
  @Test
  public void testFilterRowKey() throws IOException {
    testFilter(new MatchLastRowKeyFilter());
  }

  /** Cell filter that skips to the next column for every row except the last (999). */
  public static final class MatchLastRowCellNextColFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_COL;
      }
    }
  }

  @Test
  public void testFilterCellNextCol() throws IOException {
    testFilter(new MatchLastRowCellNextColFilter());
  }

  /** Cell filter that skips to the next row for every row except the last (999). */
  public static final class MatchLastRowCellNextRowFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_ROW;
      }
    }
  }

  @Test
  public void testFilterCellNextRow() throws IOException {
    testFilter(new MatchLastRowCellNextRowFilter());
  }

  /** Row filter that excludes every complete row except the last one (999). */
  public static final class MatchLastRowFilterRowFilter extends FilterBase {

    // Decision for the current row, computed in filterRowCells, consumed in filterRow.
    private boolean exclude;

    @Override
    public void filterRowCells(List<Cell> kvs) throws IOException {
      Cell c = kvs.get(0);
      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
    }

    @Override
    public void reset() throws IOException {
      exclude = false;
    }

    @Override
    public boolean filterRow() throws IOException {
      return exclude;
    }

    @Override
    public boolean hasFilterRow() {
      return true;
    }
  }

  @Test
  public void testFilterRow() throws IOException {
    testFilter(new MatchLastRowFilterRowFilter());
  }
}