001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertThrows;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.CellComparatorImpl;
033import org.apache.hadoop.hbase.ExtendedCell;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.testclassification.RegionServerTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.CollectionBackedScanner;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041
042@Tag(RegionServerTests.TAG)
043@Tag(SmallTests.TAG)
044public class TestKeyValueHeap {
045
046  private byte[] row1 = Bytes.toBytes("row1");
047  private byte[] fam1 = Bytes.toBytes("fam1");
048  private byte[] col1 = Bytes.toBytes("col1");
049  private byte[] data = Bytes.toBytes("data");
050
051  private byte[] row2 = Bytes.toBytes("row2");
052  private byte[] fam2 = Bytes.toBytes("fam2");
053  private byte[] col2 = Bytes.toBytes("col2");
054
055  private byte[] col3 = Bytes.toBytes("col3");
056  private byte[] col4 = Bytes.toBytes("col4");
057  private byte[] col5 = Bytes.toBytes("col5");
058
059  // Variable name encoding. kv<row#><fam#><col#>
060  ExtendedCell kv111 = new KeyValue(row1, fam1, col1, data);
061  ExtendedCell kv112 = new KeyValue(row1, fam1, col2, data);
062  ExtendedCell kv113 = new KeyValue(row1, fam1, col3, data);
063  ExtendedCell kv114 = new KeyValue(row1, fam1, col4, data);
064  ExtendedCell kv115 = new KeyValue(row1, fam1, col5, data);
065  ExtendedCell kv121 = new KeyValue(row1, fam2, col1, data);
066  ExtendedCell kv122 = new KeyValue(row1, fam2, col2, data);
067  ExtendedCell kv211 = new KeyValue(row2, fam1, col1, data);
068  ExtendedCell kv212 = new KeyValue(row2, fam1, col2, data);
069  ExtendedCell kv213 = new KeyValue(row2, fam1, col3, data);
070
071  TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
072  TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
073  TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
074
075  List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3));
076
077  /*
078   * Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned Cells
079   * are same as {@code expected}.
080   * @return List of Cells returned from scanners.
081   */
082  public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
083    throws IOException {
084    // Creating KeyValueHeap
085    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
086      List<Cell> actual = new ArrayList<>();
087      while (kvh.peek() != null) {
088        actual.add(kvh.next());
089      }
090
091      assertEquals(expected, actual);
092      return actual;
093    }
094  }
095
096  @Test
097  public void testSorted() throws IOException {
098    // Cases that need to be checked are:
099    // 1. The "smallest" Cell is in the same scanners as current
100    // 2. Current scanner gets empty
101
102    List<Cell> expected =
103      Arrays.asList(kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
104
105    List<Cell> actual = assertCells(expected, scanners);
106
107    // Check if result is sorted according to Comparator
108    for (int i = 0; i < actual.size() - 1; i++) {
109      int ret = CellComparatorImpl.COMPARATOR.compare(actual.get(i), actual.get(i + 1));
110      assertTrue(ret < 0);
111    }
112  }
113
114  @Test
115  public void testSeek() throws IOException {
116    // Cases:
117    // 1. Seek Cell that is not in scanner
118    // 2. Check that smallest that is returned from a seek is correct
119    List<Cell> expected = Arrays.asList(kv211);
120
121    // Creating KeyValueHeap
122    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
123      ExtendedCell seekKv = new KeyValue(row2, fam1, null, null);
124      kvh.seek(seekKv);
125
126      List<Cell> actual = Arrays.asList(kvh.peek());
127
128      assertEquals(expected, actual, "Expected = " + Arrays.toString(expected.toArray())
129        + "\n Actual = " + Arrays.toString(actual.toArray()));
130    }
131  }
132
133  @Test
134  public void testScannerLeak() throws IOException {
135    // Test for unclosed scanners (HBASE-1927)
136
137    TestScanner s4 = new TestScanner(new ArrayList<>());
138    scanners.add(s4);
139
140    // Creating KeyValueHeap
141    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
142      for (;;) {
143        if (kvh.next() == null) {
144          break;
145        }
146      }
147      // Once the internal scanners go out of Cells, those will be removed from KVHeap's priority
148      // queue and added to a Set for lazy close. The actual close will happen only on
149      // KVHeap#close()
150      assertEquals(4, kvh.scannersForDelayedClose.size());
151      assertTrue(kvh.scannersForDelayedClose.contains(s1));
152      assertTrue(kvh.scannersForDelayedClose.contains(s2));
153      assertTrue(kvh.scannersForDelayedClose.contains(s3));
154      assertTrue(kvh.scannersForDelayedClose.contains(s4));
155    }
156
157    for (KeyValueScanner scanner : scanners) {
158      assertTrue(((TestScanner) scanner).isClosed());
159    }
160  }
161
162  @Test
163  public void testScannerException() throws IOException {
164    // Test for NPE issue when exception happens in scanners (HBASE-13835)
165
166    TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
167    TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
168    TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
169    TestScanner s4 = new SeekTestScanner(new ArrayList<>());
170
171    List<KeyValueScanner> scanners = new ArrayList<>(Arrays.asList(s1, s2, s3, s4));
172
173    // Creating KeyValueHeap
174    try (KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR)) {
175      for (KeyValueScanner scanner : scanners) {
176        ((SeekTestScanner) scanner).setRealSeekDone(false);
177      }
178      // The pollRealKV should throw IOE.
179      assertThrows(IOException.class, () -> {
180        for (;;) {
181          if (kvh.next() == null) {
182            break;
183          }
184        }
185      });
186    }
187    // It implies there is no NPE thrown from kvh.close() if getting here
188    for (KeyValueScanner scanner : scanners) {
189      // Verify that close is called and only called once for each scanner
190      assertTrue(((SeekTestScanner) scanner).isClosed());
191      assertEquals(1, ((SeekTestScanner) scanner).getClosedNum());
192    }
193  }
194
195  @Test
196  public void testPriorityId() throws IOException {
197    ExtendedCell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
198    ExtendedCell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
199    TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
200    TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
201    List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
202    assertCells(expected, Arrays.asList(scan1, scan2));
203
204    scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
205    scan2 = new TestScanner(Arrays.asList(kv113B), 1);
206    expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
207    assertCells(expected, Arrays.asList(scan1, scan2));
208  }
209
210  @Test
211  public void testGetFilesRead() throws IOException {
212    // Create test scanners with file paths
213    Path file1 = new Path("/test/file1");
214    Path file2 = new Path("/test/file2");
215    Path file3 = new Path("/test/file3");
216
217    FileTrackingScanner scanner1 =
218      new FileTrackingScanner(Arrays.asList(kv115, kv211, kv212), file1);
219    FileTrackingScanner scanner2 = new FileTrackingScanner(Arrays.asList(kv111, kv112), file2);
220    FileTrackingScanner scanner3 =
221      new FileTrackingScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213), file3);
222
223    // Add a non-file-based scanner (e.g., memstore scanner) that doesn't return files
224    TestScanner memStoreScanner = new TestScanner(Arrays.asList(kv114));
225
226    List<KeyValueScanner> scanners =
227      new ArrayList<>(Arrays.asList(scanner1, scanner2, scanner3, memStoreScanner));
228
229    // Create KeyValueHeap and scan through all cells
230    KeyValueHeap keyValueHeap = new KeyValueHeap(scanners, CellComparatorImpl.COMPARATOR);
231
232    // Before closing, should return empty set even after scanning
233    // Scan through all cells first
234    while (keyValueHeap.peek() != null) {
235      keyValueHeap.next();
236    }
237
238    // Verify that before closing, files are not returned
239    Set<Path> filesReadBeforeClose = keyValueHeap.getFilesRead();
240    assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing heap");
241    assertEquals(0, filesReadBeforeClose.size(), "Should have 0 files before closing");
242
243    // Now close the heap
244    keyValueHeap.close();
245
246    // After closing, should return all files from file-based scanners only
247    // Non-file-based scanners (like memstore) should not contribute files
248    Set<Path> filesReadAfterClose = keyValueHeap.getFilesRead();
249    assertEquals(3, filesReadAfterClose.size(),
250      "Should return set with 3 file paths after closing (excluding non-file scanner)");
251    assertTrue(filesReadAfterClose.contains(file1), "Should contain file1");
252    assertTrue(filesReadAfterClose.contains(file2), "Should contain file2");
253    assertTrue(filesReadAfterClose.contains(file3), "Should contain file3");
254
255    // Verify that non-file-based scanner doesn't contribute any files
256    // (memStoreScanner.getFilesRead() should return empty set)
257    Set<Path> memStoreFiles = memStoreScanner.getFilesRead();
258    assertTrue(memStoreFiles.isEmpty(), "Non-file-based scanner should return empty set");
259  }
260
261  private static class TestScanner extends CollectionBackedScanner {
262    private boolean closed = false;
263    private long scannerOrder = 0;
264
265    public TestScanner(List<ExtendedCell> list) {
266      super(list);
267    }
268
269    public TestScanner(List<ExtendedCell> list, long scannerOrder) {
270      this(list);
271      this.scannerOrder = scannerOrder;
272    }
273
274    @Override
275    public long getScannerOrder() {
276      return scannerOrder;
277    }
278
279    @Override
280    public void close() {
281      closed = true;
282    }
283
284    public boolean isClosed() {
285      return closed;
286    }
287  }
288
289  private static class SeekTestScanner extends TestScanner {
290    private int closedNum = 0;
291    private boolean realSeekDone = true;
292
293    public SeekTestScanner(List<ExtendedCell> list) {
294      super(list);
295    }
296
297    @Override
298    public void close() {
299      super.close();
300      closedNum++;
301    }
302
303    public int getClosedNum() {
304      return closedNum;
305    }
306
307    @Override
308    public boolean realSeekDone() {
309      return realSeekDone;
310    }
311
312    public void setRealSeekDone(boolean done) {
313      realSeekDone = done;
314    }
315
316    @Override
317    public void enforceSeek() throws IOException {
318      throw new IOException("enforceSeek must not be called on a " + "non-lazy scanner");
319    }
320  }
321
322  private static class FileTrackingScanner extends TestScanner {
323    private final Path filePath;
324    private boolean closed = false;
325
326    public FileTrackingScanner(List<ExtendedCell> list, Path filePath) {
327      super(list);
328      this.filePath = filePath;
329    }
330
331    @Override
332    public void close() {
333      super.close();
334      closed = true;
335    }
336
337    @Override
338    public Set<Path> getFilesRead() {
339      // Only return the file path after the scanner is closed
340      return closed ? Collections.singleton(filePath) : Collections.emptySet();
341    }
342  }
343}