001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.NoSuchElementException;
027
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
036
037/**
038 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
039 * and performs the scan for compaction operation meaning it is based on SQM
040 */
041@InterfaceAudience.Private
042public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
043  private static final Logger LOG =
044      LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class);
045
046  private final List<Cell> kvs = new ArrayList<>();
047  private boolean hasMore = true;
048  private Iterator<Cell> kvsIterator;
049
050  // scanner on top of pipeline scanner that uses ScanQueryMatcher
051  private InternalScanner compactingScanner;
052
053  // C-tor
054  public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
055      CellComparator comparator, int compactionKVMax, HStore store) throws IOException {
056    super(compactionKVMax);
057
058    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
059    // create the list of scanners to traverse over all the data
060    // no dirty reads here as these are immutable segments
061    int order = segments.size();
062    AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, order, scanners);
063    // build the scanner based on Query Matcher
064    // reinitialize the compacting scanner for each instance of iterator
065    compactingScanner = createScanner(store, scanners);
066    refillKVS();
067  }
068
069  @Override
070  public boolean hasNext() {
071    if (kvsIterator == null) { // for the case when the result is empty
072      return false;
073    }
074    // return true either we have cells in buffer or we can get more.
075    return kvsIterator.hasNext() || refillKVS();
076  }
077
078  @Override
079  public Cell next() {
080    if (!hasNext()) {
081      throw new NoSuchElementException();
082    }
083    return kvsIterator.next();
084  }
085
086  @Override
087  public void close() {
088    try {
089      compactingScanner.close();
090    } catch (IOException e) {
091      LOG.warn("close store scanner failed", e);
092    }
093    compactingScanner = null;
094    kvs.clear();
095  }
096
097  @Override
098  public void remove() {
099    throw new UnsupportedOperationException();
100  }
101
102  /**
103   * Creates the scanner for compacting the pipeline.
104   * @return the scanner
105   */
106  private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
107      throws IOException {
108    InternalScanner scanner = null;
109    boolean success = false;
110    try {
111      RegionCoprocessorHost cpHost = store.getCoprocessorHost();
112      ScanInfo scanInfo;
113      if (cpHost != null) {
114        scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
115      } else {
116        scanInfo = store.getScanInfo();
117      }
118      scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
119          store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
120      if (cpHost != null) {
121        InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
122        if (scannerFromCp == null) {
123          throw new CoprocessorException("Got a null InternalScanner when calling" +
124              " preMemStoreCompactionCompact which is not acceptable");
125        }
126        success = true;
127        return scannerFromCp;
128      } else {
129        success = true;
130        return scanner;
131      }
132    } finally {
133      if (!success) {
134        Closeables.close(scanner, true);
135        scanners.forEach(KeyValueScanner::close);
136      }
137    }
138  }
139
140  /*
141   * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is
142   * non-empty
143   */
144  private boolean refillKVS() {
145    // if there is nothing expected next in compactingScanner
146    if (!hasMore) {
147      return false;
148    }
149    // clear previous KVS, first initiated in the constructor
150    kvs.clear();
151    for (;;) {
152      try {
153        hasMore = compactingScanner.next(kvs, scannerContext);
154      } catch (IOException e) {
155        // should not happen as all data are in memory
156        throw new IllegalStateException(e);
157      }
158      if (!kvs.isEmpty()) {
159        kvsIterator = kvs.iterator();
160        return true;
161      } else if (!hasMore) {
162        return false;
163      }
164    }
165  }
166}