001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.NoSuchElementException; 027 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 036 037/** 038 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator 039 * and performs the scan for compaction operation meaning it is based on SQM 040 */ 041@InterfaceAudience.Private 042public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator { 043 private static final Logger LOG = 044 LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class); 045 046 private final List<Cell> kvs = new ArrayList<>(); 047 private boolean hasMore = true; 048 private Iterator<Cell> kvsIterator; 049 050 // scanner on top of pipeline scanner that uses ScanQueryMatcher 051 private InternalScanner compactingScanner; 052 053 // C-tor 054 public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments, 055 CellComparator comparator, int compactionKVMax, HStore store) throws IOException { 056 super(compactionKVMax); 057 058 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); 059 // create the list of scanners to traverse over all the data 060 // no dirty reads here as these are immutable segments 061 int order = segments.size(); 062 AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, order, scanners); 063 // build the scanner based on Query Matcher 064 // reinitialize the compacting scanner for each instance of iterator 065 compactingScanner = createScanner(store, scanners); 066 refillKVS(); 067 } 068 069 @Override 070 public boolean hasNext() { 071 if (kvsIterator == null) { // for the case when the result is empty 072 return false; 073 } 074 // return true either we have cells in buffer or we can get more. 075 return kvsIterator.hasNext() || refillKVS(); 076 } 077 078 @Override 079 public Cell next() { 080 if (!hasNext()) { 081 throw new NoSuchElementException(); 082 } 083 return kvsIterator.next(); 084 } 085 086 @Override 087 public void close() { 088 try { 089 compactingScanner.close(); 090 } catch (IOException e) { 091 LOG.warn("close store scanner failed", e); 092 } 093 compactingScanner = null; 094 kvs.clear(); 095 } 096 097 @Override 098 public void remove() { 099 throw new UnsupportedOperationException(); 100 } 101 102 /** 103 * Creates the scanner for compacting the pipeline. 104 * @return the scanner 105 */ 106 private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners) 107 throws IOException { 108 InternalScanner scanner = null; 109 boolean success = false; 110 try { 111 RegionCoprocessorHost cpHost = store.getCoprocessorHost(); 112 ScanInfo scanInfo; 113 if (cpHost != null) { 114 scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store); 115 } else { 116 scanInfo = store.getScanInfo(); 117 } 118 scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, 119 store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); 120 if (cpHost != null) { 121 InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner); 122 if (scannerFromCp == null) { 123 throw new CoprocessorException("Got a null InternalScanner when calling" + 124 " preMemStoreCompactionCompact which is not acceptable"); 125 } 126 success = true; 127 return scannerFromCp; 128 } else { 129 success = true; 130 return scanner; 131 } 132 } finally { 133 if (!success) { 134 Closeables.close(scanner, true); 135 scanners.forEach(KeyValueScanner::close); 136 } 137 } 138 } 139 140 /* 141 * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is 142 * non-empty 143 */ 144 private boolean refillKVS() { 145 // if there is nothing expected next in compactingScanner 146 if (!hasMore) { 147 return false; 148 } 149 // clear previous KVS, first initiated in the constructor 150 kvs.clear(); 151 for (;;) { 152 try { 153 hasMore = compactingScanner.next(kvs, scannerContext); 154 } catch (IOException e) { 155 // should not happen as all data are in memory 156 throw new IllegalStateException(e); 157 } 158 if (!kvs.isEmpty()) { 159 kvsIterator = kvs.iterator(); 160 return true; 161 } else if (!hasMore) { 162 return false; 163 } 164 } 165 } 166}