001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collections; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Random; 032import java.util.Set; 033import java.util.TreeSet; 034import java.util.concurrent.ThreadLocalRandom; 035import java.util.stream.Stream; 036import org.apache.hadoop.hbase.Cell; 037import org.apache.hadoop.hbase.CellComparatorImpl; 038import org.apache.hadoop.hbase.CellUtil; 039import org.apache.hadoop.hbase.ExtendedCell; 040import org.apache.hadoop.hbase.HBaseTestingUtil; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.KeyValueTestUtil; 043import org.apache.hadoop.hbase.PrivateCellUtil; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 045import org.apache.hadoop.hbase.client.Delete; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.Scan; 048import org.apache.hadoop.hbase.io.compress.Compression; 049import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 050import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 051import org.apache.hadoop.hbase.util.BloomFilterUtil; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.junit.jupiter.api.TestTemplate; 054import org.junit.jupiter.params.provider.Arguments; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * Tests optimized scanning of multiple columns. <br> 060 * We separated the big test into several sub-class UT, because When in ROWCOL bloom type, we will 061 * test the row-col bloom filter frequently for saving HDFS seek once we switch from one column to 062 * another in our UT. It's cpu time consuming (~45s for each case), so moved the ROWCOL case into a 063 * separated LargeTests to avoid timeout failure. <br> 064 * <br> 065 * To be clear: In TestMultiColumnScanner, we will flush 10 (NUM_FLUSHES=10) HFiles here, and the 066 * table will put ~1000 cells (rows=20, ts=6, qualifiers=8, total=20*6*8 ~ 1000) . Each full table 067 * scan will check the ROWCOL bloom filter 20 (rows)* 8 (column) * 10 (hfiles)= 1600 times, beside 068 * it will scan the full table 6*2^8=1536 times, so finally will have 1600*1536=2457600 bloom filter 069 * testing. (See HBASE-21520) 070 */ 071public abstract class TestMultiColumnScanner { 072 073 private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class); 074 075 private static final String TABLE_NAME = TestMultiColumnScanner.class.getSimpleName(); 076 077 static final int MAX_VERSIONS = 50; 078 079 private static final String FAMILY = "CF"; 080 private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY); 081 082 /** 083 * The size of the column qualifier set used. Increasing this parameter exponentially increases 084 * test time. 085 */ 086 private static final int NUM_COLUMNS = 8; 087 088 private static final int MAX_COLUMN_BIT_MASK = 1 << NUM_COLUMNS - 1; 089 private static final int NUM_FLUSHES = 10; 090 private static final int NUM_ROWS = 20; 091 092 /** A large value of type long for use as a timestamp */ 093 private static final long BIG_LONG = 9111222333444555666L; 094 095 /** 096 * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because it will be replaced by 097 * an timestamp auto-generated based on the time. 098 */ 099 private static final long[] TIMESTAMPS = 100 new long[] { 1, 3, 5, Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 }; 101 102 /** The probability that a column is skipped in a store file. */ 103 private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7; 104 105 /** The probability to delete a row/column pair */ 106 private static final double DELETE_PROBABILITY = 0.02; 107 108 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 109 110 private final Compression.Algorithm comprAlgo; 111 112 private final BloomType bloomType; 113 114 private final DataBlockEncoding dataBlockEncoding; 115 116 protected TestMultiColumnScanner(Compression.Algorithm comprAlgo, BloomType bloomType, 117 DataBlockEncoding dataBlockEncoding) { 118 this.comprAlgo = comprAlgo; 119 this.bloomType = bloomType; 120 this.dataBlockEncoding = dataBlockEncoding; 121 } 122 123 // Some static sanity-checking. 124 static { 125 assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos. 126 127 // Ensure TIMESTAMPS are sorted. 128 for (int i = 0; i < TIMESTAMPS.length - 1; ++i) 129 assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]); 130 } 131 132 public static Stream<Arguments> generateParams(Compression.Algorithm algo, 133 boolean useDataBlockEncoding) { 134 List<Arguments> parameters = new ArrayList<>(); 135 for (BloomType bloomType : BloomType.values()) { 136 DataBlockEncoding dataBlockEncoding = 137 useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE; 138 parameters.add(Arguments.of(algo, bloomType, dataBlockEncoding)); 139 } 140 return parameters.stream(); 141 } 142 143 @TestTemplate 144 public void testMultiColumnScanner() throws IOException { 145 TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10); 146 HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME, 147 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_BYTES).setCompressionType(comprAlgo) 148 .setBloomFilterType(bloomType).setMaxVersions(MAX_VERSIONS) 149 .setDataBlockEncoding(dataBlockEncoding).build(), 150 BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration())); 151 List<String> rows = sequentialStrings("row", NUM_ROWS); 152 List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS); 153 List<KeyValue> kvs = new ArrayList<>(); 154 Set<String> keySet = new HashSet<>(); 155 156 // A map from <row>_<qualifier> to the most recent delete timestamp for 157 // that column. 158 Map<String, Long> lastDelTimeMap = new HashMap<>(); 159 160 Random rand = ThreadLocalRandom.current(); 161 for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) { 162 for (String qual : qualifiers) { 163 // This is where we decide to include or not include this column into 164 // this store file, regardless of row and timestamp. 165 if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB) continue; 166 167 byte[] qualBytes = Bytes.toBytes(qual); 168 for (String row : rows) { 169 Put p = new Put(Bytes.toBytes(row)); 170 for (long ts : TIMESTAMPS) { 171 String value = createValue(row, qual, ts); 172 KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value); 173 assertEquals(kv.getTimestamp(), ts); 174 p.add(kv); 175 String keyAsString = kv.toString(); 176 if (!keySet.contains(keyAsString)) { 177 keySet.add(keyAsString); 178 kvs.add(kv); 179 } 180 } 181 region.put(p); 182 183 Delete d = new Delete(Bytes.toBytes(row)); 184 boolean deletedSomething = false; 185 for (long ts : TIMESTAMPS) 186 if (rand.nextDouble() < DELETE_PROBABILITY) { 187 d.addColumns(FAMILY_BYTES, qualBytes, ts); 188 String rowAndQual = row + "_" + qual; 189 Long whenDeleted = lastDelTimeMap.get(rowAndQual); 190 lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts : Math.max(ts, whenDeleted)); 191 deletedSomething = true; 192 } 193 if (deletedSomething) region.delete(d); 194 } 195 } 196 region.flush(true); 197 } 198 199 Collections.sort(kvs, CellComparatorImpl.COMPARATOR); 200 for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) { 201 for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) { 202 Scan scan = new Scan(); 203 scan.readVersions(maxVersions); 204 Set<String> qualSet = new TreeSet<>(); 205 { 206 int columnMaskTmp = columnBitMask; 207 for (String qual : qualifiers) { 208 if ((columnMaskTmp & 1) != 0) { 209 scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual)); 210 qualSet.add(qual); 211 } 212 columnMaskTmp >>= 1; 213 } 214 assertEquals(0, columnMaskTmp); 215 } 216 217 InternalScanner scanner = region.getScanner(scan); 218 List<ExtendedCell> results = new ArrayList<>(); 219 220 int kvPos = 0; 221 int numResults = 0; 222 String queryInfo = "columns queried: " + qualSet + " (columnBitMask=" + columnBitMask 223 + "), maxVersions=" + maxVersions; 224 225 while (scanner.next(results) || results.size() > 0) { 226 for (ExtendedCell kv : results) { 227 while ( 228 kvPos < kvs.size() 229 && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions, lastDelTimeMap) 230 ) { 231 ++kvPos; 232 } 233 String rowQual = getRowQualStr(kv); 234 String deleteInfo = ""; 235 Long lastDelTS = lastDelTimeMap.get(rowQual); 236 if (lastDelTS != null) { 237 deleteInfo = 238 "; last timestamp when row/column " + rowQual + " was deleted: " + lastDelTS; 239 } 240 assertTrue(kvPos < kvs.size(), 241 "Scanner returned additional key/value: " + kv + ", " + queryInfo + deleteInfo + ";"); 242 assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), kv), 243 "Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";"); 244 ++kvPos; 245 ++numResults; 246 } 247 results.clear(); 248 } 249 for (; kvPos < kvs.size(); ++kvPos) { 250 KeyValue remainingKV = kvs.get(kvPos); 251 assertFalse(matchesQuery(remainingKV, qualSet, maxVersions, lastDelTimeMap), 252 "Matching column not returned by scanner: " + remainingKV + ", " + queryInfo 253 + ", results returned: " + numResults); 254 } 255 } 256 } 257 assertTrue(lastDelTimeMap.size() > 0, 258 "This test is supposed to delete at least some row/column " + "pairs"); 259 LOG.info("Number of row/col pairs deleted at least once: " + lastDelTimeMap.size()); 260 HBaseTestingUtil.closeRegionAndWAL(region); 261 } 262 263 private static String getRowQualStr(Cell kv) { 264 String rowStr = Bytes.toString(CellUtil.cloneRow(kv)); 265 String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv)); 266 return rowStr + "_" + qualStr; 267 } 268 269 private static boolean matchesQuery(KeyValue kv, Set<String> qualSet, int maxVersions, 270 Map<String, Long> lastDelTimeMap) { 271 Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv)); 272 long ts = kv.getTimestamp(); 273 return qualSet.contains(qualStr(kv)) && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions] 274 && (lastDelTS == null || ts > lastDelTS); 275 } 276 277 private static String qualStr(KeyValue kv) { 278 return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()); 279 } 280 281 static String createValue(String row, String qual, long ts) { 282 return "value_for_" + row + "_" + qual + "_" + ts; 283 } 284 285 private static List<String> sequentialStrings(String prefix, int n) { 286 List<String> lst = new ArrayList<>(); 287 for (int i = 0; i < n; ++i) { 288 StringBuilder sb = new StringBuilder(); 289 sb.append(prefix + i); 290 291 // Make column length depend on i. 292 int iBitShifted = i; 293 while (iBitShifted != 0) { 294 sb.append((iBitShifted & 1) == 0 ? 'a' : 'b'); 295 iBitShifted >>= 1; 296 } 297 298 lst.add(sb.toString()); 299 } 300 return lst; 301 } 302}