001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertThrows;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparatorImpl;
033import org.apache.hadoop.hbase.ExtendedCell;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.KeyValue;
036import org.apache.hadoop.hbase.testclassification.RegionServerTests;
037import org.apache.hadoop.hbase.testclassification.SmallTests;
038import org.apache.hadoop.hbase.util.Bytes;
039import org.apache.hadoop.hbase.util.CollectionBackedScanner;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043
044@Category({ RegionServerTests.class, SmallTests.class })
045public class TestKeyValueHeap {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestKeyValueHeap.class);
050
051  private byte[] row1 = Bytes.toBytes("row1");
052  private byte[] fam1 = Bytes.toBytes("fam1");
053  private byte[] col1 = Bytes.toBytes("col1");
054  private byte[] data = Bytes.toBytes("data");
055
056  private byte[] row2 = Bytes.toBytes("row2");
057  private byte[] fam2 = Bytes.toBytes("fam2");
058  private byte[] col2 = Bytes.toBytes("col2");
059
060  private byte[] col3 = Bytes.toBytes("col3");
061  private byte[] col4 = Bytes.toBytes("col4");
062  private byte[] col5 = Bytes.toBytes("col5");
063
064  // Variable name encoding. kv<row#><fam#><col#>
065  ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data);
066  ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data);
067  ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data);
068  ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data);
069  ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data);
070  ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data);
071  ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data);
072  ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data);
073  ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data);
074  ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data);
075
076  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
077  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
078  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
079
080  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));
081
082  /*
083   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells
084   * are same as {@code expected}.
085   * @return List of Cells returned from scanners.
086   */
087  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
088    throws IOException {
089    // Creating KeyValueHeap
090    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
091      List<Cell> actual = new ArrayList<>();
092      while (kvh.peek() != null) {
093        actual.add(kvh.next());
094      }
095
096      assertEquals(expected, actual);
097      return actual;
098    }
099  }
100
101  @Test
102  public void testSorted() throws IOException {
103    // Cases that need to be checked are:
104    // 1. The "smallest" Cell is in the same scanners as current
105    // 2. Current scanner gets empty
106
107    List<Cell> expected =
108      Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
109
110    List<Cell> actual = assertCells(expected, scanners);
111
112    // Check if result is sorted according to Comparator
113    for (int i = 0; i < actual.size() - 1; i++) {
114      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1));
115      assertTrue(ret < 0);
116    }
117  }
118
119  @Test
120  public void testSeek() throws IOException {
121    // Cases:
122    // 1. Seek Cell that is not in scanner
123    // 2. Check that smallest that is returned from a seek is correct
124    List<Cell> expected = Arrays.asList(kv211);
125
126    // Creating KeyValueHeap
127    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
128      ExtendedCell seekKv = new KeyValue(row2, fam1, null, null);
129      kvh.seek(seekKv);
130
131      List<Cell> actual = Arrays.asList(kvh.peek());
132
133      assertEquals("Expected = " + Arrays.toString(expected.toArray()) + "\n Actual = "
134        + Arrays.toString(actual.toArray()), expected, actual);
135    }
136  }
137
138  @Test
139  public void testScannerLeak() throws IOException {
140    // Test for unclosed scanners (HBASE-1927)
141
142    TestScanner s4 = new TestScanner(new ArrayList<>());
143    scanners.add(s4);
144
145    // Creating KeyValueHeap
146    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
147      for (;;) {
148        if (kvh.next() == null) {
149          break;
150        }
151      }
152      // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
153      // queue and added to a Set for lazy close. The actual close will happen only on
154      // KVHeap#close()
155      assertEquals(4, kvh.scannersForDelayedClose.size());
156      assertTrue(kvh.scannersForDelayedClose.contains(s1));
157      assertTrue(kvh.scannersForDelayedClose.contains(s2));
158      assertTrue(kvh.scannersForDelayedClose.contains(s3));
159      assertTrue(kvh.scannersForDelayedClose.contains(s4));
160    }
161
162    for (KeyValueScanner scanner : scanners) {
163      assertTrue(((TestScanner) scanner).isClosed());
164    }
165  }
166
167  @Test
168  public void testScannerException() throws IOException {
169    // Test for NPE issue when exception happens in scanners (HBASE-13835)
170
171    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
172    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
173    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
174    TestScanner s4 = new SeekTestScanner(new ArrayList<>());
175
176    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));
177
178    // Creating KeyValueHeap
179    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
180      for (KeyValueScanner scanner : scanners) {
181        ((SeekTestScanner) scanner).setRealSeekDone(false);
182      }
183      // The pollRealKV should throw IOE.
184      assertThrows(IOException.class, () -> {
185        for (;;) {
186          if (kvh.next() == null) {
187            break;
188          }
189        }
190      });
191    }
192    // It implies there is no NPE thrown from kvh.close() if getting here
193    for (KeyValueScanner scanner : scanners) {
194      // Verify that close is called and only called once for each scanner
195      assertTrue(((SeekTestScanner) scanner).isClosed());
196      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
197    }
198  }
199
200  @Test
201  public void testPriorityId() throws IOException {
202    ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
203    ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
204    TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
205    TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
206    List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
207    assertCells(expected, Arrays.asList(scan1, scan2));
208
209    scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
210    scan2 = new TestScanner(Arrays.asList(kv113B), 1);
211    expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
212    assertCells(expected, Arrays.asList(scan1, scan2));
213  }
214
215  @Test
216  public void testGetFilesRead() throws IOException {
217    // Create test scanners with file paths
218    Path file1 = new Path("/test/file1");
219    Path file2 = new Path("/test/file2");
220    Path file3 = new Path("/test/file3");
221
222    FileTrackingScanner scanner1 =
223      new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1);
224    FileTrackingScanner scanner2 = new FileTrackingScanner(Arrays.asList(kv111, kv112), file2);
225    FileTrackingScanner scanner3 =
226      new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213), file3);
227
228    // Add a non-file-based scanner (e.g., memstore scanner) that doesn't return files
229    TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114));
230
231    List<KeyValueScanner> scanners =
232      new ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3, memStoreScanner));
233
234    // Create KeyValueHeap and scan through all cells
235    KeyValueHeap keyValueHeap = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
236
237    // Before closing, should return empty set even after scanning
238    // Scan through all cells first
239    while (keyValueHeap.peek() != null) {
240      keyValueHeap.next();
241    }
242
243    // Verify that before closing, files are not returned
244    Set<Path> filesReadBeforeClose = keyValueHeap.getFilesRead();
245    assertTrue("Should return empty set before closing heap", filesReadBeforeClose.isEmpty());
246    assertEquals("Should have 0 files before closing", 0, filesReadBeforeClose.size());
247
248    // Now close the heap
249    keyValueHeap.close();
250
251    // After closing, should return all files from file-based scanners only
252    // Non-file-based scanners (like memstore) should not contribute files
253    Set<Path> filesReadAfterClose = keyValueHeap.getFilesRead();
254    assertEquals("Should return set with 3 file paths after closing (excluding non-file scanner)",
255      3, filesReadAfterClose.size());
256    assertTrue("Should contain file1", filesReadAfterClose.contains(file1));
257    assertTrue("Should contain file2", filesReadAfterClose.contains(file2));
258    assertTrue("Should contain file3", filesReadAfterClose.contains(file3));
259
260    // Verify that non-file-based scanner doesn't contribute any files
261    // (memStoreScanner.getFilesRead() should return empty set)
262    Set<Path> memStoreFiles = memStoreScanner.getFilesRead();
263    assertTrue("Non-file-based scanner should return empty set", memStoreFiles.isEmpty());
264  }
265
266  private static class TestScanner extends CollectionBackedScanner {
267    private boolean closed = false;
268    private long scannerOrder = 0;
269
270    public TestScanner(List<ExtendedCell> list) {
271      super(list);
272    }
273
274    public TestScanner(List<ExtendedCell> list, long scannerOrder) {
275      this(list);
276      this.scannerOrder = scannerOrder;
277    }
278
279    @Override
280    public long getScannerOrder() {
281      return scannerOrder;
282    }
283
284    @Override
285    public void close() {
286      closed = true;
287    }
288
289    public boolean isClosed() {
290      return closed;
291    }
292  }
293
294  private static class SeekTestScanner extends TestScanner {
295    private int closedNum = 0;
296    private boolean realSeekDone = true;
297
298    public SeekTestScanner(List<ExtendedCell> list) {
299      super(list);
300    }
301
302    @Override
303    public void close() {
304      super.close();
305      closedNum++;
306    }
307
308    public int getClosedNum() {
309      return closedNum;
310    }
311
312    @Override
313    public boolean realSeekDone() {
314      return realSeekDone;
315    }
316
317    public void setRealSeekDone(boolean done) {
318      realSeekDone = done;
319    }
320
321    @Override
322    public void enforceSeek() throws IOException {
323      throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
324    }
325  }
326
327  private static class FileTrackingScanner extends TestScanner {
328    private final Path filePath;
329    private boolean closed = false;
330
331    public FileTrackingScanner(List<ExtendedCell> list, Path filePath) {
332      super(list);
333      this.filePath = filePath;
334    }
335
336    @Override
337    public void close() {
338      super.close();
339      closed = true;
340    }
341
342    @Override
343    public Set<Path> getFilesRead() {
344      // Only return the file path after the scanner is closed
345      return closed ? Collections.singleton(filePath) : Collections.emptySet();
346    }
347  }
348}