001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.NoSuchElementException;
027
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
036
037/**
038 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
039 * and performs the scan for compaction operation meaning it is based on SQM
040 */
041@InterfaceAudience.Private
042public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
043  private static final Logger LOG =
044      LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class);
045
046  private final List<Cell> kvs = new ArrayList<>();
047  private boolean hasMore = true;
048  private Iterator<Cell> kvsIterator;
049
050  // scanner on top of pipeline scanner that uses ScanQueryMatcher
051  private InternalScanner compactingScanner;
052
053  // C-tor
054  public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
055      CellComparator comparator, int compactionKVMax, HStore store) throws IOException {
056    super(compactionKVMax);
057
058    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
059    AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, scanners);
060    // build the scanner based on Query Matcher
061    // reinitialize the compacting scanner for each instance of iterator
062    compactingScanner = createScanner(store, scanners);
063    refillKVS();
064  }
065
066  @Override
067  public boolean hasNext() {
068    if (kvsIterator == null) { // for the case when the result is empty
069      return false;
070    }
071    // return true either we have cells in buffer or we can get more.
072    return kvsIterator.hasNext() || refillKVS();
073  }
074
075  @Override
076  public Cell next() {
077    if (!hasNext()) {
078      throw new NoSuchElementException();
079    }
080    return kvsIterator.next();
081  }
082
083  @Override
084  public void close() {
085    try {
086      compactingScanner.close();
087    } catch (IOException e) {
088      LOG.warn("close store scanner failed", e);
089    }
090    compactingScanner = null;
091    kvs.clear();
092  }
093
094  @Override
095  public void remove() {
096    throw new UnsupportedOperationException();
097  }
098
099  /**
100   * Creates the scanner for compacting the pipeline.
101   * @return the scanner
102   */
103  private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
104      throws IOException {
105    InternalScanner scanner = null;
106    boolean success = false;
107    try {
108      RegionCoprocessorHost cpHost = store.getCoprocessorHost();
109      ScanInfo scanInfo;
110      if (cpHost != null) {
111        scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
112      } else {
113        scanInfo = store.getScanInfo();
114      }
115      scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
116          store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
117      if (cpHost != null) {
118        InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
119        if (scannerFromCp == null) {
120          throw new CoprocessorException("Got a null InternalScanner when calling" +
121              " preMemStoreCompactionCompact which is not acceptable");
122        }
123        success = true;
124        return scannerFromCp;
125      } else {
126        success = true;
127        return scanner;
128      }
129    } finally {
130      if (!success) {
131        Closeables.close(scanner, true);
132        scanners.forEach(KeyValueScanner::close);
133      }
134    }
135  }
136
137  /*
138   * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is
139   * non-empty
140   */
141  private boolean refillKVS() {
142    // if there is nothing expected next in compactingScanner
143    if (!hasMore) {
144      return false;
145    }
146    // clear previous KVS, first initiated in the constructor
147    kvs.clear();
148    for (;;) {
149      try {
150        hasMore = compactingScanner.next(kvs, scannerContext);
151      } catch (IOException e) {
152        // should not happen as all data are in memory
153        throw new IllegalStateException(e);
154      }
155      if (!kvs.isEmpty()) {
156        kvsIterator = kvs.iterator();
157        return true;
158      } else if (!hasMore) {
159        return false;
160      }
161    }
162  }
163}