001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.regionserver;
021
022import org.apache.hadoop.hbase.Cell;
023import org.apache.hadoop.hbase.CellComparator;
024import org.apache.yetus.audience.InterfaceAudience;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029
030/**
031 * The MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator
032 * and performs the scan for simple merge operation meaning it is NOT based on SQM
033 */
034@InterfaceAudience.Private
035public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
036
037  // heap of scanners, lazily initialized
038  private KeyValueHeap heap = null;
039  // remember the initial version of the scanners list
040  List<KeyValueScanner> scanners  = new ArrayList<KeyValueScanner>();
041
042  private boolean closed = false;
043
044  // C-tor
045  public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
046      int compactionKVMax) throws IOException {
047    super(compactionKVMax);
048    // create the list of scanners to traverse over all the data
049    // no dirty reads here as these are immutable segments
050    int order = segments.size();
051    AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, order, scanners);
052    heap = new KeyValueHeap(scanners, comparator);
053  }
054
055  @Override
056  public boolean hasNext() {
057    if (closed) {
058      return false;
059    }
060    if (this.heap != null) {
061      return (this.heap.peek() != null);
062    }
063    // Doing this way in case some test cases tries to peek directly
064    return false;
065  }
066
067  @Override
068  public Cell next()  {
069    try {                 // try to get next
070      if (!closed && heap != null) {
071        return heap.next();
072      }
073    } catch (IOException ie) {
074      throw new IllegalStateException(ie);
075    }
076    return null;
077  }
078
079  @Override
080  public void close() {
081    if (closed) {
082      return;
083    }
084    // Ensuring that all the segment scanners are closed
085    if (heap != null) {
086      heap.close();
087      // It is safe to do close as no new calls will be made to this scanner.
088      heap = null;
089    } else {
090      for (KeyValueScanner scanner : scanners) {
091        scanner.close();
092      }
093    }
094    closed = true;
095  }
096
097  @Override
098  public void remove() {
099    throw new UnsupportedOperationException();
100  }
101}