001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.Set;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.PrivateCellUtil;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.io.compress.Compression;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.util.BloomFilterUtil;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.Before;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.runner.RunWith;
056import org.junit.runners.Parameterized;
057import org.junit.runners.Parameterized.Parameters;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Test various seek optimizations for correctness and check if they are
063 * actually saving I/O operations.
064 */
065@RunWith(Parameterized.class)
066@Category({RegionServerTests.class, MediumTests.class})
067public class TestSeekOptimizations {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestSeekOptimizations.class);
072
073  private static final Logger LOG =
074      LoggerFactory.getLogger(TestSeekOptimizations.class);
075
076  // Constants
077  private static final String FAMILY = "myCF";
078  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
079
080  private static final int PUTS_PER_ROW_COL = 50;
081  private static final int DELETES_PER_ROW_COL = 10;
082
083  private static final int NUM_ROWS = 3;
084  private static final int NUM_COLS = 3;
085
086  private static final boolean VERBOSE = false;
087
088  /**
089   * Disable this when this test fails hopelessly and you need to debug a
090   * simpler case.
091   */
092  private static final boolean USE_MANY_STORE_FILES = true;
093
094  private static final int[][] COLUMN_SETS = new int[][] {
095    {},  // All columns
096    {0},
097    {1},
098    {0, 2},
099    {1, 2},
100    {0, 1, 2},
101  };
102
103  // Both start row and end row are inclusive here for the purposes of this
104  // test.
105  private static final int[][] ROW_RANGES = new int[][] {
106    {-1, -1},
107    {0, 1},
108    {1, 1},
109    {1, 2},
110    {0, 2}
111  };
112
113  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
114
115  // Instance variables
116  private HRegion region;
117  private Put put;
118  private Delete del;
119  private Random rand;
120  private Set<Long> putTimestamps = new HashSet<>();
121  private Set<Long> delTimestamps = new HashSet<>();
122  private List<Cell> expectedKVs = new ArrayList<>();
123
124  private Compression.Algorithm comprAlgo;
125  private BloomType bloomType;
126
127  private long totalSeekDiligent, totalSeekLazy;
128
129  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
130
131  @Parameters
132  public static final Collection<Object[]> parameters() {
133    return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
134  }
135
136  public TestSeekOptimizations(Compression.Algorithm comprAlgo,
137      BloomType bloomType) {
138    this.comprAlgo = comprAlgo;
139    this.bloomType = bloomType;
140  }
141
142  @Before
143  public void setUp() {
144    rand = new Random(91238123L);
145    expectedKVs.clear();
146    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
147  }
148
149  @Test
150  public void testMultipleTimestampRanges() throws IOException {
151    // enable seek counting
152    StoreFileScanner.instrument();
153
154    region = TEST_UTIL.createTestRegion("testMultipleTimestampRanges",
155        new HColumnDescriptor(FAMILY)
156            .setCompressionType(comprAlgo)
157            .setBloomFilterType(bloomType)
158            .setMaxVersions(3)
159    );
160
161    // Delete the given timestamp and everything before.
162    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
163
164    createTimestampRange(1, 50, -1);
165    createTimestampRange(51, 100, -1);
166    if (USE_MANY_STORE_FILES) {
167      createTimestampRange(100, 500, 127);
168      createTimestampRange(900, 1300, -1);
169      createTimestampRange(1301, 2500, latestDelTS);
170      createTimestampRange(2502, 2598, -1);
171      createTimestampRange(2599, 2999, -1);
172    }
173
174    prepareExpectedKVs(latestDelTS);
175
176    for (int[] columnArr : COLUMN_SETS) {
177      for (int[] rowRange : ROW_RANGES) {
178        for (int maxVersions : MAX_VERSIONS_VALUES) {
179          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
180            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1],
181                maxVersions);
182          }
183        }
184      }
185    }
186
187    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
188    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo +
189        " total seeks without optimization: " + totalSeekDiligent
190        + ", with optimization: " + totalSeekLazy + " (" +
191        String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent) +
192        "), savings: " + String.format("%.2f%%",
193            100.0 * seekSavings) + "\n");
194
195    // Test that lazy seeks are buying us something. Without the actual
196    // implementation of the lazy seek optimization this will be 0.
197    final double expectedSeekSavings = 0.0;
198    assertTrue("Lazy seek is only saving " +
199        String.format("%.2f%%", seekSavings * 100) + " seeks but should " +
200        "save at least " + String.format("%.2f%%", expectedSeekSavings * 100),
201        seekSavings >= expectedSeekSavings);
202  }
203
204  private void testScan(final int[] columnArr, final boolean lazySeekEnabled,
205      final int startRow, final int endRow, int maxVersions)
206      throws IOException {
207    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
208    final Scan scan = new Scan();
209    final Set<String> qualSet = new HashSet<>();
210    for (int iColumn : columnArr) {
211      String qualStr = getQualStr(iColumn);
212      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
213      qualSet.add(qualStr);
214    }
215    scan.setMaxVersions(maxVersions);
216    scan.setStartRow(rowBytes(startRow));
217
218    // Adjust for the fact that for multi-row queries the end row is exclusive.
219    {
220      final byte[] scannerStopRow =
221          rowBytes(endRow + (startRow != endRow ? 1 : 0));
222      scan.setStopRow(scannerStopRow);
223    }
224
225    final long initialSeekCount = StoreFileScanner.getSeekCount();
226    final InternalScanner scanner = region.getScanner(scan);
227    final List<Cell> results = new ArrayList<>();
228    final List<Cell> actualKVs = new ArrayList<>();
229
230    // Such a clumsy do-while loop appears to be the official way to use an
231    // internalScanner. scanner.next() return value refers to the _next_
232    // result, not to the one already returned in results.
233    boolean hasNext;
234    do {
235      hasNext = scanner.next(results);
236      actualKVs.addAll(results);
237      results.clear();
238    } while (hasNext);
239
240    List<Cell> filteredKVs = filterExpectedResults(qualSet,
241        rowBytes(startRow), rowBytes(endRow), maxVersions);
242    final String rowRestrictionStr =
243        (startRow == -1 && endRow == -1) ? "all rows" : (
244            startRow == endRow ? ("row=" + startRow) : ("startRow="
245            + startRow + ", " + "endRow=" + endRow));
246    final String columnRestrictionStr =
247        columnArr.length == 0 ? "all columns"
248            : ("columns=" + Arrays.toString(columnArr));
249    final String testDesc =
250        "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
251            + (scan.isGetScan() ? "Get" : "Scan") + ": "
252            + columnRestrictionStr + ", " + rowRestrictionStr
253            + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
254    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
255    if (VERBOSE) {
256      System.err.println("Seek count: " + seekCount + ", KVs returned: "
257        + actualKVs.size() + ". " + testDesc +
258        (lazySeekEnabled ? "\n" : ""));
259    }
260    if (lazySeekEnabled) {
261      totalSeekLazy += seekCount;
262    } else {
263      totalSeekDiligent += seekCount;
264    }
265    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
266  }
267
268  private List<Cell> filterExpectedResults(Set<String> qualSet,
269      byte[] startRow, byte[] endRow, int maxVersions) {
270    final List<Cell> filteredKVs = new ArrayList<>();
271    final Map<String, Integer> verCount = new HashMap<>();
272    for (Cell kv : expectedKVs) {
273      if (startRow.length > 0 &&
274          Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
275              startRow, 0, startRow.length) < 0) {
276        continue;
277      }
278
279      // In this unit test the end row is always inclusive.
280      if (endRow.length > 0 &&
281          Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
282              endRow, 0, endRow.length) > 0) {
283        continue;
284      }
285
286      if (!qualSet.isEmpty() && (!CellUtil.matchingFamily(kv, FAMILY_BYTES)
287          || !qualSet.contains(Bytes.toString(CellUtil.cloneQualifier(kv))))) {
288        continue;
289      }
290
291      final String rowColStr =
292        Bytes.toStringBinary(CellUtil.cloneRow(kv)) + "/"
293            + Bytes.toStringBinary(CellUtil.cloneFamily(kv)) + ":"
294            + Bytes.toStringBinary(CellUtil.cloneQualifier(kv));
295      final Integer curNumVer = verCount.get(rowColStr);
296      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
297      if (newNumVer <= maxVersions) {
298        filteredKVs.add(kv);
299        verCount.put(rowColStr, newNumVer);
300      }
301    }
302
303    return filteredKVs;
304  }
305
306  private void prepareExpectedKVs(long latestDelTS) {
307    final List<Cell> filteredKVs = new ArrayList<>();
308    for (Cell kv : expectedKVs) {
309      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
310        filteredKVs.add(kv);
311      }
312    }
313    expectedKVs = filteredKVs;
314    Collections.sort(expectedKVs, CellComparatorImpl.COMPARATOR);
315  }
316
317  public void put(String qual, long ts) {
318    if (!putTimestamps.contains(ts)) {
319      put.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
320      putTimestamps.add(ts);
321    }
322    if (VERBOSE) {
323      LOG.info("put: row " + Bytes.toStringBinary(put.getRow())
324          + ", cf " + FAMILY + ", qualifier " + qual + ", ts " + ts);
325    }
326  }
327
328  private byte[] createValue(long ts) {
329    return Bytes.toBytes("value" + ts);
330  }
331
332  public void delAtTimestamp(String qual, long ts) {
333    del.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
334    logDelete(qual, ts, "at");
335  }
336
337  private void logDelete(String qual, long ts, String delType) {
338    if (VERBOSE) {
339      LOG.info("del " + delType + ": row "
340          + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
341          + ", qualifier " + qual + ", ts " + ts);
342    }
343  }
344
345  private void delUpToTimestamp(String qual, long upToTS) {
346    del.addColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
347    logDelete(qual, upToTS, "up to and including");
348  }
349
350  private long randLong(long n) {
351    long l = rand.nextLong();
352    if (l == Long.MIN_VALUE)
353      l = Long.MAX_VALUE;
354    return Math.abs(l) % n;
355  }
356
357  private long randBetween(long a, long b) {
358    long x = a + randLong(b - a + 1);
359    assertTrue(a <= x && x <= b);
360    return x;
361  }
362
363  private final String rowStr(int i) {
364    return ("row" + i).intern();
365  }
366
367  private final byte[] rowBytes(int i) {
368    if (i == -1) {
369      return HConstants.EMPTY_BYTE_ARRAY;
370    }
371    return Bytes.toBytes(rowStr(i));
372  }
373
374  private final String getQualStr(int i) {
375    return ("qual" + i).intern();
376  }
377
378  public void createTimestampRange(long minTS, long maxTS,
379      long deleteUpToTS) throws IOException {
380    assertTrue(minTS < maxTS);
381    assertTrue(deleteUpToTS == -1
382        || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
383
384    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
385      final String row = rowStr(iRow);
386      final byte[] rowBytes = Bytes.toBytes(row);
387      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
388        final String qual = getQualStr(iCol);
389        final byte[] qualBytes = Bytes.toBytes(qual);
390        put = new Put(rowBytes);
391
392        putTimestamps.clear();
393        put(qual, minTS);
394        put(qual, maxTS);
395        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
396          put(qual, randBetween(minTS, maxTS));
397        }
398
399        long[] putTimestampList = new long[putTimestamps.size()];
400        {
401          int i = 0;
402          for (long ts : putTimestamps) {
403            putTimestampList[i++] = ts;
404          }
405        }
406
407        // Delete a predetermined number of particular timestamps
408        delTimestamps.clear();
409        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
410        int numToDel = DELETES_PER_ROW_COL;
411        int tsRemaining = putTimestampList.length;
412        del = new Delete(rowBytes);
413        for (long ts : putTimestampList) {
414          if (rand.nextInt(tsRemaining) < numToDel) {
415            delAtTimestamp(qual, ts);
416            putTimestamps.remove(ts);
417            --numToDel;
418          }
419
420          if (--tsRemaining == 0) {
421            break;
422          }
423        }
424
425        // Another type of delete: everything up to the given timestamp.
426        if (deleteUpToTS != -1) {
427          delUpToTimestamp(qual, deleteUpToTS);
428        }
429
430        region.put(put);
431        if (!del.isEmpty()) {
432          region.delete(del);
433        }
434
435        // Add remaining timestamps (those we have not deleted) to expected
436        // results
437        for (long ts : putTimestamps) {
438          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts,
439              KeyValue.Type.Put));
440        }
441      }
442    }
443
444    region.flush(true);
445  }
446
447  @After
448  public void tearDown() throws IOException {
449    if (region != null) {
450      HBaseTestingUtility.closeRegionAndWAL(region);
451    }
452
453    // We have to re-set the lazy seek flag back to the default so that other
454    // unit tests are not affected.
455    StoreScanner.enableLazySeekGlobally(
456        StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
457  }
458
459
460  public void assertKVListsEqual(String additionalMsg,
461      final List<? extends Cell> expected,
462      final List<? extends Cell> actual) {
463    final int eLen = expected.size();
464    final int aLen = actual.size();
465    final int minLen = Math.min(eLen, aLen);
466
467    int i;
468    for (i = 0; i < minLen
469        && PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expected.get(i),
470          actual.get(i)) == 0; ++i) {
471    }
472
473    if (additionalMsg == null) {
474      additionalMsg = "";
475    }
476    if (!additionalMsg.isEmpty()) {
477      additionalMsg = ". " + additionalMsg;
478    }
479
480    if (eLen != aLen || i != minLen) {
481      throw new AssertionError(
482          "Expected and actual KV arrays differ at position " + i + ": " +
483          HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
484          HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
485    }
486  }
487}
488