001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.CellUtil.createCell;
021import static org.apache.hadoop.hbase.KeyValueTestUtil.create;
022import static org.apache.hadoop.hbase.regionserver.KeyValueScanFixture.scanFixture;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.List;
033import java.util.NavigableSet;
034import java.util.OptionalInt;
035import java.util.TreeSet;
036import java.util.concurrent.atomic.AtomicInteger;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.CellComparator;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.KeepDeletedCells;
047import org.apache.hadoop.hbase.KeyValue;
048import org.apache.hadoop.hbase.PrivateCellUtil;
049import org.apache.hadoop.hbase.client.Get;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
052import org.apache.hadoop.hbase.testclassification.MediumTests;
053import org.apache.hadoop.hbase.testclassification.RegionServerTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.EnvironmentEdge;
056import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
057import org.junit.ClassRule;
058import org.junit.Ignore;
059import org.junit.Rule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.rules.TestName;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066// Can't be small as it plays with EnvironmentEdgeManager
067@Category({RegionServerTests.class, MediumTests.class})
068public class TestStoreScanner {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072      HBaseClassTestRule.forClass(TestStoreScanner.class);
073
074  private static final Logger LOG = LoggerFactory.getLogger(TestStoreScanner.class);
075  @Rule public TestName name = new TestName();
076  private static final String CF_STR = "cf";
077  private static final byte[] CF = Bytes.toBytes(CF_STR);
078  static Configuration CONF = HBaseConfiguration.create();
079  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
080  private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
081      KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
082
083  /**
084   * From here on down, we have a bunch of defines and specific CELL_GRID of Cells. The
085   * CELL_GRID then has a Scanner that can fake out 'block' transitions. All this elaborate
086   * setup is for tests that ensure we don't overread, and that the
087   * {@link StoreScanner#optimize(org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode,
088   * Cell)} is not overly enthusiastic.
089   */
090  private static final byte[] ZERO = new byte[] {'0'};
091  private static final byte[] ZERO_POINT_ZERO = new byte[] {'0', '.', '0'};
092  private static final byte[] ONE = new byte[] {'1'};
093  private static final byte[] TWO = new byte[] {'2'};
094  private static final byte[] TWO_POINT_TWO = new byte[] {'2', '.', '2'};
095  private static final byte[] THREE = new byte[] {'3'};
096  private static final byte[] FOUR = new byte[] {'4'};
097  private static final byte[] FIVE = new byte[] {'5'};
098  private static final byte[] VALUE = new byte[] {'v'};
099  private static final int CELL_GRID_BLOCK2_BOUNDARY = 4;
100  private static final int CELL_GRID_BLOCK3_BOUNDARY = 11;
101  private static final int CELL_GRID_BLOCK4_BOUNDARY = 15;
102  private static final int CELL_GRID_BLOCK5_BOUNDARY = 19;
103
104  /**
105   * Five rows by four columns distinguished by column qualifier (column qualifier is one of the
106   * four rows... ONE, TWO, etc.). Exceptions are a weird row after TWO; it is TWO_POINT_TWO.
107   * And then row FOUR has five columns finishing w/ row FIVE having a single column.
108   * We will use this to test scan does the right thing as it
109   * we do Gets, StoreScanner#optimize, and what we do on (faked) block boundaries.
110   */
111  private static final Cell[] CELL_GRID = new Cell [] {
112    createCell(ONE, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
113    createCell(ONE, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
114    createCell(ONE, CF, THREE, 1L, KeyValue.Type.Put.getCode(), VALUE),
115    createCell(ONE, CF, FOUR, 1L, KeyValue.Type.Put.getCode(), VALUE),
116    // Offset 4 CELL_GRID_BLOCK2_BOUNDARY
117    createCell(TWO, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
118    createCell(TWO, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
119    createCell(TWO, CF, THREE, 1L, KeyValue.Type.Put.getCode(), VALUE),
120    createCell(TWO, CF, FOUR, 1L, KeyValue.Type.Put.getCode(), VALUE),
121    createCell(TWO_POINT_TWO, CF, ZERO, 1L, KeyValue.Type.Put.getCode(), VALUE),
122    createCell(TWO_POINT_TWO, CF, ZERO_POINT_ZERO, 1L, KeyValue.Type.Put.getCode(), VALUE),
123    createCell(TWO_POINT_TWO, CF, FIVE, 1L, KeyValue.Type.Put.getCode(), VALUE),
124    // Offset 11! CELL_GRID_BLOCK3_BOUNDARY
125    createCell(THREE, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
126    createCell(THREE, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
127    createCell(THREE, CF, THREE, 1L, KeyValue.Type.Put.getCode(), VALUE),
128    createCell(THREE, CF, FOUR, 1L, KeyValue.Type.Put.getCode(), VALUE),
129    // Offset 15 CELL_GRID_BLOCK4_BOUNDARY
130    createCell(FOUR, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
131    createCell(FOUR, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
132    createCell(FOUR, CF, THREE, 1L, KeyValue.Type.Put.getCode(), VALUE),
133    createCell(FOUR, CF, FOUR, 1L, KeyValue.Type.Put.getCode(), VALUE),
134    // Offset 19 CELL_GRID_BLOCK5_BOUNDARY
135    createCell(FOUR, CF, FIVE, 1L, KeyValue.Type.Put.getCode(), VALUE),
136    createCell(FIVE, CF, ZERO, 1L, KeyValue.Type.Put.getCode(), VALUE),
137  };
138
139  private static class KeyValueHeapWithCount extends KeyValueHeap {
140
141    final AtomicInteger count;
142
143    public KeyValueHeapWithCount(List<? extends KeyValueScanner> scanners,
144        CellComparator comparator, AtomicInteger count) throws IOException {
145      super(scanners, comparator);
146      this.count = count;
147    }
148
149    @Override
150    public Cell peek() {
151      this.count.incrementAndGet();
152      return super.peek();
153    }
154  }
155
156  /**
157   * A StoreScanner for our CELL_GRID above. Fakes the block transitions. Does counts of
158   * calls to optimize and counts of when optimize actually did an optimize.
159   */
160  private static class CellGridStoreScanner extends StoreScanner {
161    // Count of how often optimize is called and of how often it does an optimize.
162    AtomicInteger count;
163    final AtomicInteger optimization = new AtomicInteger(0);
164
165    CellGridStoreScanner(final Scan scan, ScanInfo scanInfo) throws IOException {
166      super(scan, scanInfo, scan.getFamilyMap().get(CF), Arrays.<KeyValueScanner> asList(
167        new KeyValueScanner[] { new KeyValueScanFixture(CellComparator.getInstance(), CELL_GRID) }));
168    }
169
170    @Override
171    protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
172        CellComparator comparator) throws IOException {
173      if (count == null) {
174        count = new AtomicInteger(0);
175      }
176      heap = newKVHeap(scanners, comparator);
177    }
178
179    @Override
180    protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners,
181        CellComparator comparator) throws IOException {
182      return new KeyValueHeapWithCount(scanners, comparator, count);
183    }
184
185    @Override
186    protected boolean trySkipToNextRow(Cell cell) throws IOException {
187      boolean optimized = super.trySkipToNextRow(cell);
188      LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
189          + ", optimized=" + optimized);
190      if (optimized) {
191        optimization.incrementAndGet();
192      }
193      return optimized;
194    }
195
196    @Override
197    protected boolean trySkipToNextColumn(Cell cell) throws IOException {
198      boolean optimized = super.trySkipToNextColumn(cell);
199      LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
200          + ", optimized=" + optimized);
201      if (optimized) {
202        optimization.incrementAndGet();
203      }
204      return optimized;
205    }
206
207    @Override
208    public Cell getNextIndexedKey() {
209      // Fake block boundaries by having index of next block change as we go through scan.
210      return count.get() > CELL_GRID_BLOCK4_BOUNDARY?
211          PrivateCellUtil.createFirstOnRow(CELL_GRID[CELL_GRID_BLOCK5_BOUNDARY]):
212            count.get() > CELL_GRID_BLOCK3_BOUNDARY?
213                PrivateCellUtil.createFirstOnRow(CELL_GRID[CELL_GRID_BLOCK4_BOUNDARY]):
214                  count.get() > CELL_GRID_BLOCK2_BOUNDARY?
215                      PrivateCellUtil.createFirstOnRow(CELL_GRID[CELL_GRID_BLOCK3_BOUNDARY]):
216                        PrivateCellUtil.createFirstOnRow(CELL_GRID[CELL_GRID_BLOCK2_BOUNDARY]);
217    }
218  };
219
220  private static final int CELL_WITH_VERSIONS_BLOCK2_BOUNDARY = 4;
221
222  private static final Cell[] CELL_WITH_VERSIONS = new Cell [] {
223    createCell(ONE, CF, ONE, 2L, KeyValue.Type.Put.getCode(), VALUE),
224    createCell(ONE, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
225    createCell(ONE, CF, TWO, 2L, KeyValue.Type.Put.getCode(), VALUE),
226    createCell(ONE, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
227    // Offset 4 CELL_WITH_VERSIONS_BLOCK2_BOUNDARY
228    createCell(TWO, CF, ONE, 1L, KeyValue.Type.Put.getCode(), VALUE),
229    createCell(TWO, CF, TWO, 1L, KeyValue.Type.Put.getCode(), VALUE),
230  };
231
232  private static class CellWithVersionsStoreScanner extends StoreScanner {
233    // Count of how often optimize is called and of how often it does an optimize.
234    final AtomicInteger optimization = new AtomicInteger(0);
235
236    CellWithVersionsStoreScanner(final Scan scan, ScanInfo scanInfo) throws IOException {
237      super(scan, scanInfo, scan.getFamilyMap().get(CF),
238          Arrays.<KeyValueScanner> asList(new KeyValueScanner[] {
239              new KeyValueScanFixture(CellComparator.getInstance(), CELL_WITH_VERSIONS) }));
240    }
241
242    @Override
243    protected boolean trySkipToNextColumn(Cell cell) throws IOException {
244      boolean optimized = super.trySkipToNextColumn(cell);
245      LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
246          + ", optimized=" + optimized);
247      if (optimized) {
248        optimization.incrementAndGet();
249      }
250      return optimized;
251    }
252
253    @Override
254    public Cell getNextIndexedKey() {
255      // Fake block boundaries by having index of next block change as we go through scan.
256      return PrivateCellUtil
257          .createFirstOnRow(CELL_WITH_VERSIONS[CELL_WITH_VERSIONS_BLOCK2_BOUNDARY]);
258    }
259  };
260
261  private static class CellWithVersionsNoOptimizeStoreScanner extends StoreScanner {
262    // Count of how often optimize is called and of how often it does an optimize.
263    final AtomicInteger optimization = new AtomicInteger(0);
264
265    CellWithVersionsNoOptimizeStoreScanner(Scan scan, ScanInfo scanInfo) throws IOException {
266      super(scan, scanInfo, scan.getFamilyMap().get(CF),
267          Arrays.<KeyValueScanner> asList(new KeyValueScanner[] {
268              new KeyValueScanFixture(CellComparator.getInstance(), CELL_WITH_VERSIONS) }));
269    }
270
271    @Override
272    protected boolean trySkipToNextColumn(Cell cell) throws IOException {
273      boolean optimized = super.trySkipToNextColumn(cell);
274      LOG.info("Cell=" + cell + ", nextIndex=" + CellUtil.toString(getNextIndexedKey(), false)
275          + ", optimized=" + optimized);
276      if (optimized) {
277        optimization.incrementAndGet();
278      }
279      return optimized;
280    }
281
282    @Override
283    public Cell getNextIndexedKey() {
284      return null;
285    }
286  };
287
288  @Test
289  public void testWithColumnCountGetFilter() throws Exception {
290    Get get = new Get(ONE);
291    get.readAllVersions();
292    get.addFamily(CF);
293    get.setFilter(new ColumnCountGetFilter(2));
294
295    try (CellWithVersionsNoOptimizeStoreScanner scannerNoOptimize =
296        new CellWithVersionsNoOptimizeStoreScanner(new Scan(get), this.scanInfo)) {
297      List<Cell> results = new ArrayList<>();
298      while (scannerNoOptimize.next(results)) {
299        continue;
300      }
301      assertEquals(2, results.size());
302      assertTrue(CellUtil.matchingColumn(results.get(0), CELL_WITH_VERSIONS[0]));
303      assertTrue(CellUtil.matchingColumn(results.get(1), CELL_WITH_VERSIONS[2]));
304      assertTrue("Optimize should do some optimizations",
305        scannerNoOptimize.optimization.get() == 0);
306    }
307
308    get.setFilter(new ColumnCountGetFilter(2));
309    try (CellWithVersionsStoreScanner scanner =
310        new CellWithVersionsStoreScanner(new Scan(get), this.scanInfo)) {
311      List<Cell> results = new ArrayList<>();
312      while (scanner.next(results)) {
313        continue;
314      }
315      assertEquals(2, results.size());
316      assertTrue(CellUtil.matchingColumn(results.get(0), CELL_WITH_VERSIONS[0]));
317      assertTrue(CellUtil.matchingColumn(results.get(1), CELL_WITH_VERSIONS[2]));
318      assertTrue("Optimize should do some optimizations", scanner.optimization.get() > 0);
319    }
320  }
321
322  /*
323   * Test utility for building a NavigableSet for scanners.
324   * @param strCols
325   * @return
326   */
327  NavigableSet<byte[]> getCols(String ...strCols) {
328    NavigableSet<byte[]> cols = new TreeSet<>(Bytes.BYTES_COMPARATOR);
329    for (String col : strCols) {
330      byte[] bytes = Bytes.toBytes(col);
331      cols.add(bytes);
332    }
333    return cols;
334  }
335
336  @Test
337  public void testFullRowGetDoesNotOverreadWhenRowInsideOneBlock() throws IOException {
338    // Do a Get against row two. Row two is inside a block that starts with row TWO but ends with
339    // row TWO_POINT_TWO. We should read one block only.
340    Get get = new Get(TWO);
341    Scan scan = new Scan(get);
342    try (CellGridStoreScanner scanner = new CellGridStoreScanner(scan, this.scanInfo)) {
343      List<Cell> results = new ArrayList<>();
344      while (scanner.next(results)) {
345        continue;
346      }
347      // Should be four results of column 1 (though there are 5 rows in the CELL_GRID -- the
348      // TWO_POINT_TWO row does not have a a column ONE.
349      assertEquals(4, results.size());
350      // We should have gone the optimize route 5 times totally... an INCLUDE for the four cells
351      // in the row plus the DONE on the end.
352      assertEquals(5, scanner.count.get());
353      // For a full row Get, there should be no opportunity for scanner optimization.
354      assertEquals(0, scanner.optimization.get());
355    }
356  }
357
358  @Test
359  public void testFullRowSpansBlocks() throws IOException {
360    // Do a Get against row FOUR. It spans two blocks.
361    Get get = new Get(FOUR);
362    Scan scan = new Scan(get);
363    try (CellGridStoreScanner scanner = new CellGridStoreScanner(scan, this.scanInfo)) {
364      List<Cell> results = new ArrayList<>();
365      while (scanner.next(results)) {
366        continue;
367      }
368      // Should be four results of column 1 (though there are 5 rows in the CELL_GRID -- the
369      // TWO_POINT_TWO row does not have a a column ONE.
370      assertEquals(5, results.size());
371      // We should have gone the optimize route 6 times totally... an INCLUDE for the five cells
372      // in the row plus the DONE on the end.
373      assertEquals(6, scanner.count.get());
374      // For a full row Get, there should be no opportunity for scanner optimization.
375      assertEquals(0, scanner.optimization.get());
376    }
377  }
378
379  /**
380   * Test optimize in StoreScanner. Test that we skip to the next 'block' when we it makes sense
381   * reading the block 'index'.
382   * @throws IOException
383   */
384  @Test
385  public void testOptimize() throws IOException {
386    Scan scan = new Scan();
387    // A scan that just gets the first qualifier on each row of the CELL_GRID
388    scan.addColumn(CF, ONE);
389    try (CellGridStoreScanner scanner = new CellGridStoreScanner(scan, this.scanInfo)) {
390      List<Cell> results = new ArrayList<>();
391      while (scanner.next(results)) {
392        continue;
393      }
394      // Should be four results of column 1 (though there are 5 rows in the CELL_GRID -- the
395      // TWO_POINT_TWO row does not have a a column ONE.
396      assertEquals(4, results.size());
397      for (Cell cell: results) {
398        assertTrue(Bytes.equals(ONE, 0, ONE.length,
399            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
400      }
401      assertTrue("Optimize should do some optimizations", scanner.optimization.get() > 0);
402    }
403  }
404
405  /**
406   * Ensure the optimize Scan method in StoreScanner does not get in the way of a Get doing minimum
407   * work... seeking to start of block and then SKIPPING until we find the wanted Cell.
408   * This 'simple' scenario mimics case of all Cells fitting inside a single HFileBlock.
409   * See HBASE-15392. This test is a little cryptic. Takes a bit of staring to figure what it up to.
410   * @throws IOException
411   */
412  @Test
413  public void testOptimizeAndGet() throws IOException {
414    // First test a Get of two columns in the row R2. Every Get is a Scan. Get columns named
415    // R2 and R3.
416    Get get = new Get(TWO);
417    get.addColumn(CF, TWO);
418    get.addColumn(CF, THREE);
419    Scan scan = new Scan(get);
420    try (CellGridStoreScanner scanner = new CellGridStoreScanner(scan, this.scanInfo)) {
421      List<Cell> results = new ArrayList<>();
422      // For a Get there should be no more next's after the first call.
423      assertEquals(false, scanner.next(results));
424      // Should be one result only.
425      assertEquals(2, results.size());
426      // And we should have gone through optimize twice only.
427      assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 3,
428        scanner.count.get());
429    }
430  }
431
432  /**
433   * Ensure that optimize does not cause the Get to do more seeking than required. Optimize
434   * (see HBASE-15392) was causing us to seek all Cells in a block when a Get Scan if the next block
435   * index/start key was a different row to the current one. A bug. We'd call next too often
436   * because we had to exhaust all Cells in the current row making us load the next block just to
437   * discard what we read there. This test is a little cryptic. Takes a bit of staring to figure
438   * what it up to.
439   * @throws IOException
440   */
441  @Test
442  public void testOptimizeAndGetWithFakedNextBlockIndexStart() throws IOException {
443    // First test a Get of second column in the row R2. Every Get is a Scan. Second column has a
444    // qualifier of R2.
445    Get get = new Get(THREE);
446    get.addColumn(CF, TWO);
447    Scan scan = new Scan(get);
448    try (CellGridStoreScanner scanner = new CellGridStoreScanner(scan, this.scanInfo)) {
449      List<Cell> results = new ArrayList<>();
450      // For a Get there should be no more next's after the first call.
451      assertEquals(false, scanner.next(results));
452      // Should be one result only.
453      assertEquals(1, results.size());
454      // And we should have gone through optimize twice only.
455      assertEquals("First qcode is SEEK_NEXT_COL and second INCLUDE_AND_SEEK_NEXT_ROW", 2,
456        scanner.count.get());
457    }
458  }
459
460  @Test
461  public void testScanTimeRange() throws IOException {
462    String r1 = "R1";
463    // returns only 1 of these 2 even though same timestamp
464    KeyValue [] kvs = new KeyValue[] {
465        create(r1, CF_STR, "a", 1, KeyValue.Type.Put, "dont-care"),
466        create(r1, CF_STR, "a", 2, KeyValue.Type.Put, "dont-care"),
467        create(r1, CF_STR, "a", 3, KeyValue.Type.Put, "dont-care"),
468        create(r1, CF_STR, "a", 4, KeyValue.Type.Put, "dont-care"),
469        create(r1, CF_STR, "a", 5, KeyValue.Type.Put, "dont-care"),
470    };
471    List<KeyValueScanner> scanners = Arrays.<KeyValueScanner>asList(
472        new KeyValueScanner[] {
473            new KeyValueScanFixture(CellComparator.getInstance(), kvs)
474    });
475    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes(r1));
476    scanSpec.setTimeRange(0, 6);
477    scanSpec.readAllVersions();
478    List<Cell> results = null;
479    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
480      results = new ArrayList<>();
481      assertEquals(true, scan.next(results));
482      assertEquals(5, results.size());
483      assertEquals(kvs[kvs.length - 1], results.get(0));
484    }
485    // Scan limited TimeRange
486    scanSpec = new Scan().withStartRow(Bytes.toBytes(r1));
487    scanSpec.setTimeRange(1, 3);
488    scanSpec.readAllVersions();
489    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
490      results = new ArrayList<>();
491      assertEquals(true, scan.next(results));
492      assertEquals(2, results.size());
493    }
494    // Another range.
495    scanSpec = new Scan().withStartRow(Bytes.toBytes(r1));
496    scanSpec.setTimeRange(5, 10);
497    scanSpec.readAllVersions();
498    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
499      results = new ArrayList<>();
500      assertEquals(true, scan.next(results));
501      assertEquals(1, results.size());
502    }
503    // See how TimeRange and Versions interact.
504    // Another range.
505    scanSpec = new Scan().withStartRow(Bytes.toBytes(r1));
506    scanSpec.setTimeRange(0, 10);
507    scanSpec.readVersions(3);
508    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
509      results = new ArrayList<>();
510      assertEquals(true, scan.next(results));
511      assertEquals(3, results.size());
512    }
513  }
514
515  @Test
516  public void testScanSameTimestamp() throws IOException {
517    // returns only 1 of these 2 even though same timestamp
518    KeyValue [] kvs = new KeyValue[] {
519        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
520        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
521    };
522    List<KeyValueScanner> scanners = Arrays.asList(
523        new KeyValueScanner[] {
524            new KeyValueScanFixture(CellComparator.getInstance(), kvs)
525        });
526
527    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1"));
528    // this only uses maxVersions (default=1) and TimeRange (default=all)
529    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
530      List<Cell> results = new ArrayList<>();
531      assertEquals(true, scan.next(results));
532      assertEquals(1, results.size());
533      assertEquals(kvs[0], results.get(0));
534    }
535  }
536
537  /*
538   * Test test shows exactly how the matcher's return codes confuses the StoreScanner
539   * and prevent it from doing the right thing.  Seeking once, then nexting twice
540   * should return R1, then R2, but in this case it doesnt.
541   * TODO this comment makes no sense above. Appears to do the right thing.
542   * @throws IOException
543   */
544  @Test
545  public void testWontNextToNext() throws IOException {
546    // build the scan file:
547    KeyValue [] kvs = new KeyValue[] {
548        create("R1", "cf", "a", 2, KeyValue.Type.Put, "dont-care"),
549        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
550        create("R2", "cf", "a", 1, KeyValue.Type.Put, "dont-care")
551    };
552    List<KeyValueScanner> scanners = scanFixture(kvs);
553
554    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1"));
555    // this only uses maxVersions (default=1) and TimeRange (default=all)
556    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
557      List<Cell> results = new ArrayList<>();
558      scan.next(results);
559      assertEquals(1, results.size());
560      assertEquals(kvs[0], results.get(0));
561      // should be ok...
562      // now scan _next_ again.
563      results.clear();
564      scan.next(results);
565      assertEquals(1, results.size());
566      assertEquals(kvs[2], results.get(0));
567
568      results.clear();
569      scan.next(results);
570      assertEquals(0, results.size());
571    }
572  }
573
574
575  @Test
576  public void testDeleteVersionSameTimestamp() throws IOException {
577    KeyValue [] kvs = new KeyValue [] {
578        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
579        create("R1", "cf", "a", 1, KeyValue.Type.Delete, "dont-care"),
580    };
581    List<KeyValueScanner> scanners = scanFixture(kvs);
582    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1"));
583    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
584      List<Cell> results = new ArrayList<>();
585      assertFalse(scan.next(results));
586      assertEquals(0, results.size());
587    }
588  }
589
590  /*
591   * Test the case where there is a delete row 'in front of' the next row, the scanner
592   * will move to the next row.
593   */
594  @Test
595  public void testDeletedRowThenGoodRow() throws IOException {
596    KeyValue [] kvs = new KeyValue [] {
597        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
598        create("R1", "cf", "a", 1, KeyValue.Type.Delete, "dont-care"),
599        create("R2", "cf", "a", 20, KeyValue.Type.Put, "dont-care")
600    };
601    List<KeyValueScanner> scanners = scanFixture(kvs);
602    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1"));
603    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
604      List<Cell> results = new ArrayList<>();
605      assertEquals(true, scan.next(results));
606      assertEquals(0, results.size());
607
608      assertEquals(true, scan.next(results));
609      assertEquals(1, results.size());
610      assertEquals(kvs[2], results.get(0));
611
612      assertEquals(false, scan.next(results));
613    }
614  }
615
616  @Test
617  public void testDeleteVersionMaskingMultiplePuts() throws IOException {
618    long now = System.currentTimeMillis();
619    KeyValue [] kvs1 = new KeyValue[] {
620        create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
621        create("R1", "cf", "a", now, KeyValue.Type.Delete, "dont-care")
622    };
623    KeyValue [] kvs2 = new KeyValue[] {
624        create("R1", "cf", "a", now-500, KeyValue.Type.Put, "dont-care"),
625        create("R1", "cf", "a", now-100, KeyValue.Type.Put, "dont-care"),
626        create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care")
627    };
628    List<KeyValueScanner> scanners = scanFixture(kvs1, kvs2);
629
630    try (StoreScanner scan = new StoreScanner(new Scan().withStartRow(Bytes.toBytes("R1")),
631        scanInfo, getCols("a"), scanners)) {
632      List<Cell> results = new ArrayList<>();
633      // the two put at ts=now will be masked by the 1 delete, and
634      // since the scan default returns 1 version we'll return the newest
635      // key, which is kvs[2], now-100.
636      assertEquals(true, scan.next(results));
637      assertEquals(1, results.size());
638      assertEquals(kvs2[1], results.get(0));
639    }
640  }
641
642  @Test
643  public void testDeleteVersionsMixedAndMultipleVersionReturn() throws IOException {
644    long now = System.currentTimeMillis();
645    KeyValue [] kvs1 = new KeyValue[] {
646        create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
647        create("R1", "cf", "a", now, KeyValue.Type.Delete, "dont-care")
648    };
649    KeyValue [] kvs2 = new KeyValue[] {
650        create("R1", "cf", "a", now-500, KeyValue.Type.Put, "dont-care"),
651        create("R1", "cf", "a", now+500, KeyValue.Type.Put, "dont-care"),
652        create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
653        create("R2", "cf", "z", now, KeyValue.Type.Put, "dont-care")
654    };
655    List<KeyValueScanner> scanners = scanFixture(kvs1, kvs2);
656
657    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1")).readVersions(2);
658    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
659      List<Cell> results = new ArrayList<>();
660      assertEquals(true, scan.next(results));
661      assertEquals(2, results.size());
662      assertEquals(kvs2[1], results.get(0));
663      assertEquals(kvs2[0], results.get(1));
664    }
665  }
666
667  @Test
668  public void testWildCardOneVersionScan() throws IOException {
669    KeyValue [] kvs = new KeyValue [] {
670        create("R1", "cf", "a", 2, KeyValue.Type.Put, "dont-care"),
671        create("R1", "cf", "b", 1, KeyValue.Type.Put, "dont-care"),
672        create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"),
673    };
674    List<KeyValueScanner> scanners = scanFixture(kvs);
675    try (StoreScanner scan =
676        new StoreScanner(new Scan().withStartRow(Bytes.toBytes("R1")), scanInfo, null, scanners)) {
677      List<Cell> results = new ArrayList<>();
678      assertEquals(true, scan.next(results));
679      assertEquals(2, results.size());
680      assertEquals(kvs[0], results.get(0));
681      assertEquals(kvs[1], results.get(1));
682    }
683  }
684
685  @Test
686  public void testWildCardScannerUnderDeletes() throws IOException {
687    KeyValue [] kvs = new KeyValue [] {
688        create("R1", "cf", "a", 2, KeyValue.Type.Put, "dont-care"), // inc
689        // orphaned delete column.
690        create("R1", "cf", "a", 1, KeyValue.Type.DeleteColumn, "dont-care"),
691        // column b
692        create("R1", "cf", "b", 2, KeyValue.Type.Put, "dont-care"), // inc
693        create("R1", "cf", "b", 1, KeyValue.Type.Put, "dont-care"), // inc
694        // column c
695        create("R1", "cf", "c", 10, KeyValue.Type.Delete, "dont-care"),
696        create("R1", "cf", "c", 10, KeyValue.Type.Put, "dont-care"), // no
697        create("R1", "cf", "c", 9, KeyValue.Type.Put, "dont-care"),  // inc
698        // column d
699        create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"), // inc
700        create("R1", "cf", "d", 10, KeyValue.Type.DeleteColumn, "dont-care"),
701        create("R1", "cf", "d", 9, KeyValue.Type.Put, "dont-care"),  // no
702        create("R1", "cf", "d", 8, KeyValue.Type.Put, "dont-care"),  // no
703
704    };
705    List<KeyValueScanner> scanners = scanFixture(kvs);
706    try (StoreScanner scan =
707        new StoreScanner(new Scan().readVersions(2), scanInfo, null, scanners)) {
708      List<Cell> results = new ArrayList<>();
709      assertEquals(true, scan.next(results));
710      assertEquals(5, results.size());
711      assertEquals(kvs[0], results.get(0));
712      assertEquals(kvs[2], results.get(1));
713      assertEquals(kvs[3], results.get(2));
714      assertEquals(kvs[6], results.get(3));
715      assertEquals(kvs[7], results.get(4));
716    }
717  }
718
719  @Test
720  public void testDeleteFamily() throws IOException {
721    KeyValue[] kvs = new KeyValue[] {
722        create("R1", "cf", "a", 100, KeyValue.Type.DeleteFamily, "dont-care"),
723        create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
724        create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
725        create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
726        create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
727        create("R1", "cf", "e", 11, KeyValue.Type.DeleteColumn, "dont-care"),
728        create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
729        create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
730        create("R1", "cf", "g", 11, KeyValue.Type.Delete, "dont-care"),
731        create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
732        create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
733        create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
734    };
735    List<KeyValueScanner> scanners = scanFixture(kvs);
736    try (StoreScanner scan =
737        new StoreScanner(new Scan().readAllVersions(), scanInfo, null, scanners)) {
738      List<Cell> results = new ArrayList<>();
739      assertEquals(true, scan.next(results));
740      assertEquals(0, results.size());
741      assertEquals(true, scan.next(results));
742      assertEquals(1, results.size());
743      assertEquals(kvs[kvs.length - 1], results.get(0));
744
745      assertEquals(false, scan.next(results));
746    }
747  }
748
749  @Test
750  public void testDeleteColumn() throws IOException {
751    KeyValue [] kvs = new KeyValue[] {
752        create("R1", "cf", "a", 10, KeyValue.Type.DeleteColumn, "dont-care"),
753        create("R1", "cf", "a", 9, KeyValue.Type.Delete, "dont-care"),
754        create("R1", "cf", "a", 8, KeyValue.Type.Put, "dont-care"),
755        create("R1", "cf", "b", 5, KeyValue.Type.Put, "dont-care")
756    };
757    List<KeyValueScanner> scanners = scanFixture(kvs);
758    try (StoreScanner scan = new StoreScanner(new Scan(), scanInfo, null, scanners)) {
759      List<Cell> results = new ArrayList<>();
760      assertEquals(true, scan.next(results));
761      assertEquals(1, results.size());
762      assertEquals(kvs[3], results.get(0));
763    }
764  }
765
766  private static final KeyValue[] kvs = new KeyValue[] {
767        create("R1", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
768        create("R1", "cf", "b", 11, KeyValue.Type.Put, "dont-care"),
769        create("R1", "cf", "c", 11, KeyValue.Type.Put, "dont-care"),
770        create("R1", "cf", "d", 11, KeyValue.Type.Put, "dont-care"),
771        create("R1", "cf", "e", 11, KeyValue.Type.Put, "dont-care"),
772        create("R1", "cf", "f", 11, KeyValue.Type.Put, "dont-care"),
773        create("R1", "cf", "g", 11, KeyValue.Type.Put, "dont-care"),
774        create("R1", "cf", "h", 11, KeyValue.Type.Put, "dont-care"),
775        create("R1", "cf", "i", 11, KeyValue.Type.Put, "dont-care"),
776        create("R2", "cf", "a", 11, KeyValue.Type.Put, "dont-care"),
777    };
778
779  @Test
780  public void testSkipColumn() throws IOException {
781    List<KeyValueScanner> scanners = scanFixture(kvs);
782    try (StoreScanner scan = new StoreScanner(new Scan(), scanInfo, getCols("a", "d"), scanners)) {
783      List<Cell> results = new ArrayList<>();
784      assertEquals(true, scan.next(results));
785      assertEquals(2, results.size());
786      assertEquals(kvs[0], results.get(0));
787      assertEquals(kvs[3], results.get(1));
788      results.clear();
789
790      assertEquals(true, scan.next(results));
791      assertEquals(1, results.size());
792      assertEquals(kvs[kvs.length - 1], results.get(0));
793
794      results.clear();
795      assertEquals(false, scan.next(results));
796    }
797  }
798
799  /*
800   * Test expiration of KeyValues in combination with a configured TTL for
801   * a column family (as should be triggered in a major compaction).
802   */
803  @Test
804  public void testWildCardTtlScan() throws IOException {
805    long now = System.currentTimeMillis();
806    KeyValue [] kvs = new KeyValue[] {
807        create("R1", "cf", "a", now-1000, KeyValue.Type.Put, "dont-care"),
808        create("R1", "cf", "b", now-10, KeyValue.Type.Put, "dont-care"),
809        create("R1", "cf", "c", now-200, KeyValue.Type.Put, "dont-care"),
810        create("R1", "cf", "d", now-10000, KeyValue.Type.Put, "dont-care"),
811        create("R2", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
812        create("R2", "cf", "b", now-10, KeyValue.Type.Put, "dont-care"),
813        create("R2", "cf", "c", now-200, KeyValue.Type.Put, "dont-care"),
814        create("R2", "cf", "c", now-1000, KeyValue.Type.Put, "dont-care")
815    };
816    List<KeyValueScanner> scanners = scanFixture(kvs);
817    Scan scan = new Scan();
818    scan.readVersions(1);
819    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
820        HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
821    try (StoreScanner scanner = new StoreScanner(scan, scanInfo, null, scanners)) {
822      List<Cell> results = new ArrayList<>();
823      assertEquals(true, scanner.next(results));
824      assertEquals(2, results.size());
825      assertEquals(kvs[1], results.get(0));
826      assertEquals(kvs[2], results.get(1));
827      results.clear();
828
829      assertEquals(true, scanner.next(results));
830      assertEquals(3, results.size());
831      assertEquals(kvs[4], results.get(0));
832      assertEquals(kvs[5], results.get(1));
833      assertEquals(kvs[6], results.get(2));
834      results.clear();
835
836      assertEquals(false, scanner.next(results));
837    }
838  }
839
840  @Test
841  public void testScannerReseekDoesntNPE() throws Exception {
842    List<KeyValueScanner> scanners = scanFixture(kvs);
843    try (StoreScanner scan = new StoreScanner(new Scan(), scanInfo, getCols("a", "d"), scanners)) {
844      // Previously a updateReaders twice in a row would cause an NPE. In test this would also
845      // normally cause an NPE because scan.store is null. So as long as we get through these
846      // two calls we are good and the bug was quashed.
847      scan.updateReaders(Collections.emptyList(), Collections.emptyList());
848      scan.updateReaders(Collections.emptyList(), Collections.emptyList());
849      scan.peek();
850    }
851  }
852
853  @Test @Ignore("this fails, since we don't handle deletions, etc, in peek")
854  public void testPeek() throws Exception {
855    KeyValue[] kvs = new KeyValue [] {
856        create("R1", "cf", "a", 1, KeyValue.Type.Put, "dont-care"),
857        create("R1", "cf", "a", 1, KeyValue.Type.Delete, "dont-care"),
858    };
859    List<KeyValueScanner> scanners = scanFixture(kvs);
860    Scan scanSpec = new Scan().withStartRow(Bytes.toBytes("R1"));
861    try (StoreScanner scan = new StoreScanner(scanSpec, scanInfo, getCols("a"), scanners)) {
862      assertNull(scan.peek());
863    }
864  }
865
866  /**
867   * Ensure that expired delete family markers don't override valid puts
868   */
869  @Test
870  public void testExpiredDeleteFamily() throws Exception {
871    long now = System.currentTimeMillis();
872    KeyValue[] kvs = new KeyValue[] {
873        new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, now-1000,
874            KeyValue.Type.DeleteFamily),
875        create("R1", "cf", "a", now-10, KeyValue.Type.Put,
876            "dont-care"),
877    };
878    List<KeyValueScanner> scanners = scanFixture(kvs);
879    Scan scan = new Scan();
880    scan.readVersions(1);
881    // scanner with ttl equal to 500
882    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
883        HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
884    try (StoreScanner scanner = new StoreScanner(scan, scanInfo, null, scanners)) {
885      List<Cell> results = new ArrayList<>();
886      assertEquals(true, scanner.next(results));
887      assertEquals(1, results.size());
888      assertEquals(kvs[1], results.get(0));
889      results.clear();
890
891      assertEquals(false, scanner.next(results));
892    }
893  }
894
895  @Test
896  public void testDeleteMarkerLongevity() throws Exception {
897    try {
898      final long now = System.currentTimeMillis();
899      EnvironmentEdgeManagerTestHelper.injectEdge(new EnvironmentEdge() {
900        @Override
901        public long currentTime() {
902          return now;
903        }
904      });
905      KeyValue[] kvs = new KeyValue[]{
906        /*0*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null,
907        now - 100, KeyValue.Type.DeleteFamily), // live
908        /*1*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null,
909        now - 1000, KeyValue.Type.DeleteFamily), // expired
910        /*2*/ create("R1", "cf", "a", now - 50,
911        KeyValue.Type.Put, "v3"), // live
912        /*3*/ create("R1", "cf", "a", now - 55,
913        KeyValue.Type.Delete, "dontcare"), // live
914        /*4*/ create("R1", "cf", "a", now - 55,
915        KeyValue.Type.Put, "deleted-version v2"), // deleted
916        /*5*/ create("R1", "cf", "a", now - 60,
917        KeyValue.Type.Put, "v1"), // live
918        /*6*/ create("R1", "cf", "a", now - 65,
919        KeyValue.Type.Put, "v0"), // max-version reached
920        /*7*/ create("R1", "cf", "a",
921        now - 100, KeyValue.Type.DeleteColumn, "dont-care"), // max-version
922        /*8*/ create("R1", "cf", "b", now - 600,
923        KeyValue.Type.DeleteColumn, "dont-care"), //expired
924        /*9*/ create("R1", "cf", "b", now - 70,
925        KeyValue.Type.Put, "v2"), //live
926        /*10*/ create("R1", "cf", "b", now - 750,
927        KeyValue.Type.Put, "v1"), //expired
928        /*11*/ create("R1", "cf", "c", now - 500,
929        KeyValue.Type.Delete, "dontcare"), //expired
930        /*12*/ create("R1", "cf", "c", now - 600,
931        KeyValue.Type.Put, "v1"), //expired
932        /*13*/ create("R1", "cf", "c", now - 1000,
933        KeyValue.Type.Delete, "dontcare"), //expired
934        /*14*/ create("R1", "cf", "d", now - 60,
935        KeyValue.Type.Put, "expired put"), //live
936        /*15*/ create("R1", "cf", "d", now - 100,
937        KeyValue.Type.Delete, "not-expired delete"), //live
938      };
939      List<KeyValueScanner> scanners = scanFixture(kvs);
940      ScanInfo scanInfo = new ScanInfo(CONF, Bytes.toBytes("cf"),
941        0 /* minVersions */,
942        2 /* maxVersions */, 500 /* ttl */,
943        KeepDeletedCells.FALSE /* keepDeletedCells */,
944        HConstants.DEFAULT_BLOCKSIZE /* block size */,
945        200, /* timeToPurgeDeletes */
946        CellComparator.getInstance(), false);
947      try (StoreScanner scanner =
948          new StoreScanner(scanInfo, OptionalInt.of(2), ScanType.COMPACT_DROP_DELETES, scanners)) {
949        List<Cell> results = new ArrayList<>();
950        results = new ArrayList<>();
951        assertEquals(true, scanner.next(results));
952        assertEquals(kvs[0], results.get(0));
953        assertEquals(kvs[2], results.get(1));
954        assertEquals(kvs[3], results.get(2));
955        assertEquals(kvs[5], results.get(3));
956        assertEquals(kvs[9], results.get(4));
957        assertEquals(kvs[14], results.get(5));
958        assertEquals(kvs[15], results.get(6));
959        assertEquals(7, results.size());
960      }
961    } finally {
962      EnvironmentEdgeManagerTestHelper.reset();
963    }
964  }
965
966  @Test
967  public void testPreadNotEnabledForCompactionStoreScanners() throws Exception {
968    long now = System.currentTimeMillis();
969    KeyValue[] kvs = new KeyValue[] {
970        new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, now - 1000,
971            KeyValue.Type.DeleteFamily),
972        create("R1", "cf", "a", now - 10, KeyValue.Type.Put, "dont-care"), };
973    List<KeyValueScanner> scanners = scanFixture(kvs);
974    ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
975        HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false);
976    try (StoreScanner storeScanner = new StoreScanner(scanInfo, OptionalInt.empty(),
977        ScanType.COMPACT_RETAIN_DELETES, scanners)) {
978      assertFalse(storeScanner.isScanUsePread());
979    }
980  }
981}