/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, SmallTests.class })
public class TestSwitchToStreamRead {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static TableName TABLE_NAME = TableName.valueOf("stream");

  private static byte[] FAMILY = Bytes.toBytes("cf");

  private static byte[] QUAL = Bytes.toBytes("cq");

  private static String VALUE_PREFIX;

  private static HRegion REGION;

  @Before
  public void setUp() throws IOException {
    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
    StringBuilder sb = new StringBuilder(256);
    for (int i = 0; i < 255; i++) {
      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
    }
    VALUE_PREFIX = sb.append("-").toString();
    REGION = UTIL.createLocalHRegion(
      TableDescriptorBuilder.newBuilder(TABLE_NAME)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
        .build(),
      null, null);
    // rows 0-899 are flushed to a store file below; rows 900-999 stay in the memstore.
    for (int i = 0; i < 900; i++) {
      REGION
        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
    REGION.flush(true);
    for (int i = 900; i < 1000; i++) {
      REGION
        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
  }

  @After
  public void tearDown() throws IOException {
    REGION.close(true);
    UTIL.cleanupTestDir();
  }

  @Test
  public void test() throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
      StoreScanner storeScanner =
        (StoreScanner) scanner.getStoreHeapForTesting().getCurrentForTesting();
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // the scan starts with pread, so the shared reader is used here.
          assertTrue(sfScanner.getReader().getReaderContext()
            .getReaderType() == ReaderType.PREAD);
        }
      }
      List<Cell> cells = new ArrayList<>();
      for (int i = 0; i < 500; i++) {
        assertTrue(scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // by now we should have switched to stream read.
          assertFalse(sfScanner.getReader().getReaderContext()
            .getReaderType() == ReaderType.PREAD);
        }
      }
      for (int i = 500; i < 1000; i++) {
        assertEquals(i != 999, scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
    }
    // make sure all scanners are closed.
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }

  public static final class MatchLastRowKeyFilter extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) throws IOException {
      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
    }
  }

  private void testFilter(Filter filter) throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
      StoreScanner storeScanner =
        (StoreScanner) scanner.getStoreHeapForTesting().getCurrentForTesting();
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // the scan starts with pread, so the shared reader is used here.
          assertTrue(sfScanner.getReader().getReaderContext()
            .getReaderType() == ReaderType.PREAD);
        }
      }
      List<Cell> cells = new ArrayList<>();
      // should return before finishing the scan, as we want to switch from pread to stream.
      assertTrue(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      assertTrue(cells.isEmpty());
      scanner.shipped();

      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // by now we should have switched to stream read.
          assertFalse(sfScanner.getReader().getReaderContext()
            .getReaderType() == ReaderType.PREAD);
        }
      }
      assertFalse(scanner.next(cells,
        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
      Result result = Result.create(cells);
      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
      cells.clear();
      scanner.shipped();
    }
    // make sure all scanners are closed.
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }

  // filterRowKey is implemented with different logic: we keep calling kvHeap.next until the row
  // key changes, and only a NoLimitScannerContext can be used there, so we cannot make the upper
  // layer return immediately. Simply not using NoLimitScannerContext leads to an infinite loop.
  // This needs more digging; the code is far too complicated...
  @Ignore
  @Test
  public void testFilterRowKey() throws IOException {
    testFilter(new MatchLastRowKeyFilter());
  }

  public static final class MatchLastRowCellNextColFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_COL;
      }
    }
  }

  @Test
  public void testFilterCellNextCol() throws IOException {
    testFilter(new MatchLastRowCellNextColFilter());
  }

  public static final class MatchLastRowCellNextRowFilter extends FilterBase {

    @Override
    public ReturnCode filterCell(Cell c) throws IOException {
      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
        return ReturnCode.INCLUDE;
      } else {
        return ReturnCode.NEXT_ROW;
      }
    }
  }

  @Test
  public void testFilterCellNextRow() throws IOException {
    testFilter(new MatchLastRowCellNextRowFilter());
  }

  public static final class MatchLastRowFilterRowFilter extends FilterBase {

    private boolean exclude;

    @Override
    public void filterRowCells(List<Cell> kvs) throws IOException {
      Cell c = kvs.get(0);
      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
    }

    @Override
    public void reset() throws IOException {
      exclude = false;
    }

    @Override
    public boolean filterRow() throws IOException {
      return exclude;
    }

    @Override
    public boolean hasFilterRow() {
      return true;
    }
  }

  @Test
  public void testFilterRow() throws IOException {
    testFilter(new MatchLastRowFilterRowFilter());
  }
}