001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.TreeSet;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellComparatorImpl;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HColumnDescriptor;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.KeyValueTestUtil;
042import org.apache.hadoop.hbase.PrivateCellUtil;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.io.compress.Compression;
047import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.Test;
050import org.junit.runners.Parameterized.Parameter;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
/**
 * Tests optimized scanning of multiple columns. <br>
 * The big test is split into several subclasses because, with the ROWCOL bloom type, the row-col
 * bloom filter is checked frequently (to save an HDFS seek) every time the scan switches from one
 * column to another. This is CPU-time consuming (~45s per case), so the ROWCOL case was moved into
 * a separate LargeTests class to avoid timeout failures. <br>
 * <br>
 * To be clear: TestMultiColumnScanner flushes 10 (NUM_FLUSHES=10) HFiles, and the table receives
 * ~1000 cells (rows=20, ts=6, qualifiers=8, total=20*6*8=960). Each full table scan checks the
 * ROWCOL bloom filter 20 (rows) * 8 (columns) * 10 (hfiles) = 1600 times; in addition, the full
 * table is scanned 6*2^8=1536 times, so the bloom filter is tested 1600*1536=2457600 times in
 * total. (See HBASE-21520)
 */
067public abstract class TestMultiColumnScanner {
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class);
070
071  private static final String TABLE_NAME =
072      TestMultiColumnScanner.class.getSimpleName();
073
074  static final int MAX_VERSIONS = 50;
075
076  private static final String FAMILY = "CF";
077  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
078
079  /**
080   * The size of the column qualifier set used. Increasing this parameter
081   * exponentially increases test time.
082   */
083  private static final int NUM_COLUMNS = 8;
084
085  private static final int MAX_COLUMN_BIT_MASK = 1 << NUM_COLUMNS - 1;
086  private static final int NUM_FLUSHES = 10;
087  private static final int NUM_ROWS = 20;
088
089  /** A large value of type long for use as a timestamp */
090  private static final long BIG_LONG = 9111222333444555666L;
091
092  /**
093   * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because
094   * it will be replaced by an timestamp auto-generated based on the time.
095   */
096  private static final long[] TIMESTAMPS = new long[] { 1, 3, 5,
097      Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
098
099  /** The probability that a column is skipped in a store file. */
100  private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
101
102  /** The probability to delete a row/column pair */
103  private static final double DELETE_PROBABILITY = 0.02;
104
105  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
106
107  @Parameter(0)
108  public Compression.Algorithm comprAlgo;
109
110  @Parameter(1)
111  public BloomType bloomType;
112
113  @Parameter(2)
114  public DataBlockEncoding dataBlockEncoding;
115
116  // Some static sanity-checking.
117  static {
118    assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.
119
120    // Ensure TIMESTAMPS are sorted.
121    for (int i = 0; i < TIMESTAMPS.length - 1; ++i)
122      assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
123  }
124
125  public static Collection<Object[]> generateParams(Compression.Algorithm algo,
126      boolean useDataBlockEncoding) {
127    List<Object[]> parameters = new ArrayList<>();
128    for (BloomType bloomType : BloomType.values()) {
129      DataBlockEncoding dataBlockEncoding =
130          useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE;
131      parameters.add(new Object[] { algo, bloomType, dataBlockEncoding });
132    }
133    return parameters;
134  }
135
136  @Test
137  public void testMultiColumnScanner() throws IOException {
138    HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
139        new HColumnDescriptor(FAMILY)
140            .setCompressionType(comprAlgo)
141            .setBloomFilterType(bloomType)
142            .setMaxVersions(MAX_VERSIONS)
143            .setDataBlockEncoding(dataBlockEncoding)
144    );
145    List<String> rows = sequentialStrings("row", NUM_ROWS);
146    List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
147    List<KeyValue> kvs = new ArrayList<>();
148    Set<String> keySet = new HashSet<>();
149
150    // A map from <row>_<qualifier> to the most recent delete timestamp for
151    // that column.
152    Map<String, Long> lastDelTimeMap = new HashMap<>();
153
154    Random rand = new Random(29372937L);
155
156    for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
157      for (String qual : qualifiers) {
158        // This is where we decide to include or not include this column into
159        // this store file, regardless of row and timestamp.
160        if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB)
161          continue;
162
163        byte[] qualBytes = Bytes.toBytes(qual);
164        for (String row : rows) {
165          Put p = new Put(Bytes.toBytes(row));
166          for (long ts : TIMESTAMPS) {
167            String value = createValue(row, qual, ts);
168            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
169                value);
170            assertEquals(kv.getTimestamp(), ts);
171            p.add(kv);
172            String keyAsString = kv.toString();
173            if (!keySet.contains(keyAsString)) {
174              keySet.add(keyAsString);
175              kvs.add(kv);
176            }
177          }
178          region.put(p);
179
180          Delete d = new Delete(Bytes.toBytes(row));
181          boolean deletedSomething = false;
182          for (long ts : TIMESTAMPS)
183            if (rand.nextDouble() < DELETE_PROBABILITY) {
184              d.addColumns(FAMILY_BYTES, qualBytes, ts);
185              String rowAndQual = row + "_" + qual;
186              Long whenDeleted = lastDelTimeMap.get(rowAndQual);
187              lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts
188                  : Math.max(ts, whenDeleted));
189              deletedSomething = true;
190            }
191          if (deletedSomething)
192            region.delete(d);
193        }
194      }
195      region.flush(true);
196    }
197
198    Collections.sort(kvs, CellComparatorImpl.COMPARATOR);
199    for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
200      for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
201        Scan scan = new Scan();
202        scan.setMaxVersions(maxVersions);
203        Set<String> qualSet = new TreeSet<>();
204        {
205          int columnMaskTmp = columnBitMask;
206          for (String qual : qualifiers) {
207            if ((columnMaskTmp & 1) != 0) {
208              scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
209              qualSet.add(qual);
210            }
211            columnMaskTmp >>= 1;
212          }
213          assertEquals(0, columnMaskTmp);
214        }
215
216        InternalScanner scanner = region.getScanner(scan);
217        List<Cell> results = new ArrayList<>();
218
219        int kvPos = 0;
220        int numResults = 0;
221        String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
222            + columnBitMask + "), maxVersions=" + maxVersions;
223
224        while (scanner.next(results) || results.size() > 0) {
225          for (Cell kv : results) {
226            while (kvPos < kvs.size()
227                && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions,
228                    lastDelTimeMap)) {
229              ++kvPos;
230            }
231            String rowQual = getRowQualStr(kv);
232            String deleteInfo = "";
233            Long lastDelTS = lastDelTimeMap.get(rowQual);
234            if (lastDelTS != null) {
235              deleteInfo = "; last timestamp when row/column " + rowQual
236                  + " was deleted: " + lastDelTS;
237            }
238            assertTrue("Scanner returned additional key/value: " + kv + ", "
239                + queryInfo + deleteInfo + ";", kvPos < kvs.size());
240            assertTrue("Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";",
241              PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), (kv)));
242            ++kvPos;
243            ++numResults;
244          }
245          results.clear();
246        }
247        for (; kvPos < kvs.size(); ++kvPos) {
248          KeyValue remainingKV = kvs.get(kvPos);
249          assertFalse("Matching column not returned by scanner: "
250              + remainingKV + ", " + queryInfo + ", results returned: "
251              + numResults, matchesQuery(remainingKV, qualSet, maxVersions,
252              lastDelTimeMap));
253        }
254      }
255    }
256    assertTrue("This test is supposed to delete at least some row/column " +
257        "pairs", lastDelTimeMap.size() > 0);
258    LOG.info("Number of row/col pairs deleted at least once: " +
259       lastDelTimeMap.size());
260    HBaseTestingUtility.closeRegionAndWAL(region);
261  }
262
263  private static String getRowQualStr(Cell kv) {
264    String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
265    String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
266    return rowStr + "_" + qualStr;
267  }
268
269  private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
270      int maxVersions, Map<String, Long> lastDelTimeMap) {
271    Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
272    long ts = kv.getTimestamp();
273    return qualSet.contains(qualStr(kv))
274        && ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
275        && (lastDelTS == null || ts > lastDelTS);
276  }
277
278  private static String qualStr(KeyValue kv) {
279    return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
280        kv.getQualifierLength());
281  }
282
283  static String createValue(String row, String qual, long ts) {
284    return "value_for_" + row + "_" + qual + "_" + ts;
285  }
286
287  private static List<String> sequentialStrings(String prefix, int n) {
288    List<String> lst = new ArrayList<>();
289    for (int i = 0; i < n; ++i) {
290      StringBuilder sb = new StringBuilder();
291      sb.append(prefix + i);
292
293      // Make column length depend on i.
294      int iBitShifted = i;
295      while (iBitShifted != 0) {
296        sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
297        iBitShifted >>= 1;
298      }
299
300      lst.add(sb.toString());
301    }
302    return lst;
303  }
304}
305