001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046
047/**
048 * As long as there is no RegionServerServices for a master local region, we need implement the
049 * flush and compaction logic by our own.
050 * <p/>
051 * The flush logic is very simple, every time after calling a modification method in
052 * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
053 * method, we will check the memstore size and if it is above the flush size, we will call
054 * {@link HRegion#flush(boolean)} to force flush all stores.
055 * <p/>
056 * And for compaction, the logic is also very simple. After flush, we will check the store file
057 * count, if it is above the compactMin, we will do a major compaction.
058 */
059@InterfaceAudience.Private
060class MasterRegionFlusherAndCompactor implements Closeable {
061
062  private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);
063
064  private final Configuration conf;
065
066  private final Abortable abortable;
067
068  private final HRegion region;
069
070  // as we can only count this outside the region's write/flush process so it is not accurate, but
071  // it is enough.
072  private final AtomicLong changesAfterLastFlush = new AtomicLong(0);
073
074  private final long flushSize;
075
076  private final long flushPerChanges;
077
078  private final long flushIntervalMs;
079
080  private final int compactMin;
081
082  private final Path globalArchivePath;
083
084  private final String archivedHFileSuffix;
085
086  private final Thread flushThread;
087
088  private final Lock flushLock = new ReentrantLock();
089
090  private final Condition flushCond = flushLock.newCondition();
091
092  private boolean flushRequest = false;
093
094  private long lastFlushTime;
095
096  private final ExecutorService compactExecutor;
097
098  private final Lock compactLock = new ReentrantLock();
099
100  private boolean compactRequest = false;
101
102  private volatile boolean closed = false;
103
104  MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
105    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
106    Path globalArchivePath, String archivedHFileSuffix) {
107    this.conf = conf;
108    this.abortable = abortable;
109    this.region = region;
110    this.flushSize = flushSize;
111    this.flushPerChanges = flushPerChanges;
112    this.flushIntervalMs = flushIntervalMs;
113    this.compactMin = compactMin;
114    this.globalArchivePath = globalArchivePath;
115    this.archivedHFileSuffix = archivedHFileSuffix;
116    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
117    flushThread.setDaemon(true);
118    flushThread.start();
119    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
120      .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
121      .build());
122    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
123      flushSize, flushPerChanges, flushIntervalMs, compactMin);
124  }
125
126  // inject our flush related configurations
127  static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
128    long flushIntervalMs) {
129    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
130    conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
131    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
132    LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
133      flushPerChanges, flushIntervalMs);
134  }
135
136  private void moveHFileToGlobalArchiveDir() throws IOException {
137    FileSystem fs = region.getRegionFileSystem().getFileSystem();
138    for (HStore store : region.getStores()) {
139      store.closeAndArchiveCompactedFiles();
140      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
141        store.getColumnFamilyDescriptor().getName());
142      Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
143        globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
144      try {
145        MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
146          archivedHFileSuffix);
147      } catch (IOException e) {
148        LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
149          globalStoreArchiveDir, e);
150      }
151    }
152  }
153
154  private void compact() {
155    try {
156      region.compact(true);
157      moveHFileToGlobalArchiveDir();
158    } catch (IOException e) {
159      LOG.error("Failed to compact master local region", e);
160    }
161    compactLock.lock();
162    try {
163      if (needCompaction()) {
164        compactExecutor.execute(this::compact);
165      } else {
166        compactRequest = false;
167      }
168    } finally {
169      compactLock.unlock();
170    }
171  }
172
173  private boolean needCompaction() {
174    for (Store store : region.getStores()) {
175      if (store.getStorefilesCount() >= compactMin) {
176        return true;
177      }
178    }
179    return false;
180  }
181
182  private void flushLoop() {
183    lastFlushTime = EnvironmentEdgeManager.currentTime();
184    while (!closed) {
185      flushLock.lock();
186      try {
187        while (!flushRequest) {
188          long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
189          if (waitTimeMs <= 0) {
190            flushRequest = true;
191            break;
192          }
193          flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
194          if (closed) {
195            return;
196          }
197        }
198      } catch (InterruptedException e) {
199        Thread.currentThread().interrupt();
200        continue;
201      } finally {
202        flushLock.unlock();
203      }
204      assert flushRequest;
205      changesAfterLastFlush.set(0);
206      try {
207        region.flush(true);
208        lastFlushTime = EnvironmentEdgeManager.currentTime();
209      } catch (IOException e) {
210        LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
211        abortable.abort("Failed to flush master local region", e);
212        return;
213      }
214      compactLock.lock();
215      try {
216        if (!compactRequest && needCompaction()) {
217          compactRequest = true;
218          compactExecutor.execute(this::compact);
219        }
220      } finally {
221        compactLock.unlock();
222      }
223      flushLock.lock();
224      try {
225        // reset the flushRequest flag
226        if (!shouldFlush(changesAfterLastFlush.get())) {
227          flushRequest = false;
228        }
229      } finally {
230        flushLock.unlock();
231      }
232    }
233  }
234
235  private boolean shouldFlush(long changes) {
236    long heapSize = region.getMemStoreHeapSize();
237    long offHeapSize = region.getMemStoreOffHeapSize();
238    boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges;
239    if (flush && LOG.isTraceEnabled()) {
240      LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
241        heapSize + offHeapSize, flushSize, changes, flushPerChanges);
242    }
243    return flush;
244  }
245
246  void onUpdate() {
247    long changes = changesAfterLastFlush.incrementAndGet();
248    if (shouldFlush(changes)) {
249      requestFlush();
250    }
251  }
252
253  void requestFlush() {
254    flushLock.lock();
255    try {
256      if (flushRequest) {
257        return;
258      }
259      flushRequest = true;
260      flushCond.signalAll();
261    } finally {
262      flushLock.unlock();
263    }
264  }
265
266  @Override
267  public void close() {
268    closed = true;
269    flushThread.interrupt();
270    compactExecutor.shutdown();
271  }
272}