/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Random;
032import java.util.Set;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeyValue;
041import org.apache.hadoop.hbase.PrivateCellUtil;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Put;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.io.compress.Compression;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.testclassification.RegionServerTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.After;
050import org.junit.Before;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054import org.junit.runner.RunWith;
055import org.junit.runners.Parameterized;
056import org.junit.runners.Parameterized.Parameters;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060/**
061 * Test various seek optimizations for correctness and check if they are
062 * actually saving I/O operations.
063 */
064@RunWith(Parameterized.class)
065@Category({RegionServerTests.class, MediumTests.class})
066public class TestSeekOptimizations {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestSeekOptimizations.class);
071
072  private static final Logger LOG =
073      LoggerFactory.getLogger(TestSeekOptimizations.class);
074
075  // Constants
076  private static final String FAMILY = "myCF";
077  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
078
079  private static final int PUTS_PER_ROW_COL = 50;
080  private static final int DELETES_PER_ROW_COL = 10;
081
082  private static final int NUM_ROWS = 3;
083  private static final int NUM_COLS = 3;
084
085  private static final boolean VERBOSE = false;
086
087  /**
088   * Disable this when this test fails hopelessly and you need to debug a
089   * simpler case.
090   */
091  private static final boolean USE_MANY_STORE_FILES = true;
092
093  private static final int[][] COLUMN_SETS = new int[][] {
094    {},  // All columns
095    {0},
096    {1},
097    {0, 2},
098    {1, 2},
099    {0, 1, 2},
100  };
101
102  // Both start row and end row are inclusive here for the purposes of this
103  // test.
104  private static final int[][] ROW_RANGES = new int[][] {
105    {-1, -1},
106    {0, 1},
107    {1, 1},
108    {1, 2},
109    {0, 2}
110  };
111
112  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
113
114  // Instance variables
115  private HRegion region;
116  private Put put;
117  private Delete del;
118  private Random rand;
119  private Set<Long> putTimestamps = new HashSet<>();
120  private Set<Long> delTimestamps = new HashSet<>();
121  private List<Cell> expectedKVs = new ArrayList<>();
122
123  private Compression.Algorithm comprAlgo;
124  private BloomType bloomType;
125
126  private long totalSeekDiligent, totalSeekLazy;
127
128  private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
129
130  @Parameters
131  public static final Collection<Object[]> parameters() {
132    return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
133  }
134
135  public TestSeekOptimizations(Compression.Algorithm comprAlgo,
136      BloomType bloomType) {
137    this.comprAlgo = comprAlgo;
138    this.bloomType = bloomType;
139  }
140
141  @Before
142  public void setUp() {
143    rand = new Random(91238123L);
144    expectedKVs.clear();
145  }
146
147  @Test
148  public void testMultipleTimestampRanges() throws IOException {
149    // enable seek counting
150    StoreFileScanner.instrument();
151
152    region = TEST_UTIL.createTestRegion("testMultipleTimestampRanges",
153        new HColumnDescriptor(FAMILY)
154            .setCompressionType(comprAlgo)
155            .setBloomFilterType(bloomType)
156            .setMaxVersions(3)
157    );
158
159    // Delete the given timestamp and everything before.
160    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
161
162    createTimestampRange(1, 50, -1);
163    createTimestampRange(51, 100, -1);
164    if (USE_MANY_STORE_FILES) {
165      createTimestampRange(100, 500, 127);
166      createTimestampRange(900, 1300, -1);
167      createTimestampRange(1301, 2500, latestDelTS);
168      createTimestampRange(2502, 2598, -1);
169      createTimestampRange(2599, 2999, -1);
170    }
171
172    prepareExpectedKVs(latestDelTS);
173
174    for (int[] columnArr : COLUMN_SETS) {
175      for (int[] rowRange : ROW_RANGES) {
176        for (int maxVersions : MAX_VERSIONS_VALUES) {
177          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
178            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1],
179                maxVersions);
180          }
181        }
182      }
183    }
184
185    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
186    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo +
187        " total seeks without optimization: " + totalSeekDiligent
188        + ", with optimization: " + totalSeekLazy + " (" +
189        String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent) +
190        "), savings: " + String.format("%.2f%%",
191            100.0 * seekSavings) + "\n");
192
193    // Test that lazy seeks are buying us something. Without the actual
194    // implementation of the lazy seek optimization this will be 0.
195    final double expectedSeekSavings = 0.0;
196    assertTrue("Lazy seek is only saving " +
197        String.format("%.2f%%", seekSavings * 100) + " seeks but should " +
198        "save at least " + String.format("%.2f%%", expectedSeekSavings * 100),
199        seekSavings >= expectedSeekSavings);
200  }
201
202  private void testScan(final int[] columnArr, final boolean lazySeekEnabled,
203      final int startRow, final int endRow, int maxVersions)
204      throws IOException {
205    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
206    final Scan scan = new Scan();
207    final Set<String> qualSet = new HashSet<>();
208    for (int iColumn : columnArr) {
209      String qualStr = getQualStr(iColumn);
210      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
211      qualSet.add(qualStr);
212    }
213    scan.setMaxVersions(maxVersions);
214    scan.setStartRow(rowBytes(startRow));
215
216    // Adjust for the fact that for multi-row queries the end row is exclusive.
217    {
218      final byte[] scannerStopRow =
219          rowBytes(endRow + (startRow != endRow ? 1 : 0));
220      scan.setStopRow(scannerStopRow);
221    }
222
223    final long initialSeekCount = StoreFileScanner.getSeekCount();
224    final InternalScanner scanner = region.getScanner(scan);
225    final List<Cell> results = new ArrayList<>();
226    final List<Cell> actualKVs = new ArrayList<>();
227
228    // Such a clumsy do-while loop appears to be the official way to use an
229    // internalScanner. scanner.next() return value refers to the _next_
230    // result, not to the one already returned in results.
231    boolean hasNext;
232    do {
233      hasNext = scanner.next(results);
234      actualKVs.addAll(results);
235      results.clear();
236    } while (hasNext);
237
238    List<Cell> filteredKVs = filterExpectedResults(qualSet,
239        rowBytes(startRow), rowBytes(endRow), maxVersions);
240    final String rowRestrictionStr =
241        (startRow == -1 && endRow == -1) ? "all rows" : (
242            startRow == endRow ? ("row=" + startRow) : ("startRow="
243            + startRow + ", " + "endRow=" + endRow));
244    final String columnRestrictionStr =
245        columnArr.length == 0 ? "all columns"
246            : ("columns=" + Arrays.toString(columnArr));
247    final String testDesc =
248        "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
249            + (scan.isGetScan() ? "Get" : "Scan") + ": "
250            + columnRestrictionStr + ", " + rowRestrictionStr
251            + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
252    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
253    if (VERBOSE) {
254      System.err.println("Seek count: " + seekCount + ", KVs returned: "
255        + actualKVs.size() + ". " + testDesc +
256        (lazySeekEnabled ? "\n" : ""));
257    }
258    if (lazySeekEnabled) {
259      totalSeekLazy += seekCount;
260    } else {
261      totalSeekDiligent += seekCount;
262    }
263    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
264  }
265
266  private List<Cell> filterExpectedResults(Set<String> qualSet,
267      byte[] startRow, byte[] endRow, int maxVersions) {
268    final List<Cell> filteredKVs = new ArrayList<>();
269    final Map<String, Integer> verCount = new HashMap<>();
270    for (Cell kv : expectedKVs) {
271      if (startRow.length > 0 &&
272          Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
273              startRow, 0, startRow.length) < 0) {
274        continue;
275      }
276
277      // In this unit test the end row is always inclusive.
278      if (endRow.length > 0 &&
279          Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
280              endRow, 0, endRow.length) > 0) {
281        continue;
282      }
283
284      if (!qualSet.isEmpty() && (!CellUtil.matchingFamily(kv, FAMILY_BYTES)
285          || !qualSet.contains(Bytes.toString(CellUtil.cloneQualifier(kv))))) {
286        continue;
287      }
288
289      final String rowColStr =
290        Bytes.toStringBinary(CellUtil.cloneRow(kv)) + "/"
291            + Bytes.toStringBinary(CellUtil.cloneFamily(kv)) + ":"
292            + Bytes.toStringBinary(CellUtil.cloneQualifier(kv));
293      final Integer curNumVer = verCount.get(rowColStr);
294      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
295      if (newNumVer <= maxVersions) {
296        filteredKVs.add(kv);
297        verCount.put(rowColStr, newNumVer);
298      }
299    }
300
301    return filteredKVs;
302  }
303
304  private void prepareExpectedKVs(long latestDelTS) {
305    final List<Cell> filteredKVs = new ArrayList<>();
306    for (Cell kv : expectedKVs) {
307      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
308        filteredKVs.add(kv);
309      }
310    }
311    expectedKVs = filteredKVs;
312    Collections.sort(expectedKVs, CellComparatorImpl.COMPARATOR);
313  }
314
315  public void put(String qual, long ts) {
316    if (!putTimestamps.contains(ts)) {
317      put.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
318      putTimestamps.add(ts);
319    }
320    if (VERBOSE) {
321      LOG.info("put: row " + Bytes.toStringBinary(put.getRow())
322          + ", cf " + FAMILY + ", qualifier " + qual + ", ts " + ts);
323    }
324  }
325
326  private byte[] createValue(long ts) {
327    return Bytes.toBytes("value" + ts);
328  }
329
330  public void delAtTimestamp(String qual, long ts) {
331    del.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
332    logDelete(qual, ts, "at");
333  }
334
335  private void logDelete(String qual, long ts, String delType) {
336    if (VERBOSE) {
337      LOG.info("del " + delType + ": row "
338          + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
339          + ", qualifier " + qual + ", ts " + ts);
340    }
341  }
342
343  private void delUpToTimestamp(String qual, long upToTS) {
344    del.addColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
345    logDelete(qual, upToTS, "up to and including");
346  }
347
348  private long randLong(long n) {
349    long l = rand.nextLong();
350    if (l == Long.MIN_VALUE)
351      l = Long.MAX_VALUE;
352    return Math.abs(l) % n;
353  }
354
355  private long randBetween(long a, long b) {
356    long x = a + randLong(b - a + 1);
357    assertTrue(a <= x && x <= b);
358    return x;
359  }
360
361  private final String rowStr(int i) {
362    return ("row" + i).intern();
363  }
364
365  private final byte[] rowBytes(int i) {
366    if (i == -1) {
367      return HConstants.EMPTY_BYTE_ARRAY;
368    }
369    return Bytes.toBytes(rowStr(i));
370  }
371
372  private final String getQualStr(int i) {
373    return ("qual" + i).intern();
374  }
375
376  public void createTimestampRange(long minTS, long maxTS,
377      long deleteUpToTS) throws IOException {
378    assertTrue(minTS < maxTS);
379    assertTrue(deleteUpToTS == -1
380        || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
381
382    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
383      final String row = rowStr(iRow);
384      final byte[] rowBytes = Bytes.toBytes(row);
385      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
386        final String qual = getQualStr(iCol);
387        final byte[] qualBytes = Bytes.toBytes(qual);
388        put = new Put(rowBytes);
389
390        putTimestamps.clear();
391        put(qual, minTS);
392        put(qual, maxTS);
393        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
394          put(qual, randBetween(minTS, maxTS));
395        }
396
397        long[] putTimestampList = new long[putTimestamps.size()];
398        {
399          int i = 0;
400          for (long ts : putTimestamps) {
401            putTimestampList[i++] = ts;
402          }
403        }
404
405        // Delete a predetermined number of particular timestamps
406        delTimestamps.clear();
407        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
408        int numToDel = DELETES_PER_ROW_COL;
409        int tsRemaining = putTimestampList.length;
410        del = new Delete(rowBytes);
411        for (long ts : putTimestampList) {
412          if (rand.nextInt(tsRemaining) < numToDel) {
413            delAtTimestamp(qual, ts);
414            putTimestamps.remove(ts);
415            --numToDel;
416          }
417
418          if (--tsRemaining == 0) {
419            break;
420          }
421        }
422
423        // Another type of delete: everything up to the given timestamp.
424        if (deleteUpToTS != -1) {
425          delUpToTimestamp(qual, deleteUpToTS);
426        }
427
428        region.put(put);
429        if (!del.isEmpty()) {
430          region.delete(del);
431        }
432
433        // Add remaining timestamps (those we have not deleted) to expected
434        // results
435        for (long ts : putTimestamps) {
436          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts,
437              KeyValue.Type.Put));
438        }
439      }
440    }
441
442    region.flush(true);
443  }
444
445  @After
446  public void tearDown() throws IOException {
447    if (region != null) {
448      HBaseTestingUtility.closeRegionAndWAL(region);
449    }
450
451    // We have to re-set the lazy seek flag back to the default so that other
452    // unit tests are not affected.
453    StoreScanner.enableLazySeekGlobally(
454        StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
455  }
456
457
458  public void assertKVListsEqual(String additionalMsg,
459      final List<? extends Cell> expected,
460      final List<? extends Cell> actual) {
461    final int eLen = expected.size();
462    final int aLen = actual.size();
463    final int minLen = Math.min(eLen, aLen);
464
465    int i;
466    for (i = 0; i < minLen
467        && PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR, expected.get(i),
468          actual.get(i)) == 0; ++i) {
469    }
470
471    if (additionalMsg == null) {
472      additionalMsg = "";
473    }
474    if (!additionalMsg.isEmpty()) {
475      additionalMsg = ". " + additionalMsg;
476    }
477
478    if (eLen != aLen || i != minLen) {
479      throw new AssertionError(
480          "Expected and actual KV arrays differ at position " + i + ": " +
481          HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
482          HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
483    }
484  }
485
486}
487