001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.regionserver; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.List; 026import java.util.NoSuchElementException; 027 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.CellComparator; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.coprocessor.CoprocessorException; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 036 037/** 038 * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator 039 * and performs the scan for compaction operation meaning it is based on SQM 040 */ 041@InterfaceAudience.Private 042public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator { 043 private static final Logger LOG = 044 LoggerFactory.getLogger(MemStoreCompactorSegmentsIterator.class); 045 046 private final List<Cell> kvs = new ArrayList<>(); 047 private boolean hasMore = true; 048 private Iterator<Cell> kvsIterator; 049 050 // scanner on top of pipeline scanner that uses ScanQueryMatcher 051 private InternalScanner compactingScanner; 052 053 // C-tor 054 public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments, 055 CellComparator comparator, int compactionKVMax, HStore store) throws IOException { 056 super(compactionKVMax); 057 058 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); 059 AbstractMemStore.addToScanners(segments, Long.MAX_VALUE, scanners); 060 // build the scanner based on Query Matcher 061 // reinitialize the compacting scanner for each instance of iterator 062 compactingScanner = createScanner(store, scanners); 063 refillKVS(); 064 } 065 066 @Override 067 public boolean hasNext() { 068 if (kvsIterator == null) { // for the case when the result is empty 069 return false; 070 } 071 // return true either we have cells in buffer or we can get more. 072 return kvsIterator.hasNext() || refillKVS(); 073 } 074 075 @Override 076 public Cell next() { 077 if (!hasNext()) { 078 throw new NoSuchElementException(); 079 } 080 return kvsIterator.next(); 081 } 082 083 @Override 084 public void close() { 085 try { 086 compactingScanner.close(); 087 } catch (IOException e) { 088 LOG.warn("close store scanner failed", e); 089 } 090 compactingScanner = null; 091 kvs.clear(); 092 } 093 094 @Override 095 public void remove() { 096 throw new UnsupportedOperationException(); 097 } 098 099 /** 100 * Creates the scanner for compacting the pipeline. 101 * @return the scanner 102 */ 103 private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners) 104 throws IOException { 105 InternalScanner scanner = null; 106 boolean success = false; 107 try { 108 RegionCoprocessorHost cpHost = store.getCoprocessorHost(); 109 ScanInfo scanInfo; 110 if (cpHost != null) { 111 scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store); 112 } else { 113 scanInfo = store.getScanInfo(); 114 } 115 scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, 116 store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); 117 if (cpHost != null) { 118 InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner); 119 if (scannerFromCp == null) { 120 throw new CoprocessorException("Got a null InternalScanner when calling" + 121 " preMemStoreCompactionCompact which is not acceptable"); 122 } 123 success = true; 124 return scannerFromCp; 125 } else { 126 success = true; 127 return scanner; 128 } 129 } finally { 130 if (!success) { 131 Closeables.close(scanner, true); 132 scanners.forEach(KeyValueScanner::close); 133 } 134 } 135 } 136 137 /* 138 * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is 139 * non-empty 140 */ 141 private boolean refillKVS() { 142 // if there is nothing expected next in compactingScanner 143 if (!hasMore) { 144 return false; 145 } 146 // clear previous KVS, first initiated in the constructor 147 kvs.clear(); 148 for (;;) { 149 try { 150 hasMore = compactingScanner.next(kvs, scannerContext); 151 } catch (IOException e) { 152 // should not happen as all data are in memory 153 throw new IllegalStateException(e); 154 } 155 if (!kvs.isEmpty()) { 156 kvsIterator = kvs.iterator(); 157 return true; 158 } else if (!hasMore) { 159 return false; 160 } 161 } 162 } 163}