001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.Set;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.io.compress.Compression;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.BloomFilterUtil;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.junit.After;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Test;
055import org.junit.experimental.categories.Category;
056import org.junit.runner.RunWith;
057import org.junit.runners.Parameterized;
058import org.junit.runners.Parameterized.Parameters;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * Test various seek optimizations for correctness and check if they are actually saving I/O
064 * operations.
065 */
066@RunWith(Parameterized.class)
067@Category({ RegionServerTests.class, MediumTests.class })
068public class TestSeekOptimizations {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestSeekOptimizations.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestSeekOptimizations.class);
075
076  // Constants
077  private static final String FAMILY = "myCF";
078  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
079
080  private static final int PUTS_PER_ROW_COL = 50;
081  private static final int DELETES_PER_ROW_COL = 10;
082
083  private static final int NUM_ROWS = 3;
084  private static final int NUM_COLS = 3;
085
086  private static final boolean VERBOSE = false;
087
088  /**
089   * Disable this when this test fails hopelessly and you need to debug a simpler case.
090   */
091  private static final boolean USE_MANY_STORE_FILES = true;
092
093  private static final int[][] COLUMN_SETS = new int[][] { {}, // All columns
094    { 0 }, { 1 }, { 0, 2 }, { 1, 2 }, { 0, 1, 2 }, };
095
096  // Both start row and end row are inclusive here for the purposes of this
097  // test.
098  private static final int[][] ROW_RANGES =
099    new int[][] { { -1, -1 }, { 0, 1 }, { 1, 1 }, { 1, 2 }, { 0, 2 } };
100
101  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
102
103  // Instance variables
104  private HRegion region;
105  private Put put;
106  private Delete del;
107  private Set<Long> putTimestamps = new HashSet<>();
108  private Set<Long> delTimestamps = new HashSet<>();
109  private List<Cell> expectedKVs = new ArrayList<>();
110
111  private Compression.Algorithm comprAlgo;
112  private BloomType bloomType;
113
114  private long totalSeekDiligent, totalSeekLazy;
115
116  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
117  private static final Random RNG = new Random(); // This test depends on Random#setSeed
118
119  @Parameters
120  public static final Collection<Object[]> parameters() {
121    return HBaseTestingUtil.BLOOM_AND_COMPRESSION_COMBINATIONS;
122  }
123
124  public TestSeekOptimizations(Compression.Algorithm comprAlgo, BloomType bloomType) {
125    this.comprAlgo = comprAlgo;
126    this.bloomType = bloomType;
127  }
128
129  @Before
130  public void setUp() {
131    RNG.setSeed(91238123L);
132    expectedKVs.clear();
133    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
134  }
135
136  @Test
137  public void testMultipleTimestampRanges() throws IOException {
138    // enable seek counting
139    StoreFileScanner.instrument();
140    ColumnFamilyDescriptor columnFamilyDescriptor =
141      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setCompressionType(comprAlgo)
142        .setBloomFilterType(bloomType).setMaxVersions(3).build();
143
144    region = TEST_UTIL.createTestRegion("testMultipleTimestampRanges", columnFamilyDescriptor);
145
146    // Delete the given timestamp and everything before.
147    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
148
149    createTimestampRange(1, 50, -1);
150    createTimestampRange(51, 100, -1);
151    if (USE_MANY_STORE_FILES) {
152      createTimestampRange(100, 500, 127);
153      createTimestampRange(900, 1300, -1);
154      createTimestampRange(1301, 2500, latestDelTS);
155      createTimestampRange(2502, 2598, -1);
156      createTimestampRange(2599, 2999, -1);
157    }
158
159    prepareExpectedKVs(latestDelTS);
160
161    for (int[] columnArr : COLUMN_SETS) {
162      for (int[] rowRange : ROW_RANGES) {
163        for (int maxVersions : MAX_VERSIONS_VALUES) {
164          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
165            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1], maxVersions);
166          }
167        }
168      }
169    }
170
171    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
172    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo
173      + " total seeks without optimization: " + totalSeekDiligent + ", with optimization: "
174      + totalSeekLazy + " (" + String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent)
175      + "), savings: " + String.format("%.2f%%", 100.0 * seekSavings) + "\n");
176
177    // Test that lazy seeks are buying us something. Without the actual
178    // implementation of the lazy seek optimization this will be 0.
179    final double expectedSeekSavings = 0.0;
180    assertTrue("Lazy seek is only saving " + String.format("%.2f%%", seekSavings * 100)
181      + " seeks but should " + "save at least "
182      + String.format("%.2f%%", expectedSeekSavings * 100), seekSavings >= expectedSeekSavings);
183  }
184
185  private void testScan(final int[] columnArr, final boolean lazySeekEnabled, final int startRow,
186    final int endRow, int maxVersions) throws IOException {
187    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
188    final Scan scan = new Scan();
189    final Set<String> qualSet = new HashSet<>();
190    for (int iColumn : columnArr) {
191      String qualStr = getQualStr(iColumn);
192      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
193      qualSet.add(qualStr);
194    }
195    scan.readVersions(maxVersions);
196    scan.withStartRow(rowBytes(startRow));
197
198    // Adjust for the fact that for multi-row queries the end row is exclusive.
199    if (startRow != endRow) {
200      scan.withStopRow(rowBytes(endRow + 1));
201    } else {
202      scan.withStopRow(rowBytes(endRow), true);
203    }
204
205    final long initialSeekCount = StoreFileScanner.getSeekCount();
206    final InternalScanner scanner = region.getScanner(scan);
207    final List<Cell> results = new ArrayList<>();
208    final List<Cell> actualKVs = new ArrayList<>();
209
210    // Such a clumsy do-while loop appears to be the official way to use an
211    // internalScanner. scanner.next() return value refers to the _next_
212    // result, not to the one already returned in results.
213    boolean hasNext;
214    do {
215      hasNext = scanner.next(results);
216      actualKVs.addAll(results);
217      results.clear();
218    } while (hasNext);
219
220    List<Cell> filteredKVs =
221      filterExpectedResults(qualSet, rowBytes(startRow), rowBytes(endRow), maxVersions);
222    final String rowRestrictionStr = (startRow == -1 && endRow == -1)
223      ? "all rows"
224      : (startRow == endRow
225        ? ("row=" + startRow)
226        : ("startRow=" + startRow + ", " + "endRow=" + endRow));
227    final String columnRestrictionStr =
228      columnArr.length == 0 ? "all columns" : ("columns=" + Arrays.toString(columnArr));
229    final String testDesc = "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
230      + (scan.isGetScan() ? "Get" : "Scan") + ": " + columnRestrictionStr + ", " + rowRestrictionStr
231      + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
232    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
233    if (VERBOSE) {
234      System.err.println("Seek count: " + seekCount + ", KVs returned: " + actualKVs.size() + ". "
235        + testDesc + (lazySeekEnabled ? "\n" : ""));
236    }
237    if (lazySeekEnabled) {
238      totalSeekLazy += seekCount;
239    } else {
240      totalSeekDiligent += seekCount;
241    }
242    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
243  }
244
245  private List<Cell> filterExpectedResults(Set<String> qualSet, byte[] startRow, byte[] endRow,
246    int maxVersions) {
247    final List<Cell> filteredKVs = new ArrayList<>();
248    final Map<String, Integer> verCount = new HashMap<>();
249    for (Cell kv : expectedKVs) {
250      if (
251        startRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(),
252          kv.getRowLength(), startRow, 0, startRow.length) < 0
253      ) {
254        continue;
255      }
256
257      // In this unit test the end row is always inclusive.
258      if (
259        endRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
260          endRow, 0, endRow.length) > 0
261      ) {
262        continue;
263      }
264
265      if (
266        !qualSet.isEmpty() && (!CellUtil.matchingFamily(kv, FAMILY_BYTES)
267          || !qualSet.contains(Bytes.toString(CellUtil.cloneQualifier(kv))))
268      ) {
269        continue;
270      }
271
272      final String rowColStr = Bytes.toStringBinary(CellUtil.cloneRow(kv)) + "/"
273        + Bytes.toStringBinary(CellUtil.cloneFamily(kv)) + ":"
274        + Bytes.toStringBinary(CellUtil.cloneQualifier(kv));
275      final Integer curNumVer = verCount.get(rowColStr);
276      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
277      if (newNumVer <= maxVersions) {
278        filteredKVs.add(kv);
279        verCount.put(rowColStr, newNumVer);
280      }
281    }
282
283    return filteredKVs;
284  }
285
286  private void prepareExpectedKVs(long latestDelTS) {
287    final List<Cell> filteredKVs = new ArrayList<>();
288    for (Cell kv : expectedKVs) {
289      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
290        filteredKVs.add(kv);
291      }
292    }
293    expectedKVs = filteredKVs;
294    Collections.sort(expectedKVs, CellComparatorImpl.COMPARATOR);
295  }
296
297  public void put(String qual, long ts) {
298    if (!putTimestamps.contains(ts)) {
299      put.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
300      putTimestamps.add(ts);
301    }
302    if (VERBOSE) {
303      LOG.info("put: row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY + ", qualifier "
304        + qual + ", ts " + ts);
305    }
306  }
307
308  private byte[] createValue(long ts) {
309    return Bytes.toBytes("value" + ts);
310  }
311
312  public void delAtTimestamp(String qual, long ts) {
313    del.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
314    logDelete(qual, ts, "at");
315  }
316
317  private void logDelete(String qual, long ts, String delType) {
318    if (VERBOSE) {
319      LOG.info("del " + delType + ": row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
320        + ", qualifier " + qual + ", ts " + ts);
321    }
322  }
323
324  private void delUpToTimestamp(String qual, long upToTS) {
325    del.addColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
326    logDelete(qual, upToTS, "up to and including");
327  }
328
329  private long randLong(long n) {
330    long l = RNG.nextLong();
331    if (l == Long.MIN_VALUE) l = Long.MAX_VALUE;
332    return Math.abs(l) % n;
333  }
334
335  private long randBetween(long a, long b) {
336    long x = a + randLong(b - a + 1);
337    assertTrue(a <= x && x <= b);
338    return x;
339  }
340
341  private final String rowStr(int i) {
342    return ("row" + i).intern();
343  }
344
345  private final byte[] rowBytes(int i) {
346    if (i == -1) {
347      return HConstants.EMPTY_BYTE_ARRAY;
348    }
349    return Bytes.toBytes(rowStr(i));
350  }
351
352  private final String getQualStr(int i) {
353    return ("qual" + i).intern();
354  }
355
356  public void createTimestampRange(long minTS, long maxTS, long deleteUpToTS) throws IOException {
357    assertTrue(minTS < maxTS);
358    assertTrue(deleteUpToTS == -1 || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
359
360    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
361      final String row = rowStr(iRow);
362      final byte[] rowBytes = Bytes.toBytes(row);
363      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
364        final String qual = getQualStr(iCol);
365        final byte[] qualBytes = Bytes.toBytes(qual);
366        put = new Put(rowBytes);
367
368        putTimestamps.clear();
369        put(qual, minTS);
370        put(qual, maxTS);
371        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
372          put(qual, randBetween(minTS, maxTS));
373        }
374
375        long[] putTimestampList = new long[putTimestamps.size()];
376        {
377          int i = 0;
378          for (long ts : putTimestamps) {
379            putTimestampList[i++] = ts;
380          }
381        }
382
383        // Delete a predetermined number of particular timestamps
384        delTimestamps.clear();
385        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
386        int numToDel = DELETES_PER_ROW_COL;
387        int tsRemaining = putTimestampList.length;
388        del = new Delete(rowBytes);
389        for (long ts : putTimestampList) {
390          if (RNG.nextInt(tsRemaining) < numToDel) {
391            delAtTimestamp(qual, ts);
392            putTimestamps.remove(ts);
393            --numToDel;
394          }
395
396          if (--tsRemaining == 0) {
397            break;
398          }
399        }
400
401        // Another type of delete: everything up to the given timestamp.
402        if (deleteUpToTS != -1) {
403          delUpToTimestamp(qual, deleteUpToTS);
404        }
405
406        region.put(put);
407        if (!del.isEmpty()) {
408          region.delete(del);
409        }
410
411        // Add remaining timestamps (those we have not deleted) to expected
412        // results
413        for (long ts : putTimestamps) {
414          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts, KeyValue.Type.Put));
415        }
416      }
417    }
418
419    region.flush(true);
420  }
421
422  @After
423  public void tearDown() throws IOException {
424    if (region != null) {
425      HBaseTestingUtil.closeRegionAndWAL(region);
426    }
427
428    // We have to re-set the lazy seek flag back to the default so that other
429    // unit tests are not affected.
430    StoreScanner.enableLazySeekGlobally(StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
431  }
432
433  public void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
434    final List<? extends Cell> actual) {
435    final int eLen = expected.size();
436    final int aLen = actual.size();
437    final int minLen = Math.min(eLen, aLen);
438
439    int i;
440    for (i = 0; i < minLen && PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR,
441      expected.get(i), actual.get(i)) == 0; ++i) {
442    }
443
444    if (additionalMsg == null) {
445      additionalMsg = "";
446    }
447    if (!additionalMsg.isEmpty()) {
448      additionalMsg = ". " + additionalMsg;
449    }
450
451    if (eLen != aLen || i != minLen) {
452      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
453        + HBaseTestingUtil.safeGetAsStr(expected, i) + " (length " + eLen + ") vs. "
454        + HBaseTestingUtil.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
455    }
456  }
457}