/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests optimized scanning of multiple columns. <br>
 * The big test is separated into several sub-class unit tests because, with the ROWCOL bloom
 * type, the row-col bloom filter is tested every time the scan switches from one column to
 * another (to save HDFS seeks). That is CPU-time consuming (~45s for each case), so the ROWCOL
 * case was moved into a separate LargeTests class to avoid timeout failures. <br>
 * <br>
 * To be clear: this test flushes 10 (NUM_FLUSHES=10) HFiles and puts ~1000 cells into the table
 * (rows=20, ts=6, qualifiers=8, total=20*6*8 ~ 1000). Each full table scan checks the ROWCOL
 * bloom filter 20 (rows) * 8 (columns) * 10 (hfiles) = 1600 times, and the full table is scanned
 * 6 * (2^8 - 1) = 1530 times, so in total there are about 1600 * 1530 ~ 2.4 million bloom filter
 * checks. (See HBASE-21520)
 */
public abstract class TestMultiColumnScanner {

  private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class);

  private static final String TABLE_NAME =
      TestMultiColumnScanner.class.getSimpleName();

  static final int MAX_VERSIONS = 50;

  private static final String FAMILY = "CF";
  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);

  /**
   * The size of the column qualifier set used. Increasing this parameter
   * exponentially increases test time.
   */
  private static final int NUM_COLUMNS = 8;

  /**
   * Largest column bit mask to scan with: one bit per qualifier, all bits set. The parentheses
   * are required because {@code -} binds tighter than {@code <<}; without them the expression
   * evaluates to {@code 1 << (NUM_COLUMNS - 1)} and most subsets containing the last qualifier
   * are never scanned.
   */
  private static final int MAX_COLUMN_BIT_MASK = (1 << NUM_COLUMNS) - 1;
  private static final int NUM_FLUSHES = 10;
  private static final int NUM_ROWS = 20;

  /** A large value of type long for use as a timestamp */
  private static final long BIG_LONG = 9111222333444555666L;

  /**
   * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because
   * it will be replaced by a timestamp auto-generated based on the time.
   */
  private static final long[] TIMESTAMPS = new long[] { 1, 3, 5,
      Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };

  /** The probability that a column is skipped in a store file. */
  private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;

  /** The probability to delete a row/column pair */
  private static final double DELETE_PROBABILITY = 0.02;

  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();

  @Parameter(0)
  public Compression.Algorithm comprAlgo;

  @Parameter(1)
  public BloomType bloomType;

  @Parameter(2)
  public DataBlockEncoding dataBlockEncoding;

  // Some static sanity-checking.
  static {
    assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.

    // Ensure TIMESTAMPS are sorted.
    for (int i = 0; i < TIMESTAMPS.length - 1; ++i) {
      assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
    }
  }

  /**
   * Builds one parameter tuple per {@link BloomType} value.
   *
   * @param algo the compression algorithm placed in every tuple
   * @param useDataBlockEncoding if true every tuple uses {@link DataBlockEncoding#PREFIX},
   *          otherwise {@link DataBlockEncoding#NONE}
   * @return a list of {@code { algo, bloomType, dataBlockEncoding }} arrays
   */
  public static Collection<Object[]> generateParams(Compression.Algorithm algo,
      boolean useDataBlockEncoding) {
    // The encoding does not depend on the bloom type, so pick it once.
    DataBlockEncoding dataBlockEncoding =
        useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE;
    List<Object[]> parameters = new ArrayList<>();
    for (BloomType bloomType : BloomType.values()) {
      parameters.add(new Object[] { algo, bloomType, dataBlockEncoding });
    }
    return parameters;
  }

  /**
   * Writes pseudo-random puts and deletes across several flushes, then scans the region with
   * every non-empty qualifier subset and every max-versions setting, checking each scan against
   * the expected key/values computed in memory.
   */
  @Test
  public void testMultiColumnScanner() throws IOException {
    HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
        new HColumnDescriptor(FAMILY)
            .setCompressionType(comprAlgo)
            .setBloomFilterType(bloomType)
            .setMaxVersions(MAX_VERSIONS)
            .setDataBlockEncoding(dataBlockEncoding)
    );
    // Close the region and its WAL even when an assertion fails part-way through.
    try {
      List<String> rows = sequentialStrings("row", NUM_ROWS);
      List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
      List<KeyValue> kvs = new ArrayList<>();

      // A map from <row>_<qualifier> to the most recent delete timestamp for
      // that column.
      Map<String, Long> lastDelTimeMap = new HashMap<>();

      writeTestData(region, rows, qualifiers, kvs, lastDelTimeMap);

      Collections.sort(kvs, CellComparatorImpl.COMPARATOR);
      for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
        for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
          verifyScan(region, qualifiers, kvs, lastDelTimeMap, maxVersions, columnBitMask);
        }
      }
      assertTrue("This test is supposed to delete at least some row/column " +
          "pairs", lastDelTimeMap.size() > 0);
      LOG.info("Number of row/col pairs deleted at least once: {}", lastDelTimeMap.size());
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(region);
    }
  }

  /**
   * Fills the region over {@link #NUM_FLUSHES} flushes. For each flush every qualifier is
   * skipped with probability {@link #COLUMN_SKIP_IN_STORE_FILE_PROB}; otherwise every row gets
   * one cell per timestamp, followed by random column deletes.
   *
   * @param kvs receives each distinct key/value that was put (unsorted)
   * @param lastDelTimeMap receives, per {@code <row>_<qualifier>}, the largest delete timestamp
   */
  private static void writeTestData(HRegion region, List<String> rows, List<String> qualifiers,
      List<KeyValue> kvs, Map<String, Long> lastDelTimeMap) throws IOException {
    Set<String> keySet = new HashSet<>();
    // Fixed seed keeps the generated data set reproducible.
    Random rand = new Random(29372937L);

    for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
      for (String qual : qualifiers) {
        // This is where we decide to include or not include this column into
        // this store file, regardless of row and timestamp.
        if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB) {
          continue;
        }

        byte[] qualBytes = Bytes.toBytes(qual);
        for (String row : rows) {
          Put p = new Put(Bytes.toBytes(row));
          for (long ts : TIMESTAMPS) {
            String value = createValue(row, qual, ts);
            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
            assertEquals(ts, kv.getTimestamp());
            p.add(kv);
            // The same key can be written in several flushes; expect it only once.
            if (keySet.add(kv.toString())) {
              kvs.add(kv);
            }
          }
          region.put(p);

          Delete d = new Delete(Bytes.toBytes(row));
          boolean deletedSomething = false;
          for (long ts : TIMESTAMPS) {
            if (rand.nextDouble() < DELETE_PROBABILITY) {
              d.addColumns(FAMILY_BYTES, qualBytes, ts);
              String rowAndQual = row + "_" + qual;
              Long whenDeleted = lastDelTimeMap.get(rowAndQual);
              lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts
                  : Math.max(ts, whenDeleted));
              deletedSomething = true;
            }
          }
          if (deletedSomething) {
            region.delete(d);
          }
        }
      }
      region.flush(true);
    }
  }

  /**
   * Scans the region for the qualifiers selected by {@code columnBitMask} (bit i selects the
   * i-th qualifier) with the given max versions, and asserts that the scanner returns exactly
   * the entries of {@code kvs} accepted by {@link #matchesQuery}, in order.
   *
   * @param kvs the expected key/values, already sorted
   */
  private static void verifyScan(HRegion region, List<String> qualifiers, List<KeyValue> kvs,
      Map<String, Long> lastDelTimeMap, int maxVersions, int columnBitMask) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(maxVersions);
    Set<String> qualSet = new TreeSet<>();
    int columnMaskTmp = columnBitMask;
    for (String qual : qualifiers) {
      if ((columnMaskTmp & 1) != 0) {
        scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
        qualSet.add(qual);
      }
      columnMaskTmp >>= 1;
    }
    // Every set bit must have been consumed by some qualifier.
    assertEquals(0, columnMaskTmp);

    String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
        + columnBitMask + "), maxVersions=" + maxVersions;
    List<Cell> results = new ArrayList<>();
    int kvPos = 0;
    int numResults = 0;

    try (InternalScanner scanner = region.getScanner(scan)) {
      while (scanner.next(results) || results.size() > 0) {
        for (Cell kv : results) {
          // Advance to the next expected key/value that matches this query.
          while (kvPos < kvs.size()
              && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions, lastDelTimeMap)) {
            ++kvPos;
          }
          String rowQual = getRowQualStr(kv);
          String deleteInfo = "";
          Long lastDelTS = lastDelTimeMap.get(rowQual);
          if (lastDelTS != null) {
            deleteInfo = "; last timestamp when row/column " + rowQual
                + " was deleted: " + lastDelTS;
          }
          assertTrue("Scanner returned additional key/value: " + kv + ", "
              + queryInfo + deleteInfo + ";", kvPos < kvs.size());
          assertTrue("Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";",
              PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), (kv)));
          ++kvPos;
          ++numResults;
        }
        results.clear();
      }
    }

    // Nothing left over in the expected list may match the query.
    for (; kvPos < kvs.size(); ++kvPos) {
      KeyValue remainingKV = kvs.get(kvPos);
      assertFalse("Matching column not returned by scanner: "
          + remainingKV + ", " + queryInfo + ", results returned: "
          + numResults, matchesQuery(remainingKV, qualSet, maxVersions,
          lastDelTimeMap));
    }
  }

  /** Returns {@code <row>_<qualifier>} for the cell, the key format of the delete map. */
  private static String getRowQualStr(Cell kv) {
    String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
    String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
    return rowStr + "_" + qualStr;
  }

  /**
   * Returns true if the key/value's qualifier is in {@code qualSet}, its timestamp is among the
   * {@code maxVersions} largest entries of {@link #TIMESTAMPS}, and its timestamp is strictly
   * newer than the last recorded delete for its row/column (if any).
   */
  private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
      int maxVersions, Map<String, Long> lastDelTimeMap) {
    Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
    long ts = kv.getTimestamp();
    return qualSet.contains(qualStr(kv))
        && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
        && (lastDelTS == null || ts > lastDelTS);
  }

  /** Returns the key/value's qualifier as a string. */
  private static String qualStr(KeyValue kv) {
    return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
        kv.getQualifierLength());
  }

  /** Returns the cell value written for the given row, qualifier and timestamp. */
  static String createValue(String row, String qual, long ts) {
    return "value_for_" + row + "_" + qual + "_" + ts;
  }

  /**
   * Returns {@code n} strings of the form {@code prefix + i} followed by one 'a' or 'b' per
   * binary digit of {@code i} (least significant first), so the string length varies with i.
   */
  private static List<String> sequentialStrings(String prefix, int n) {
    List<String> lst = new ArrayList<>();
    for (int i = 0; i < n; ++i) {
      StringBuilder sb = new StringBuilder();
      sb.append(prefix + i);

      // Make column length depend on i.
      int iBitShifted = i;
      while (iBitShifted != 0) {
        sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
        iBitShifted >>= 1;
      }

      lst.add(sb.toString());
    }
    return lst;
  }
}