/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.region;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Since there is no RegionServerServices for a master local region, we need to implement the
 * flush and compaction logic ourselves.
 * <p/>
 * The flush logic is very simple: after every modification method call in
 * {@link RegionProcedureStore}, we call the {@link #onUpdate()} method below, which checks the
 * memstore size and the number of changes since the last flush; if either reaches its threshold,
 * we call {@link HRegion#flush(boolean)} to force flush all stores.
 * <p/>
 * The compaction logic is also very simple: after a flush, we check the store file count, and if
 * it reaches compactMin, we do a major compaction.
 */
@InterfaceAudience.Private
class MasterRegionFlusherAndCompactor implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);

  private final Configuration conf;

  private final Abortable abortable;

  private final HRegion region;

  // as we can only count this outside the region's write/flush process, it is not accurate, but
  // it is good enough.
  private final AtomicLong changesAfterLastFlush = new AtomicLong(0);

  private final long flushSize;

  private final long flushPerChanges;

  private final long flushIntervalMs;

  private final int compactMin;

  private final Path globalArchivePath;

  private final String archivedHFileSuffix;

  private final Thread flushThread;

  private final Lock flushLock = new ReentrantLock();

  private final Condition flushCond = flushLock.newCondition();

  private boolean flushRequest = false;

  private long lastFlushTime;

  private final ExecutorService compactExecutor;

  private final Lock compactLock = new ReentrantLock();

  private boolean compactRequest = false;

  private volatile boolean closed = false;

  MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
    Path globalArchivePath, String archivedHFileSuffix) {
    this.conf = conf;
    this.abortable = abortable;
    this.region = region;
    this.flushSize = flushSize;
    this.flushPerChanges = flushPerChanges;
    this.flushIntervalMs = flushIntervalMs;
    this.compactMin = compactMin;
    this.globalArchivePath = globalArchivePath;
    this.archivedHFileSuffix = archivedHFileSuffix;
    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
    flushThread.setDaemon(true);
    flushThread.start();
    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
      .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
      .build());
    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
      flushSize, flushPerChanges, flushIntervalMs, compactMin);
  }

  // inject our flush related configurations
  static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
    long flushIntervalMs) {
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
    conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
    LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
      flushPerChanges, flushIntervalMs);
  }

  // archive each store's compacted files, then move them from the store's local archive
  // directory to the global archive directory
  private void moveHFileToGlobalArchiveDir() throws IOException {
    FileSystem fs = region.getRegionFileSystem().getFileSystem();
    for (HStore store : region.getStores()) {
      store.closeAndArchiveCompactedFiles();
      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
        store.getColumnFamilyDescriptor().getName());
      Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
        globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
      try {
        MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
          archivedHFileSuffix);
      } catch (IOException e) {
        LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
          globalStoreArchiveDir, e);
      }
    }
  }

  private void compact() {
    try {
      region.compact(true);
      moveHFileToGlobalArchiveDir();
    } catch (IOException e) {
      LOG.error("Failed to compact master local region", e);
    }
    compactLock.lock();
    try {
      // if enough store files accumulated while we were compacting, schedule another round,
      // otherwise clear the request flag
      if (needCompaction()) {
        compactExecutor.execute(this::compact);
      } else {
        compactRequest = false;
      }
    } finally {
      compactLock.unlock();
    }
  }

  private boolean needCompaction() {
    for (Store store : region.getStores()) {
      if (store.getStorefilesCount() >= compactMin) {
        return true;
      }
    }
    return false;
  }

  private void flushLoop() {
    lastFlushTime = EnvironmentEdgeManager.currentTime();
    while (!closed) {
      flushLock.lock();
      try {
        // wait until a flush is requested explicitly or the periodic flush interval has passed
        while (!flushRequest) {
          long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
          if (waitTimeMs <= 0) {
            flushRequest = true;
            break;
          }
          flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
          if (closed) {
            return;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        continue;
      } finally {
        flushLock.unlock();
      }
      assert flushRequest;
      changesAfterLastFlush.set(0);
      try {
        region.flush(true);
        lastFlushTime = EnvironmentEdgeManager.currentTime();
      } catch (IOException e) {
        LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
        abortable.abort("Failed to flush master local region", e);
        return;
      }
      compactLock.lock();
      try {
        if (!compactRequest && needCompaction()) {
          compactRequest = true;
          compactExecutor.execute(this::compact);
        }
      } finally {
        compactLock.unlock();
      }
      flushLock.lock();
      try {
        // reset the flushRequest flag, unless new changes already warrant another flush
        if (!shouldFlush(changesAfterLastFlush.get())) {
          flushRequest = false;
        }
      } finally {
        flushLock.unlock();
      }
    }
  }

  private boolean shouldFlush(long changes) {
    long heapSize = region.getMemStoreHeapSize();
    long offHeapSize = region.getMemStoreOffHeapSize();
    boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges;
    if (flush && LOG.isTraceEnabled()) {
      LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
        heapSize + offHeapSize, flushSize, changes, flushPerChanges);
    }
    return flush;
  }

  void onUpdate() {
    long changes = changesAfterLastFlush.incrementAndGet();
    if (shouldFlush(changes)) {
      requestFlush();
    }
  }

  void requestFlush() {
    flushLock.lock();
    try {
      if (flushRequest) {
        return;
      }
      flushRequest = true;
      flushCond.signalAll();
    } finally {
      flushLock.unlock();
    }
  }

  @Override
  public void close() {
    closed = true;
    // wake up the flush thread and stop accepting new compaction tasks
    flushThread.interrupt();
    compactExecutor.shutdown();
  }
}
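
// ---------------------------------------------------------------------------------------------
// Usage sketch (illustration only, not part of the class). It shows how a caller is expected to
// drive this class: construct it for an already-opened master local region, call onUpdate()
// after every mutation, and close() on shutdown. The conf, abortable, region and
// globalArchivePath variables are assumed to be in scope, and the concrete values below
// (flush size, flush-per-changes, interval, compactMin, archived-hfile suffix) are made-up
// example values, not defaults taken from this code base.
//
//   MasterRegionFlusherAndCompactor flusherAndCompactor = new MasterRegionFlusherAndCompactor(
//     conf, abortable, region,
//     128L * 1024 * 1024,            // flushSize, example value (128 MB)
//     1_000_000,                     // flushPerChanges, example value
//     TimeUnit.MINUTES.toMillis(15), // flushIntervalMs, example value
//     4,                             // compactMin, example value
//     globalArchivePath,             // e.g. the cluster-wide hfile archive directory
//     ".archived");                  // archivedHFileSuffix, hypothetical suffix
//
//   // after every successful write to the master local region, e.g. from RegionProcedureStore:
//   flusherAndCompactor.onUpdate();
//
//   // on shutdown:
//   flusherAndCompactor.close();
// ---------------------------------------------------------------------------------------------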