/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests that {@link StoreScanner} starts reading store files with pread and switches its store
 * file scanners away from pread once enough bytes have been read (see
 * {@link StoreScanner#STORESCANNER_PREAD_MAX_BYTES}).
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestSwitchToStreamRead {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME = TableName.valueOf("stream");

  private static final byte[] FAMILY = Bytes.toBytes("cf");

  private static final byte[] QUAL = Bytes.toBytes("cq");

  // 255 random characters plus "-", regenerated in setUp(), so every cell value is ~260 bytes.
  private static String VALUE_PREFIX;

  private static HRegion REGION;

  /**
   * Creates a local region whose family uses a 1024 byte block size, writes rows 0-899 and flushes
   * them into a store file, then writes rows 900-999 which stay in the memstore. The pread byte
   * limit is set to 2048 so that scanning the region exceeds it.
   */
  @Before
  public void setUp() throws IOException {
    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
    StringBuilder sb = new StringBuilder(256);
    for (int i = 0; i < 255; i++) {
      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
    }
    VALUE_PREFIX = sb.append("-").toString();
    REGION = UTIL.createLocalHRegion(
      TableDescriptorBuilder.newBuilder(TABLE_NAME)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
        .build(),
      null, null);
    for (int i = 0; i < 900; i++) {
      putRow(i);
    }
    REGION.flush(true);
    for (int i = 900; i < 1000; i++) {
      putRow(i);
    }
  }

  @After
  public void tearDown() throws IOException {
    REGION.close(true);
    UTIL.cleanupTestDir();
  }

  /** Writes row {@code i} (an int row key) with value {@code VALUE_PREFIX + i}. */
  private static void putRow(int i) throws IOException {
    REGION
      .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
  }

  /** Asserts that {@code cells} holds the value written for row {@code i}. */
  private static void assertRowValue(int i, List<Cell> cells) {
    Result result = Result.create(cells);
    assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
  }

  /** Returns the reader type currently used by the given store file scanner. */
  private static ReaderType readerTypeOf(StoreFileScanner scanner) {
    return scanner.getReader().getReaderContext().getReaderType();
  }

  /**
   * Asserts the reader type of every {@link StoreFileScanner} currently held by
   * {@code storeScanner}: pread when {@code expectPread} is true, anything but pread otherwise.
   */
  private static void assertStoreFileReaderTypes(StoreScanner storeScanner, boolean expectPread)
    throws IOException {
    for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
      if (kvs instanceof StoreFileScanner) {
        ReaderType type = readerTypeOf((StoreFileScanner) kvs);
        if (expectPread) {
          assertSame("Store file scanner should use pread", ReaderType.PREAD, type);
        } else {
          assertNotSame("Store file scanner should have switched to stream read",
            ReaderType.PREAD, type);
        }
      }
    }
  }

  /**
   * Returns true only if {@code storeScanner} currently holds at least one
   * {@link StoreFileScanner} and none of them uses pread. Requiring at least one store file
   * scanner keeps the check from passing vacuously, e.g. after the file scanner has been
   * exhausted and only memstore rows remain.
   */
  private static boolean hasSwitchedToStreamRead(StoreScanner storeScanner) throws IOException {
    boolean sawStoreFileScanner = false;
    for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
      if (kvs instanceof StoreFileScanner) {
        sawStoreFileScanner = true;
        if (readerTypeOf((StoreFileScanner) kvs) == ReaderType.PREAD) {
          return false;
        }
      }
    }
    return sawStoreFileScanner;
  }

  /** Asserts that no store file of the test family is still referenced by a read. */
  private static void assertNoStoreFileReferencedInReads() {
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }

  @Test
  public void test() throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
      // Starting from pread, so the shared reader is used here.
      assertStoreFileReaderTypes(storeScanner, true);
      List<Cell> cells = new ArrayList<>();
      for (int i = 0; i < 500; i++) {
        assertTrue(scanner.next(cells));
        assertRowValue(i, cells);
        cells.clear();
        scanner.shipped();
      }
      // By now we should have switched to stream read.
      assertStoreFileReaderTypes(storeScanner, false);
      for (int i = 500; i < 1000; i++) {
        // next() reports "more rows" for every row except the last one.
        assertEquals(i != 999, scanner.next(cells));
        assertRowValue(i, cells);
        cells.clear();
        scanner.shipped();
      }
    }
    // Make sure all scanners are closed.
    assertNoStoreFileReferencedInReads();
  }

  /** Filters out every row except row 999 via {@link #filterRowKey(Cell)}. */
  public static final class MatchLastRowKeyFilter extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) throws IOException {
      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
    }
  }

  /**
   * Scans with {@code filter}, which is expected to only let row 999 through, and checks that the
   * scanner returns early with no cells, switches away from pread, and then returns row 999.
   */
  private void testFilter(Filter filter) throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
      // Starting from pread, so the shared reader is used here.
      assertStoreFileReaderTypes(storeScanner, true);
      List<Cell> cells = new ArrayList<>();
      // Should return before finishing the scan, as we want to switch from pread to stream.
      assertTrue(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      assertTrue(cells.isEmpty());
      scanner.shipped();

      // By now we should have switched to stream read.
      assertStoreFileReaderTypes(storeScanner, false);
      assertFalse(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      assertRowValue(999, cells);
      cells.clear();
      scanner.shipped();
    }
    // Make sure all scanners are closed.
    assertNoStoreFileReferencedInReads();
  }

  // filterRowKey is implemented differently: we keep calling kvHeap.next until the row key
  // changes, and there we can only use NoLimitScannerContext, so we cannot make the upper layer
  // return immediately. Simply not using NoLimitScannerContext there leads to an infinite loop.
  // This needs more investigation; the code is quite complicated.
  @Ignore("filterRowKey path only supports NoLimitScannerContext, so the scan cannot return early")
  @Test
  public void testFilterRowKey() throws IOException {
    testFilter(new MatchLastRowKeyFilter());
  }

  /** Includes cells of row 999 and answers NEXT_COL for every other cell. */
  public static final class MatchLastRowCellNextColFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_COL;
      }
    }
  }

  @Test
  public void testFilterCellNextCol() throws IOException {
    testFilter(new MatchLastRowCellNextColFilter());
  }

  /** Includes cells of row 999 and answers NEXT_ROW for every other cell. */
  public static final class MatchLastRowCellNextRowFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_ROW;
      }
    }
  }

  @Test
  public void testFilterCellNextRow() throws IOException {
    testFilter(new MatchLastRowCellNextRowFilter());
  }

  /**
   * Excludes every row except row 999 through {@link #filterRow()}, deciding from the first cell
   * passed to {@link #filterRowCells(List)}.
   */
  public static final class MatchLastRowFilterRowFilter extends FilterBase {

    private boolean exclude;

    @Override
    public void filterRowCells(List<Cell> kvs) throws IOException {
      // NOTE(review): assumes kvs is never empty here; get(0) would throw otherwise.
      Cell c = kvs.get(0);
      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
    }

    @Override
    public void reset() throws IOException {
      exclude = false;
    }

    @Override
    public boolean filterRow() throws IOException {
      return exclude;
    }

    @Override
    public boolean hasFilterRow() {
      return true;
    }
  }

  @Test
  public void testFilterRow() throws IOException {
    testFilter(new MatchLastRowFilterRowFilter());
  }

  /**
   * Verifies that when the store scanner switches from pread to stream read successfully, all store
   * files that were read (including those whose scanners were closed during the switch) are
   * reported by {@link StoreScanner#getFilesRead()}.
   */
  @Test
  public void testGetFilesReadOnTrySwitchToStreamRead() throws Exception {
    HStore store = REGION.getStore(FAMILY);
    FileSystem fs = REGION.getFilesystem();

    // Set a very small preadMaxBytes so that trySwitchToStreamRead is triggered during the scan.
    long originalPreadMaxBytes =
      UTIL.getConfiguration().getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
    try {
      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 10L);

      // ScanInfo is built after the configuration change so it picks up the small limit.
      ScanInfo scanInfo = new ScanInfo(UTIL.getConfiguration(), FAMILY, 0, Integer.MAX_VALUE,
        Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
        CellComparator.getInstance(), false);
      Scan scan = new Scan().setReadType(Scan.ReadType.DEFAULT);
      long readPt = REGION.getReadPoint(IsolationLevel.READ_COMMITTED);

      StoreScanner storeScanner = new StoreScanner(store, scanInfo, scan, null, readPt);
      try {
        // Collect expected store file paths (qualified) for the assertion at the end.
        Set<Path> expectedFilePaths = new HashSet<>();
        for (HStoreFile sf : store.getStorefiles()) {
          expectedFilePaths.add(fs.makeQualified(sf.getPath()));
        }
        assertFalse("Should have at least one store file", expectedFilePaths.isEmpty());

        // Verify scanners start in PREAD mode before the switch.
        assertStoreFileReaderTypes(storeScanner, true);

        // Scan all rows, calling shipped() after each one to trigger trySwitchToStreamRead.
        List<Cell> results = new ArrayList<>();
        ScannerContext scannerContext = ScannerContext.newBuilder().build();
        boolean switchVerified = false;
        while (storeScanner.next(results, scannerContext)) {
          results.clear();
          storeScanner.shipped();
          // Check mid-scan whether the switch happened.
          if (!switchVerified) {
            switchVerified = hasSwitchedToStreamRead(storeScanner);
          }
        }
        assertTrue(
          "trySwitchToStreamRead should have been invoked and scanners switched to stream",
          switchVerified);

        // Queried before the explicit close() in the finally block, so the result reflects only
        // scanners closed during the scan itself (e.g. the pread scanners closed by
        // trySwitchToStreamRead).
        Set<Path> filesRead = storeScanner.getFilesRead();
        assertEquals("Should have exact file count after close", expectedFilePaths.size(),
          filesRead.size());
        assertEquals("Should contain all expected store file paths", expectedFilePaths, filesRead);
      } finally {
        // Release whatever readers the scanner may still hold (including after a failed
        // assertion) so no store file stays referenced once the test finishes.
        storeScanner.close();
      }
    } finally {
      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
        originalPreadMaxBytes);
    }
  }
}