001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.List;
024import java.util.NoSuchElementException;
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellComparator;
027import org.apache.hadoop.hbase.PrivateConstants;
028import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
034
035/**
036 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator and performs the scan for
037 * compaction operation meaning it is based on SQM
038 */
039@InterfaceAudience.Private
040public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
041  private static final Logger LOG =
042    LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class);
043
044  private final List<Cell> kvs = new ArrayList<>();
045  private boolean hasMore = true;
046  private Iterator<Cell> kvsIterator;
047
048  // scanner on top of pipeline scanner that uses ScanQueryMatcher
049  private InternalScanner compactingScanner;
050
051  // C-tor
052  public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
053    CellComparator comparator, int compactionKVMax, HStore store) throws IOException {
054    super(compactionKVMax);
055
056    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
057    AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, scanners);
058    // build the scanner based on Query Matcher
059    // reinitialize the compacting scanner for each instance of iterator
060    compactingScanner = createScanner(store, scanners);
061    refillKVS();
062  }
063
064  @Override
065  public boolean hasNext() {
066    if (kvsIterator == null) { // for the case when the result is empty
067      return false;
068    }
069    // return true either we have cells in buffer or we can get more.
070    return kvsIterator.hasNext() || refillKVS();
071  }
072
073  @Override
074  public Cell next() {
075    if (!hasNext()) {
076      throw new NoSuchElementException();
077    }
078    return kvsIterator.next();
079  }
080
081  @Override
082  public void close() {
083    try {
084      compactingScanner.close();
085    } catch (IOException e) {
086      LOG.warn("close store scanner failed", e);
087    }
088    compactingScanner = null;
089    kvs.clear();
090  }
091
092  @Override
093  public void remove() {
094    throw new UnsupportedOperationException();
095  }
096
097  /**
098   * Creates the scanner for compacting the pipeline.
099   * @return the scanner
100   */
101  private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
102    throws IOException {
103    InternalScanner scanner = null;
104    boolean success = false;
105    try {
106      RegionCoprocessorHost cpHost = store.getCoprocessorHost();
107      ScanInfo scanInfo;
108      if (cpHost != null) {
109        scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
110      } else {
111        scanInfo = store.getScanInfo();
112      }
113      scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
114        store.getSmallestReadPoint(), PrivateConstants.OLDEST_TIMESTAMP);
115      if (cpHost != null) {
116        InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
117        if (scannerFromCp == null) {
118          throw new CoprocessorException("Got a null InternalScanner when calling"
119            + " preMemStoreCompactionCompact which is not acceptable");
120        }
121        success = true;
122        return scannerFromCp;
123      } else {
124        success = true;
125        return scanner;
126      }
127    } finally {
128      if (!success) {
129        Closeables.close(scanner, true);
130        scanners.forEach(KeyValueScanner::close);
131      }
132    }
133  }
134
135  /*
136   * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is
137   * non-empty
138   */
139  private boolean refillKVS() {
140    // if there is nothing expected next in compactingScanner
141    if (!hasMore) {
142      return false;
143    }
144    // clear previous KVS, first initiated in the constructor
145    kvs.clear();
146    for (;;) {
147      try {
148        hasMore = compactingScanner.next(kvs, scannerContext);
149      } catch (IOException e) {
150        // should not happen as all data are in memory
151        throw new IllegalStateException(e);
152      }
153      if (!kvs.isEmpty()) {
154        kvsIterator = kvs.iterator();
155        return true;
156      } else if (!hasMore) {
157        return false;
158      }
159    }
160  }
161}