001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Iterator; 023import java.util.List; 024import java.util.NoSuchElementException; 025import org.apache.hadoop.hbase.CellComparator; 026import org.apache.hadoop.hbase.ExtendedCell; 027import org.apache.hadoop.hbase.PrivateConstants; 028import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 034 035/** 036 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator and performs the scan for 037 * compaction operation meaning it is based on SQM 038 */ 039@InterfaceAudience.Private 040public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator { 041 private static final Logger LOG = 042 LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class); 043 044 private final List<ExtendedCell> kvs = new ArrayList<>(); 045 private boolean hasMore = true; 046 private Iterator<ExtendedCell> kvsIterator; 047 048 // scanner on top of pipeline scanner that uses ScanQueryMatcher 049 private InternalScanner compactingScanner; 050 051 // C-tor 052 public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments, 053 CellComparator comparator, int compactionKVMax, HStore store) throws IOException { 054 super(compactionKVMax); 055 056 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); 057 AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, scanners); 058 // build the scanner based on Query Matcher 059 // reinitialize the compacting scanner for each instance of iterator 060 compactingScanner = createScanner(store, scanners); 061 refillKVS(); 062 } 063 064 @Override 065 public boolean hasNext() { 066 if (kvsIterator == null) { // for the case when the result is empty 067 return false; 068 } 069 // return true either we have cells in buffer or we can get more. 070 return kvsIterator.hasNext() || refillKVS(); 071 } 072 073 @Override 074 public ExtendedCell next() { 075 if (!hasNext()) { 076 throw new NoSuchElementException(); 077 } 078 return kvsIterator.next(); 079 } 080 081 @Override 082 public void close() { 083 try { 084 compactingScanner.close(); 085 } catch (IOException e) { 086 LOG.warn("close store scanner failed", e); 087 } 088 compactingScanner = null; 089 kvs.clear(); 090 } 091 092 @Override 093 public void remove() { 094 throw new UnsupportedOperationException(); 095 } 096 097 /** 098 * Creates the scanner for compacting the pipeline. 099 * @return the scanner 100 */ 101 private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners) 102 throws IOException { 103 InternalScanner scanner = null; 104 boolean success = false; 105 try { 106 RegionCoprocessorHost cpHost = store.getCoprocessorHost(); 107 ScanInfo scanInfo; 108 if (cpHost != null) { 109 scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store); 110 } else { 111 scanInfo = store.getScanInfo(); 112 } 113 scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, 114 store.getSmallestReadPoint(), PrivateConstants.OLDEST_TIMESTAMP); 115 if (cpHost != null) { 116 InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner); 117 if (scannerFromCp == null) { 118 throw new CoprocessorException("Got a null InternalScanner when calling" 119 + " preMemStoreCompactionCompact which is not acceptable"); 120 } 121 success = true; 122 return scannerFromCp; 123 } else { 124 success = true; 125 return scanner; 126 } 127 } finally { 128 if (!success) { 129 Closeables.close(scanner, true); 130 scanners.forEach(KeyValueScanner::close); 131 } 132 } 133 } 134 135 /** 136 * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is 137 * non-empty 138 */ 139 private boolean refillKVS() { 140 // if there is nothing expected next in compactingScanner 141 if (!hasMore) { 142 return false; 143 } 144 // clear previous KVS, first initiated in the constructor 145 kvs.clear(); 146 for (;;) { 147 try { 148 // InternalScanner is for CPs so we do not want to leak ExtendedCell to the interface, but 149 // all the server side implementation should only add ExtendedCell to the List, otherwise it 150 // will cause serious assertions in our code 151 hasMore = compactingScanner.next(kvs, scannerContext); 152 } catch (IOException e) { 153 // should not happen as all data are in memory 154 throw new IllegalStateException(e); 155 } 156 if (!kvs.isEmpty()) { 157 kvsIterator = kvs.iterator(); 158 return true; 159 } else if (!hasMore) { 160 return false; 161 } 162 } 163 } 164}