001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Random;
031import java.util.Set;
032import java.util.stream.Stream;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparatorImpl;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.KeyValue;
040import org.apache.hadoop.hbase.PrivateCellUtil;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.Scan;
046import org.apache.hadoop.hbase.io.compress.Compression;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.BloomFilterUtil;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.junit.jupiter.api.AfterEach;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.TestTemplate;
055import org.junit.jupiter.params.provider.Arguments;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * Test various seek optimizations for correctness and check if they are actually saving I/O
061 * operations.
062 */
063@Tag(RegionServerTests.TAG)
064@Tag(MediumTests.TAG)
065@HBaseParameterizedTestTemplate(name = "{index}: comprAlgo={0}, bloomType={1}")
066public class TestSeekOptimizations {
067
068  private static final Logger LOG = LoggerFactory.getLogger(TestSeekOptimizations.class);
069
070  // Constants
071  private static final String FAMILY = "myCF";
072  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
073
074  private static final int PUTS_PER_ROW_COL = 50;
075  private static final int DELETES_PER_ROW_COL = 10;
076
077  private static final int NUM_ROWS = 3;
078  private static final int NUM_COLS = 3;
079
080  private static final boolean VERBOSE = false;
081
082  /**
083   * Disable this when this test fails hopelessly and you need to debug a simpler case.
084   */
085  private static final boolean USE_MANY_STORE_FILES = true;
086
087  private static final int[][] COLUMN_SETS = new int[][] { {}, // All columns
088    { 0 }, { 1 }, { 0, 2 }, { 1, 2 }, { 0, 1, 2 }, };
089
090  // Both start row and end row are inclusive here for the purposes of this
091  // test.
092  private static final int[][] ROW_RANGES =
093    new int[][] { { -1, -1 }, { 0, 1 }, { 1, 1 }, { 1, 2 }, { 0, 2 } };
094
095  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
096
097  // Instance variables
098  private HRegion region;
099  private Put put;
100  private Delete del;
101  private Set<Long> putTimestamps = new HashSet<>();
102  private Set<Long> delTimestamps = new HashSet<>();
103  private List<Cell> expectedKVs = new ArrayList<>();
104
105  private Compression.Algorithm comprAlgo;
106  private BloomType bloomType;
107
108  private long totalSeekDiligent, totalSeekLazy;
109
110  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
111  private static final Random RNG = new Random(); // This test depends on Random#setSeed
112
113  public static Stream<Arguments> parameters() {
114    return HBaseTestingUtil.BLOOM_AND_COMPRESSION_COMBINATIONS.stream().map(Arguments::of);
115  }
116
117  public TestSeekOptimizations(Compression.Algorithm comprAlgo, BloomType bloomType) {
118    this.comprAlgo = comprAlgo;
119    this.bloomType = bloomType;
120  }
121
122  @BeforeEach
123  public void setUp() {
124    RNG.setSeed(91238123L);
125    expectedKVs.clear();
126    TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
127  }
128
129  @TestTemplate
130  public void testMultipleTimestampRanges() throws IOException {
131    // enable seek counting
132    StoreFileScanner.instrument();
133    ColumnFamilyDescriptor columnFamilyDescriptor =
134      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setCompressionType(comprAlgo)
135        .setBloomFilterType(bloomType).setMaxVersions(3).build();
136
137    region = TEST_UTIL.createTestRegion("testMultipleTimestampRanges", columnFamilyDescriptor);
138
139    // Delete the given timestamp and everything before.
140    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
141
142    createTimestampRange(1, 50, -1);
143    createTimestampRange(51, 100, -1);
144    if (USE_MANY_STORE_FILES) {
145      createTimestampRange(100, 500, 127);
146      createTimestampRange(900, 1300, -1);
147      createTimestampRange(1301, 2500, latestDelTS);
148      createTimestampRange(2502, 2598, -1);
149      createTimestampRange(2599, 2999, -1);
150    }
151
152    prepareExpectedKVs(latestDelTS);
153
154    for (int[] columnArr : COLUMN_SETS) {
155      for (int[] rowRange : ROW_RANGES) {
156        for (int maxVersions : MAX_VERSIONS_VALUES) {
157          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
158            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1], maxVersions);
159          }
160        }
161      }
162    }
163
164    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
165    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo
166      + " total seeks without optimization: " + totalSeekDiligent + ", with optimization: "
167      + totalSeekLazy + " (" + String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent)
168      + "), savings: " + String.format("%.2f%%", 100.0 * seekSavings) + "\n");
169
170    // Test that lazy seeks are buying us something. Without the actual
171    // implementation of the lazy seek optimization this will be 0.
172    final double expectedSeekSavings = 0.0;
173    assertTrue(seekSavings >= expectedSeekSavings,
174      "Lazy seek is only saving " + String.format("%.2f%%", seekSavings * 100)
175        + " seeks but should " + "save at least "
176        + String.format("%.2f%%", expectedSeekSavings * 100));
177  }
178
179  private void testScan(final int[] columnArr, final boolean lazySeekEnabled, final int startRow,
180    final int endRow, int maxVersions) throws IOException {
181    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
182    final Scan scan = new Scan();
183    final Set<String> qualSet = new HashSet<>();
184    for (int iColumn : columnArr) {
185      String qualStr = getQualStr(iColumn);
186      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
187      qualSet.add(qualStr);
188    }
189    scan.readVersions(maxVersions);
190    scan.withStartRow(rowBytes(startRow));
191
192    // Adjust for the fact that for multi-row queries the end row is exclusive.
193    if (startRow != endRow) {
194      scan.withStopRow(rowBytes(endRow + 1));
195    } else {
196      scan.withStopRow(rowBytes(endRow), true);
197    }
198
199    final long initialSeekCount = StoreFileScanner.getSeekCount();
200    final InternalScanner scanner = region.getScanner(scan);
201    final List<Cell> results = new ArrayList<>();
202    final List<Cell> actualKVs = new ArrayList<>();
203
204    // Such a clumsy do-while loop appears to be the official way to use an
205    // internalScanner. scanner.next() return value refers to the _next_
206    // result, not to the one already returned in results.
207    boolean hasNext;
208    do {
209      hasNext = scanner.next(results);
210      actualKVs.addAll(results);
211      results.clear();
212    } while (hasNext);
213
214    List<Cell> filteredKVs =
215      filterExpectedResults(qualSet, rowBytes(startRow), rowBytes(endRow), maxVersions);
216    final String rowRestrictionStr = (startRow == -1 && endRow == -1)
217      ? "all rows"
218      : (startRow == endRow
219        ? ("row=" + startRow)
220        : ("startRow=" + startRow + ", " + "endRow=" + endRow));
221    final String columnRestrictionStr =
222      columnArr.length == 0 ? "all columns" : ("columns=" + Arrays.toString(columnArr));
223    final String testDesc = "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
224      + (scan.isGetScan() ? "Get" : "Scan") + ": " + columnRestrictionStr + ", " + rowRestrictionStr
225      + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
226    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
227    if (VERBOSE) {
228      System.err.println("Seek count: " + seekCount + ", KVs returned: " + actualKVs.size() + ". "
229        + testDesc + (lazySeekEnabled ? "\n" : ""));
230    }
231    if (lazySeekEnabled) {
232      totalSeekLazy += seekCount;
233    } else {
234      totalSeekDiligent += seekCount;
235    }
236    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
237  }
238
239  private List<Cell> filterExpectedResults(Set<String> qualSet, byte[] startRow, byte[] endRow,
240    int maxVersions) {
241    final List<Cell> filteredKVs = new ArrayList<>();
242    final Map<String, Integer> verCount = new HashMap<>();
243    for (Cell kv : expectedKVs) {
244      if (
245        startRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(),
246          kv.getRowLength(), startRow, 0, startRow.length) < 0
247      ) {
248        continue;
249      }
250
251      // In this unit test the end row is always inclusive.
252      if (
253        endRow.length > 0 && Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
254          endRow, 0, endRow.length) > 0
255      ) {
256        continue;
257      }
258
259      if (
260        !qualSet.isEmpty() && (!CellUtil.matchingFamily(kv, FAMILY_BYTES)
261          || !qualSet.contains(Bytes.toString(CellUtil.cloneQualifier(kv))))
262      ) {
263        continue;
264      }
265
266      final String rowColStr = Bytes.toStringBinary(CellUtil.cloneRow(kv)) + "/"
267        + Bytes.toStringBinary(CellUtil.cloneFamily(kv)) + ":"
268        + Bytes.toStringBinary(CellUtil.cloneQualifier(kv));
269      final Integer curNumVer = verCount.get(rowColStr);
270      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
271      if (newNumVer <= maxVersions) {
272        filteredKVs.add(kv);
273        verCount.put(rowColStr, newNumVer);
274      }
275    }
276
277    return filteredKVs;
278  }
279
280  private void prepareExpectedKVs(long latestDelTS) {
281    final List<Cell> filteredKVs = new ArrayList<>();
282    for (Cell kv : expectedKVs) {
283      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
284        filteredKVs.add(kv);
285      }
286    }
287    expectedKVs = filteredKVs;
288    Collections.sort(expectedKVs, CellComparatorImpl.COMPARATOR);
289  }
290
291  public void put(String qual, long ts) {
292    if (!putTimestamps.contains(ts)) {
293      put.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
294      putTimestamps.add(ts);
295    }
296    if (VERBOSE) {
297      LOG.info("put: row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY + ", qualifier "
298        + qual + ", ts " + ts);
299    }
300  }
301
302  private byte[] createValue(long ts) {
303    return Bytes.toBytes("value" + ts);
304  }
305
306  public void delAtTimestamp(String qual, long ts) {
307    del.addColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
308    logDelete(qual, ts, "at");
309  }
310
311  private void logDelete(String qual, long ts, String delType) {
312    if (VERBOSE) {
313      LOG.info("del " + delType + ": row " + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
314        + ", qualifier " + qual + ", ts " + ts);
315    }
316  }
317
318  private void delUpToTimestamp(String qual, long upToTS) {
319    del.addColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
320    logDelete(qual, upToTS, "up to and including");
321  }
322
323  private long randLong(long n) {
324    long l = RNG.nextLong();
325    if (l == Long.MIN_VALUE) l = Long.MAX_VALUE;
326    return Math.abs(l) % n;
327  }
328
329  private long randBetween(long a, long b) {
330    long x = a + randLong(b - a + 1);
331    assertTrue(a <= x && x <= b);
332    return x;
333  }
334
335  private final String rowStr(int i) {
336    return ("row" + i).intern();
337  }
338
339  private final byte[] rowBytes(int i) {
340    if (i == -1) {
341      return HConstants.EMPTY_BYTE_ARRAY;
342    }
343    return Bytes.toBytes(rowStr(i));
344  }
345
346  private final String getQualStr(int i) {
347    return ("qual" + i).intern();
348  }
349
350  public void createTimestampRange(long minTS, long maxTS, long deleteUpToTS) throws IOException {
351    assertTrue(minTS < maxTS);
352    assertTrue(deleteUpToTS == -1 || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
353
354    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
355      final String row = rowStr(iRow);
356      final byte[] rowBytes = Bytes.toBytes(row);
357      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
358        final String qual = getQualStr(iCol);
359        final byte[] qualBytes = Bytes.toBytes(qual);
360        put = new Put(rowBytes);
361
362        putTimestamps.clear();
363        put(qual, minTS);
364        put(qual, maxTS);
365        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
366          put(qual, randBetween(minTS, maxTS));
367        }
368
369        long[] putTimestampList = new long[putTimestamps.size()];
370        {
371          int i = 0;
372          for (long ts : putTimestamps) {
373            putTimestampList[i++] = ts;
374          }
375        }
376
377        // Delete a predetermined number of particular timestamps
378        delTimestamps.clear();
379        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
380        int numToDel = DELETES_PER_ROW_COL;
381        int tsRemaining = putTimestampList.length;
382        del = new Delete(rowBytes);
383        for (long ts : putTimestampList) {
384          if (RNG.nextInt(tsRemaining) < numToDel) {
385            delAtTimestamp(qual, ts);
386            putTimestamps.remove(ts);
387            --numToDel;
388          }
389
390          if (--tsRemaining == 0) {
391            break;
392          }
393        }
394
395        // Another type of delete: everything up to the given timestamp.
396        if (deleteUpToTS != -1) {
397          delUpToTimestamp(qual, deleteUpToTS);
398        }
399
400        region.put(put);
401        if (!del.isEmpty()) {
402          region.delete(del);
403        }
404
405        // Add remaining timestamps (those we have not deleted) to expected
406        // results
407        for (long ts : putTimestamps) {
408          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts, KeyValue.Type.Put));
409        }
410      }
411    }
412
413    region.flush(true);
414  }
415
416  @AfterEach
417  public void tearDown() throws IOException {
418    if (region != null) {
419      HBaseTestingUtil.closeRegionAndWAL(region);
420    }
421
422    // We have to re-set the lazy seek flag back to the default so that other
423    // unit tests are not affected.
424    StoreScanner.enableLazySeekGlobally(StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
425  }
426
427  public void assertKVListsEqual(String additionalMsg, final List<? extends Cell> expected,
428    final List<? extends Cell> actual) {
429    final int eLen = expected.size();
430    final int aLen = actual.size();
431    final int minLen = Math.min(eLen, aLen);
432
433    int i;
434    for (i = 0; i < minLen && PrivateCellUtil.compareKeyIgnoresMvcc(CellComparatorImpl.COMPARATOR,
435      expected.get(i), actual.get(i)) == 0; ++i) {
436    }
437
438    if (additionalMsg == null) {
439      additionalMsg = "";
440    }
441    if (!additionalMsg.isEmpty()) {
442      additionalMsg = ". " + additionalMsg;
443    }
444
445    if (eLen != aLen || i != minLen) {
446      throw new AssertionError("Expected and actual KV arrays differ at position " + i + ": "
447        + HBaseTestingUtil.safeGetAsStr(expected, i) + " (length " + eLen + ") vs. "
448        + HBaseTestingUtil.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
449    }
450  }
451}