001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.Set;
033import java.util.TreeSet;
034import java.util.concurrent.ThreadLocalRandom;
035import java.util.stream.Stream;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellComparatorImpl;
038import org.apache.hadoop.hbase.CellUtil;
039import org.apache.hadoop.hbase.ExtendedCell;
040import org.apache.hadoop.hbase.HBaseTestingUtil;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.KeyValueTestUtil;
043import org.apache.hadoop.hbase.PrivateCellUtil;
044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Delete;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.io.compress.Compression;
049import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
050import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
051import org.apache.hadoop.hbase.util.BloomFilterUtil;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.jupiter.api.TestTemplate;
054import org.junit.jupiter.params.provider.Arguments;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * Tests optimized scanning of multiple columns. <br>
060 * We separated the big test into several sub-class UT, because When in ROWCOL bloom type, we will
061 * test the row-col bloom filter frequently for saving HDFS seek once we switch from one column to
062 * another in our UT. It's cpu time consuming (~45s for each case), so moved the ROWCOL case into a
063 * separated LargeTests to avoid timeout failure. <br>
064 * <br>
065 * To be clear: In TestMultiColumnScanner, we will flush 10 (NUM_FLUSHES=10) HFiles here, and the
066 * table will put ~1000 cells (rows=20, ts=6, qualifiers=8, total=20*6*8 ~ 1000) . Each full table
067 * scan will check the ROWCOL bloom filter 20 (rows)* 8 (column) * 10 (hfiles)= 1600 times, beside
068 * it will scan the full table 6*2^8=1536 times, so finally will have 1600*1536=2457600 bloom filter
069 * testing. (See HBASE-21520)
070 */
071public abstract class TestMultiColumnScanner {
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class);
074
075  private static final String TABLE_NAME = TestMultiColumnScanner.class.getSimpleName();
076
077  static final int MAX_VERSIONS = 50;
078
079  private static final String FAMILY = "CF";
080  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
081
082  /**
083   * The size of the column qualifier set used. Increasing this parameter exponentially increases
084   * test time.
085   */
086  private static final int NUM_COLUMNS = 8;
087
088  private static final int MAX_COLUMN_BIT_MASK = 1 << NUM_COLUMNS - 1;
089  private static final int NUM_FLUSHES = 10;
090  private static final int NUM_ROWS = 20;
091
092  /** A large value of type long for use as a timestamp */
093  private static final long BIG_LONG = 9111222333444555666L;
094
095  /**
096   * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because it will be replaced by
097   * an timestamp auto-generated based on the time.
098   */
099  private static final long[] TIMESTAMPS =
100    new long[] { 1, 3, 5, Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
101
102  /** The probability that a column is skipped in a store file. */
103  private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
104
105  /** The probability to delete a row/column pair */
106  private static final double DELETE_PROBABILITY = 0.02;
107
108  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
109
110  private final Compression.Algorithm comprAlgo;
111
112  private final BloomType bloomType;
113
114  private final DataBlockEncoding dataBlockEncoding;
115
116  protected TestMultiColumnScanner(Compression.Algorithm comprAlgo, BloomType bloomType,
117    DataBlockEncoding dataBlockEncoding) {
118    this.comprAlgo = comprAlgo;
119    this.bloomType = bloomType;
120    this.dataBlockEncoding = dataBlockEncoding;
121  }
122
123  // Some static sanity-checking.
124  static {
125    assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.
126
127    // Ensure TIMESTAMPS are sorted.
128    for (int i = 0; i < TIMESTAMPS.length - 1; ++i)
129      assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
130  }
131
132  public static Stream<Arguments> generateParams(Compression.Algorithm algo,
133    boolean useDataBlockEncoding) {
134    List<Arguments> parameters = new ArrayList<>();
135    for (BloomType bloomType : BloomType.values()) {
136      DataBlockEncoding dataBlockEncoding =
137        useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE;
138      parameters.add(Arguments.of(algo, bloomType, dataBlockEncoding));
139    }
140    return parameters.stream();
141  }
142
143  @TestTemplate
144  public void testMultiColumnScanner() throws IOException {
145    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
146    HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
147      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_BYTES).setCompressionType(comprAlgo)
148        .setBloomFilterType(bloomType).setMaxVersions(MAX_VERSIONS)
149        .setDataBlockEncoding(dataBlockEncoding).build(),
150      BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration()));
151    List<String> rows = sequentialStrings("row", NUM_ROWS);
152    List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
153    List<KeyValue> kvs = new ArrayList<>();
154    Set<String> keySet = new HashSet<>();
155
156    // A map from <row>_<qualifier> to the most recent delete timestamp for
157    // that column.
158    Map<String, Long> lastDelTimeMap = new HashMap<>();
159
160    Random rand = ThreadLocalRandom.current();
161    for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
162      for (String qual : qualifiers) {
163        // This is where we decide to include or not include this column into
164        // this store file, regardless of row and timestamp.
165        if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB) continue;
166
167        byte[] qualBytes = Bytes.toBytes(qual);
168        for (String row : rows) {
169          Put p = new Put(Bytes.toBytes(row));
170          for (long ts : TIMESTAMPS) {
171            String value = createValue(row, qual, ts);
172            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
173            assertEquals(kv.getTimestamp(), ts);
174            p.add(kv);
175            String keyAsString = kv.toString();
176            if (!keySet.contains(keyAsString)) {
177              keySet.add(keyAsString);
178              kvs.add(kv);
179            }
180          }
181          region.put(p);
182
183          Delete d = new Delete(Bytes.toBytes(row));
184          boolean deletedSomething = false;
185          for (long ts : TIMESTAMPS)
186            if (rand.nextDouble() < DELETE_PROBABILITY) {
187              d.addColumns(FAMILY_BYTES, qualBytes, ts);
188              String rowAndQual = row + "_" + qual;
189              Long whenDeleted = lastDelTimeMap.get(rowAndQual);
190              lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts : Math.max(ts, whenDeleted));
191              deletedSomething = true;
192            }
193          if (deletedSomething) region.delete(d);
194        }
195      }
196      region.flush(true);
197    }
198
199    Collections.sort(kvs, CellComparatorImpl.COMPARATOR);
200    for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
201      for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
202        Scan scan = new Scan();
203        scan.readVersions(maxVersions);
204        Set<String> qualSet = new TreeSet<>();
205        {
206          int columnMaskTmp = columnBitMask;
207          for (String qual : qualifiers) {
208            if ((columnMaskTmp & 1) != 0) {
209              scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
210              qualSet.add(qual);
211            }
212            columnMaskTmp >>= 1;
213          }
214          assertEquals(0, columnMaskTmp);
215        }
216
217        InternalScanner scanner = region.getScanner(scan);
218        List<ExtendedCell> results = new ArrayList<>();
219
220        int kvPos = 0;
221        int numResults = 0;
222        String queryInfo = "columns queried: " + qualSet + " (columnBitMask=" + columnBitMask
223          + "), maxVersions=" + maxVersions;
224
225        while (scanner.next(results) || results.size() > 0) {
226          for (ExtendedCell kv : results) {
227            while (
228              kvPos < kvs.size()
229                && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions, lastDelTimeMap)
230            ) {
231              ++kvPos;
232            }
233            String rowQual = getRowQualStr(kv);
234            String deleteInfo = "";
235            Long lastDelTS = lastDelTimeMap.get(rowQual);
236            if (lastDelTS != null) {
237              deleteInfo =
238                "; last timestamp when row/column " + rowQual + " was deleted: " + lastDelTS;
239            }
240            assertTrue(kvPos < kvs.size(),
241              "Scanner returned additional key/value: " + kv + ", " + queryInfo + deleteInfo + ";");
242            assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), kv),
243              "Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";");
244            ++kvPos;
245            ++numResults;
246          }
247          results.clear();
248        }
249        for (; kvPos < kvs.size(); ++kvPos) {
250          KeyValue remainingKV = kvs.get(kvPos);
251          assertFalse(matchesQuery(remainingKV, qualSet, maxVersions, lastDelTimeMap),
252            "Matching column not returned by scanner: " + remainingKV + ", " + queryInfo
253              + ", results returned: " + numResults);
254        }
255      }
256    }
257    assertTrue(lastDelTimeMap.size() > 0,
258      "This test is supposed to delete at least some row/column " + "pairs");
259    LOG.info("Number of row/col pairs deleted at least once: " + lastDelTimeMap.size());
260    HBaseTestingUtil.closeRegionAndWAL(region);
261  }
262
263  private static String getRowQualStr(Cell kv) {
264    String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
265    String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
266    return rowStr + "_" + qualStr;
267  }
268
269  private static boolean matchesQuery(KeyValue kv, Set<String> qualSet, int maxVersions,
270    Map<String, Long> lastDelTimeMap) {
271    Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
272    long ts = kv.getTimestamp();
273    return qualSet.contains(qualStr(kv)) && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
274      && (lastDelTS == null || ts > lastDelTS);
275  }
276
277  private static String qualStr(KeyValue kv) {
278    return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
279  }
280
281  static String createValue(String row, String qual, long ts) {
282    return "value_for_" + row + "_" + qual + "_" + ts;
283  }
284
285  private static List<String> sequentialStrings(String prefix, int n) {
286    List<String> lst = new ArrayList<>();
287    for (int i = 0; i < n; ++i) {
288      StringBuilder sb = new StringBuilder();
289      sb.append(prefix + i);
290
291      // Make column length depend on i.
292      int iBitShifted = i;
293      while (iBitShifted != 0) {
294        sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
295        iBitShifted >>= 1;
296      }
297
298      lst.add(sb.toString());
299    }
300    return lst;
301  }
302}