001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.Set;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.PrivateCellUtil;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.io.compress.Compression;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.util.BloomFilterUtil;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.runner.RunWith;
056import org.junit.runners.Parameterized;
057import org.junit.runners.Parameterized.Parameters;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Test various seek optimizations for correctness and check if they are actually saving I/O
063 * operations.
064 */
065@RunWith(Parameterized.class)
066@Category({ RegionServerTests.class, MediumTests.class })
067public class TestSeekOptimizations {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071    HBaseClassTestRule.forClass(TestSeekOptimizations.class);
072
073  private static final Logger LOG = LoggerFactory.getLogger(TestSeekOptimizations.class);
074
075  // Constants
076  private static final String FAMILY = "myCF";
077  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
078
079  private static final int PUTS_PER_ROW_COL = 50;
080  private static final int DELETES_PER_ROW_COL = 10;
081
082  private static final int NUM_ROWS = 3;
083  private static final int NUM_COLS = 3;
084
085  private static final boolean VERBOSE = false;
086
087  /**
088   * Disable this when this test fails hopelessly and you need to debug a simpler case.
089   */
090  private static final boolean USE_MANY_STORE_FILES = true;
091
092  private static final int[][] COLUMN_SETS = new int[][] { {}, // All columns
093    { 0 }, { 1 }, { 0, 2 }, { 1, 2 }, { 0, 1, 2 }, };
094
095  // Both start row and end row are inclusive here for the purposes of this
096  // test.
097  private static final int[][] ROW_RANGES =
098    new int[][] { { -1, -1 }, { 0, 1 }, { 1, 1 }, { 1, 2 }, { 0, 2 } };
099
100  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
101
102  // Instance variables
103  private HRegion region;
104  private Put put;
105  private Delete del;
106  private Set<Long> putTimestamps = new HashSet<>();
107  private Set<Long> delTimestamps = new HashSet<>();
108  private List<Cell> expectedKVs = new ArrayList<>();
109
110  private Compression.Algorithm comprAlgo;
111  private BloomType bloomType;
112
113  private long totalSeekDiligent, totalSeekLazy;
114
115  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
116  private static final Random RNG = new Random(); // This test depends on Random#setSeed
117
118  @Parameters
119  public static final Collection<Object[]> parameters() {
120    return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
121  }
122
123  public TestSeekOptimizations(Compression.Algorithm comprAlgo, BloomType bloomType) {
124    this.comprAlgo = comprAlgo;
125    this.bloomType = bloomType;
126  }
127
128  @Before
129  public void setUp() {
130    RNG.setSeed(91238123L);
131    expectedKVs.clear();
132    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
133  }
134
135  @Test
136  public void testMultipleTimestampRanges() throws IOException {
137    // enable seek counting
138    StoreFileScanner.instrument();
139
140    region = TEST_UTIL.createTestRegion("testMultipleTimestampRanges", new HColumnDescriptor(FAMILY)
141      .setCompressionType(comprAlgo).setBloomFilterType(bloomType).setMaxVersions(3));
142
143    // Delete the given timestamp and everything before.
144    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
145
146    createTimestampRange(1, 50, -1);
147    createTimestampRange(51, 100, -1);
148    if (USE_MANY_STORE_FILES) {
149      createTimestampRange(100, 500, 127);
150      createTimestampRange(900, 1300, -1);
151      createTimestampRange(1301, 2500, latestDelTS);
152      createTimestampRange(2502, 2598, -1);
153      createTimestampRange(2599, 2999, -1);
154    }
155
156    prepareExpectedKVs(latestDelTS);
157
158    for (int[] columnArr : COLUMN_SETS) {
159      for (int[] rowRange : ROW_RANGES) {
160        for (int maxVersions : MAX_VERSIONS_VALUES) {
161          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
162            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1], maxVersions);
163          }
164        }
165      }
166    }
167
168    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
169    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo
170      + " total seeks without optimization: " + totalSeekDiligent + ", with optimization: "
171      + totalSeekLazy + " (" + String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent)
172      + "), savings: " + String.format("%.2f%%", 100.0 * seekSavings) + "\n");
173
174    // Test that lazy seeks are buying us something. Without the actual
175    // implementation of the lazy seek optimization this will be 0.
176    final double expectedSeekSavings = 0.0;
177    assertTrue("Lazy seek is only saving " + String.format("%.2f%%", seekSavings * 100)
178      + " seeks but should " + "save at least "
179      + String.format("%.2f%%", expectedSeekSavings * 100), seekSavings >= expectedSeekSavings);
180  }
181
182  private void testScan(final int[] columnArr, final boolean lazySeekEnabled, final int startRow,
183    final int endRow, int maxVersions) throws IOException {
184    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
185    final Scan scan = new Scan();
186    final Set<String> qualSet = new HashSet<>();
187    for (int iColumn : columnArr) {
188      String qualStr = getQualStr(iColumn);
189      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
190      qualSet.add(qualStr);
191    }
192    scan.setMaxVersions(maxVersions);
193    scan.setStartRow(rowBytes(startRow));
194
195    // Adjust for the fact that for multi-row queries the end row is exclusive.
196    {
197      final byte[] scannerStopRow = rowBytes(endRow + (startRow != endRow ? 1 : 0));
198      scan.setStopRow(scannerStopRow);
199    }
200
201    final long initialSeekCount = StoreFileScanner.getSeekCount();
202    final InternalScanner scanner = region.getScanner(scan);
203    final List<Cell> results = new ArrayList<>();
204    final List<Cell> actualKVs = new ArrayList<>();
205
206    // Such a clumsy do-while loop appears to be the official way to use an
207    // internalScanner. scanner.next() return value refers to the _next_
208    // result, not to the one already returned in results.
209    boolean hasNext;
210    do {
211      hasNext = scanner.next(results);
212      actualKVs.addAll(results);
213      results.clear();
214    } while (hasNext);
215
216    List<Cell> filteredKVs =
217      filterExpectedResults(qualSet, rowBytes(startRow), rowBytes(endRow), maxVersions);
218    final String rowRestrictionStr = (startRow == -1 && endRow == -1)
219      ? "all rows"
220      : (startRow == endRow
221        ? ("row=" + startRow)
222        : ("startRow=" + startRow + ", " + "endRow=" + endRow));
223    final String columnRestrictionStr =
224      columnArr.length == 0 ? "all columns" : ("columns=" + Arrays.toString(columnArr));
225    final String testDesc = "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
226      + (scan.isGetScan() ? "Get" : "Scan") + ": " + columnRestrictionStr + ", " + rowRestrictionStr
227      + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
228    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
229    if (VERBOSE) {
230      System.err.println("Seek count: " + seekCount + ", KVs returned: " + actualKVs.size() + ". "
231        + testDesc + (lazySeekEnabled ? "\n" : ""));
232    }
233    if (lazySeekEnabled) {
234      totalSeekLazy += seekCount;
235    } else {
236      totalSeekDiligent += seekCount;
237    }
238    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
239  }
240
241  private List<Cell> filterExpectedResults(Set<String> qualSet, byte[] startRow, byte[] endRow,
242    int maxVersions) {
243    final List<Cell> filteredKVs = new ArrayList<>();
244    final Map<String, Integer> verCount = new HashMap<>();
245    for (Cell kv : expectedKVs) {
246      if (
247        startRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(),
248          kv.getRowLength(), startRow, 0, startRow.length) < 0
249      ) {
250        continue;
251      }
252
253      // In this unit test the end row is always inclusive.
254      if (
255        endRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
256          endRow, 0, endRow.length) > 0
257      ) {
258        continue;
259      }
260
261      if (
262        !qualSet.isEmpty() && (!CellUtil.matchingFamily(kv, FAMILY_BYTES)
263          || !qualSet.contains(Bytes.toString(CellUtil.cloneQualifier(kv))))
264      ) {
265        continue;
266      }
267
268      final String rowColStr = Bytes.toStringBinary(CellUtil.cloneRow(kv)) + "/"
269        + Bytes.toStringBinary(CellUtil.cloneFamily(kv)) + ":"
270        + Bytes.toStringBinary(CellUtil.cloneQualifier(kv));
271      final Integer curNumVer = verCount.get(rowColStr);
272      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
273      if (newNumVer <= maxVersions) {
274        filteredKVs.add(kv);
275        verCount.put(rowColStr, newNumVer);
276      }
277    }
278
279    return filteredKVs;
280  }
281
282  private void prepareExpectedKVs(long latestDelTS) {
283    final List<Cell> filteredKVs = new ArrayList<>();
284    for (Cell kv : expectedKVs) {
285      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
286        filteredKVs.add(kv);
287      }
288    }
289    expectedKVs = filteredKVs;
290    Collections.sort(expectedKVs, CellComparatorImpl.COMPARATOR);
291  }
292
293  public void put(String qual, long ts) {
294    if (!putTimestamps.contains(ts)) {
295      put.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
296      putTimestamps.add(ts);
297    }
298    if (VERBOSE) {
299      LOG.info("put: row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY + ", qualifier "
300        + qual + ", ts " + ts);
301    }
302  }
303
304  private byte[] createValue(long ts) {
305    return Bytes.toBytes("value" + ts);
306  }
307
308  public void delAtTimestamp(String qual, long ts) {
309    del.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
310    logDelete(qual, ts, "at");
311  }
312
313  private void logDelete(String qual, long ts, String delType) {
314    if (VERBOSE) {
315      LOG.info("del " + delType + ": row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
316        + ", qualifier " + qual + ", ts " + ts);
317    }
318  }
319
320  private void delUpToTimestamp(String qual, long upToTS) {
321    del.addColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
322    logDelete(qual, upToTS, "up to and including");
323  }
324
325  private long randLong(long n) {
326    long l = RNG.nextLong();
327    if (l == Long.MIN_VALUE) l = Long.MAX_VALUE;
328    return Math.abs(l) % n;
329  }
330
331  private long randBetween(long a, long b) {
332    long x = a + randLong(b - a + 1);
333    assertTrue(a <= x && x <= b);
334    return x;
335  }
336
337  private final String rowStr(int i) {
338    return ("row" + i).intern();
339  }
340
341  private final byte[] rowBytes(int i) {
342    if (i == -1) {
343      return HConstants.EMPTY_BYTE_ARRAY;
344    }
345    return Bytes.toBytes(rowStr(i));
346  }
347
348  private final String getQualStr(int i) {
349    return ("qual" + i).intern();
350  }
351
352  public void createTimestampRange(long minTS, long maxTS, long deleteUpToTS) throws IOException {
353    assertTrue(minTS < maxTS);
354    assertTrue(deleteUpToTS == -1 || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
355
356    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
357      final String row = rowStr(iRow);
358      final byte[] rowBytes = Bytes.toBytes(row);
359      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
360        final String qual = getQualStr(iCol);
361        final byte[] qualBytes = Bytes.toBytes(qual);
362        put = new Put(rowBytes);
363
364        putTimestamps.clear();
365        put(qual, minTS);
366        put(qual, maxTS);
367        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
368          put(qual, randBetween(minTS, maxTS));
369        }
370
371        long[] putTimestampList = new long[putTimestamps.size()];
372        {
373          int i = 0;
374          for (long ts : putTimestamps) {
375            putTimestampList[i++] = ts;
376          }
377        }
378
379        // Delete a predetermined number of particular timestamps
380        delTimestamps.clear();
381        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
382        int numToDel = DELETES_PER_ROW_COL;
383        int tsRemaining = putTimestampList.length;
384        del = new Delete(rowBytes);
385        for (long ts : putTimestampList) {
386          if (RNG.nextInt(tsRemaining) < numToDel) {
387            delAtTimestamp(qual, ts);
388            putTimestamps.remove(ts);
389            --numToDel;
390          }
391
392          if (--tsRemaining == 0) {
393            break;
394          }
395        }
396
397        // Another type of delete: everything up to the given timestamp.
398        if (deleteUpToTS != -1) {
399          delUpToTimestamp(qual, deleteUpToTS);
400        }
401
402        region.put(put);
403        if (!del.isEmpty()) {
404          region.delete(del);
405        }
406
407        // Add remaining timestamps (those we have not deleted) to expected
408        // results
409        for (long ts : putTimestamps) {
410          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts, KeyValue.Type.Put));
411        }
412      }
413    }
414
415    region.flush(true);
416  }
417
418  @After
419  public void tearDown() throws IOException {
420    if (region != null) {
421      HBaseTestingUtility.closeRegionAndWAL(region);
422    }
423
424    // We have to re-set the lazy seek flag back to the default so that other
425    // unit tests are not affected.
426    StoreScanner.enableLazySeekGlobally(StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
427  }
428
429  public void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
430    final List<? extends Cell> actual) {
431    final int eLen = expected.size();
432    final int aLen = actual.size();
433    final int minLen = Math.min(eLen, aLen);
434
435    int i;
436    for (i = 0; i < minLen && PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR,
437      expected.get(i), actual.get(i)) == 0; ++i) {
438    }
439
440    if (additionalMsg == null) {
441      additionalMsg = "";
442    }
443    if (!additionalMsg.isEmpty()) {
444      additionalMsg = ". " + additionalMsg;
445    }
446
447    if (eLen != aLen || i != minLen) {
448      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
449        + HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen + ") vs. "
450        + HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
451    }
452  }
453}