/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests optimized scanning of multiple columns. <br>
 * We split the original big test into several subclass UTs because, with the ROWCOL bloom type,
 * the row-col bloom filter is consulted frequently (to save an HDFS seek each time the scanner
 * switches from one column to another). That is CPU-intensive (~45s per case), so the ROWCOL
 * cases were moved into separate LargeTests to avoid timeout failures. <br>
 * <br>
 * To be clear: TestMultiColumnScanner flushes 10 HFiles (NUM_FLUSHES=10), and the table holds
 * ~1000 cells (rows=20, ts=6, qualifiers=8, total=20*6*8 ~ 1000). Each full table scan checks the
 * ROWCOL bloom filter 20 (rows) * 8 (columns) * 10 (hfiles) = 1600 times, and the table is
 * scanned 6*2^8=1536 times, so in the end there are 1600*1536=2457600 bloom filter checks.
 * (See HBASE-21520)
 */
public abstract class TestMultiColumnScanner {

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class);

  private static final String TABLE_NAME = TestMultiColumnScanner.class.getSimpleName();

  static final int MAX_VERSIONS = 50;

  private static final String FAMILY = "CF";
  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);

  /**
   * The size of the column qualifier set used. Increasing this parameter exponentially increases
   * test time.
   */
  private static final int NUM_COLUMNS = 8;

  private static final int MAX_COLUMN_BIT_MASK = 1 << (NUM_COLUMNS - 1);
  private static final int NUM_FLUSHES = 10;
  private static final int NUM_ROWS = 20;

  /** A large value of type long for use as a timestamp */
  private static final long BIG_LONG = 9111222333444555666L;

  /**
   * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because it will be replaced
   * by a timestamp auto-generated from the current time.
   */
  private static final long[] TIMESTAMPS =
    new long[] { 1, 3, 5, Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };

  /** The probability that a column is skipped in a store file. */
  private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;

  /** The probability to delete a row/column pair */
  private static final double DELETE_PROBABILITY = 0.02;

  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  @Parameter(0)
  public Compression.Algorithm comprAlgo;

  @Parameter(1)
  public BloomType bloomType;

  @Parameter(2)
  public DataBlockEncoding dataBlockEncoding;

  // Some static sanity-checking.
  static {
    assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.

    // Ensure TIMESTAMPS are sorted.
    for (int i = 0; i < TIMESTAMPS.length - 1; ++i) {
      assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
    }
  }

  public static Collection<Object[]> generateParams(Compression.Algorithm algo,
      boolean useDataBlockEncoding) {
    List<Object[]> parameters = new ArrayList<>();
    for (BloomType bloomType : BloomType.values()) {
      DataBlockEncoding dataBlockEncoding =
        useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE;
      parameters.add(new Object[] { algo, bloomType, dataBlockEncoding });
    }
    return parameters;
  }
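
  // This class is abstract; concrete subclasses supply the JUnit parameters consumed by the
  // @Parameter fields above, typically via generateParams(). A minimal sketch of such a subclass
  // (the class name below is illustrative, not a class defined in this file):
  //
  //   @RunWith(Parameterized.class)
  //   public class TestMultiColumnScannerWithGZ extends TestMultiColumnScanner {
  //     @Parameters
  //     public static Collection<Object[]> parameters() {
  //       return generateParams(Compression.Algorithm.GZ, false);
  //     }
  //   }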

  @Test
  public void testMultiColumnScanner() throws IOException {
    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
    HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_BYTES).setCompressionType(comprAlgo)
        .setBloomFilterType(bloomType).setMaxVersions(MAX_VERSIONS)
        .setDataBlockEncoding(dataBlockEncoding).build(),
      BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration()));
    List<String> rows = sequentialStrings("row", NUM_ROWS);
    List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
    List<KeyValue> kvs = new ArrayList<>();
    Set<String> keySet = new HashSet<>();

    // A map from <row>_<qualifier> to the most recent delete timestamp for that column.
    Map<String, Long> lastDelTimeMap = new HashMap<>();

    Random rand = new Random(29372937L);

    for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
      for (String qual : qualifiers) {
        // This is where we decide whether to include this column in this store file, regardless
        // of row and timestamp.
        if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB) {
          continue;
        }

        byte[] qualBytes = Bytes.toBytes(qual);
        for (String row : rows) {
          Put p = new Put(Bytes.toBytes(row));
          for (long ts : TIMESTAMPS) {
            String value = createValue(row, qual, ts);
            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
            assertEquals(kv.getTimestamp(), ts);
            p.add(kv);
            String keyAsString = kv.toString();
            if (!keySet.contains(keyAsString)) {
              keySet.add(keyAsString);
              kvs.add(kv);
            }
          }
          region.put(p);

          Delete d = new Delete(Bytes.toBytes(row));
          boolean deletedSomething = false;
          for (long ts : TIMESTAMPS) {
            if (rand.nextDouble() < DELETE_PROBABILITY) {
              d.addColumns(FAMILY_BYTES, qualBytes, ts);
              String rowAndQual = row + "_" + qual;
              Long whenDeleted = lastDelTimeMap.get(rowAndQual);
              lastDelTimeMap.put(rowAndQual,
                whenDeleted == null ? ts : Math.max(ts, whenDeleted));
              deletedSomething = true;
            }
          }
          if (deletedSomething) {
            region.delete(d);
          }
        }
      }
      region.flush(true);
    }

    Collections.sort(kvs, CellComparatorImpl.COMPARATOR);
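    // Verification phase: enumerate column subsets via columnBitMask. Bit i of the mask decides
    // whether qualifiers.get(i) is added to the scan, so each mask value selects a distinct
    // combination of columns to query.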
    for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
      for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
        Scan scan = new Scan();
        scan.readVersions(maxVersions);
        Set<String> qualSet = new TreeSet<>();
        {
          int columnMaskTmp = columnBitMask;
          for (String qual : qualifiers) {
            if ((columnMaskTmp & 1) != 0) {
              scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
              qualSet.add(qual);
            }
            columnMaskTmp >>= 1;
          }
          assertEquals(0, columnMaskTmp);
        }

        InternalScanner scanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<>();

        int kvPos = 0;
        int numResults = 0;
        String queryInfo = "columns queried: " + qualSet + " (columnBitMask=" + columnBitMask
          + "), maxVersions=" + maxVersions;

        while (scanner.next(results) || results.size() > 0) {
          for (Cell kv : results) {
            while (kvPos < kvs.size()
                && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions, lastDelTimeMap)) {
              ++kvPos;
            }
            String rowQual = getRowQualStr(kv);
            String deleteInfo = "";
            Long lastDelTS = lastDelTimeMap.get(rowQual);
            if (lastDelTS != null) {
              deleteInfo =
                "; last timestamp when row/column " + rowQual + " was deleted: " + lastDelTS;
            }
            assertTrue("Scanner returned additional key/value: " + kv + ", " + queryInfo
              + deleteInfo + ";", kvPos < kvs.size());
            assertTrue("Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";",
              PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), kv));
            ++kvPos;
            ++numResults;
          }
          results.clear();
        }
        for (; kvPos < kvs.size(); ++kvPos) {
          KeyValue remainingKV = kvs.get(kvPos);
          assertFalse("Matching column not returned by scanner: " + remainingKV + ", " + queryInfo
            + ", results returned: " + numResults,
            matchesQuery(remainingKV, qualSet, maxVersions, lastDelTimeMap));
        }
      }
    }
    assertTrue("This test is supposed to delete at least some row/column pairs",
      lastDelTimeMap.size() > 0);
    LOG.info("Number of row/col pairs deleted at least once: " + lastDelTimeMap.size());
    HBaseTestingUtility.closeRegionAndWAL(region);
  }

  private static String getRowQualStr(Cell kv) {
    String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
    String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
    return rowStr + "_" + qualStr;
  }

  // A KeyValue should be returned by the scan iff its qualifier was selected, its timestamp is
  // among the maxVersions most recent timestamps, and it is not masked by a later delete.
  private static boolean matchesQuery(KeyValue kv, Set<String> qualSet, int maxVersions,
      Map<String, Long> lastDelTimeMap) {
    Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
    long ts = kv.getTimestamp();
    return qualSet.contains(qualStr(kv)) && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
      && (lastDelTS == null || ts > lastDelTS);
  }

  private static String qualStr(KeyValue kv) {
    return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
      kv.getQualifierLength());
  }

  static String createValue(String row, String qual, long ts) {
    return "value_for_" + row + "_" + qual + "_" + ts;
  }

  private static List<String> sequentialStrings(String prefix, int n) {
    List<String> lst = new ArrayList<>();
    for (int i = 0; i < n; ++i) {
      StringBuilder sb = new StringBuilder();
      sb.append(prefix).append(i);

      // Make the qualifier length depend on i: append one 'a' or 'b' per bit of i.
      int iBitShifted = i;
      while (iBitShifted != 0) {
        sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
        iBitShifted >>= 1;
      }

      lst.add(sb.toString());
    }
    return lst;
  }
}