001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.region;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Executors;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicLong;
026import java.util.concurrent.locks.Condition;
027import java.util.concurrent.locks.Lock;
028import java.util.concurrent.locks.ReentrantLock;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.log.HBaseMarkers;
035import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
036import org.apache.hadoop.hbase.regionserver.HRegion;
037import org.apache.hadoop.hbase.regionserver.HStore;
038import org.apache.hadoop.hbase.regionserver.Store;
039import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
040import org.apache.hadoop.hbase.util.HFileArchiveUtil;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046
047/**
048 * As long as there is no RegionServerServices for a master local region, we need implement the
049 * flush and compaction logic by our own.
050 * <p/>
051 * The flush logic is very simple, every time after calling a modification method in
052 * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
053 * method, we will check the memstore size and if it is above the flush size, we will call
054 * {@link HRegion#flush(boolean)} to force flush all stores.
055 * <p/>
056 * And for compaction, the logic is also very simple. After flush, we will check the store file
057 * count, if it is above the compactMin, we will do a major compaction.
058 */
059@InterfaceAudience.Private
060class MasterRegionFlusherAndCompactor implements Closeable {
061
062  private static final Logger LOG = LoggerFactory.getLogger(MasterRegionFlusherAndCompactor.class);
063
064  private final Configuration conf;
065
066  private final Abortable abortable;
067
068  private final HRegion region;
069
070  // as we can only count this outside the region's write/flush process so it is not accurate, but
071  // it is enough.
072  private final AtomicLong changesAfterLastFlush = new AtomicLong(0);
073
074  private final long flushSize;
075
076  private final long flushPerChanges;
077
078  private final long flushIntervalMs;
079
080  private final int compactMin;
081
082  private final Path globalArchivePath;
083
084  private final String archivedHFileSuffix;
085
086  private final Thread flushThread;
087
088  private final Lock flushLock = new ReentrantLock();
089
090  private final Condition flushCond = flushLock.newCondition();
091
092  private boolean flushRequest = false;
093
094  private long lastFlushTime;
095
096  private final ExecutorService compactExecutor;
097
098  private final Lock compactLock = new ReentrantLock();
099
100  private boolean compactRequest = false;
101
102  private volatile boolean closed = false;
103
104  MasterRegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region,
105    long flushSize, long flushPerChanges, long flushIntervalMs, int compactMin,
106    Path globalArchivePath, String archivedHFileSuffix) {
107    this.conf = conf;
108    this.abortable = abortable;
109    this.region = region;
110    this.flushSize = flushSize;
111    this.flushPerChanges = flushPerChanges;
112    this.flushIntervalMs = flushIntervalMs;
113    this.compactMin = compactMin;
114    this.globalArchivePath = globalArchivePath;
115    this.archivedHFileSuffix = archivedHFileSuffix;
116    flushThread = new Thread(this::flushLoop, region.getRegionInfo().getTable() + "-Flusher");
117    flushThread.setDaemon(true);
118    flushThread.start();
119    compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
120      .setNameFormat(region.getRegionInfo().getTable() + "-Store-Compactor").setDaemon(true)
121      .build());
122    LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, compactMin={}",
123      flushSize, flushPerChanges, flushIntervalMs, compactMin);
124  }
125
126  // inject our flush related configurations
127  static void setupConf(Configuration conf, long flushSize, long flushPerChanges,
128    long flushIntervalMs) {
129    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
130    conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
131    conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
132    LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
133      flushPerChanges, flushIntervalMs);
134  }
135
136  private void moveHFileToGlobalArchiveDir() throws IOException {
137    FileSystem fs = region.getRegionFileSystem().getFileSystem();
138    for (HStore store : region.getStores()) {
139      store.closeAndArchiveCompactedFiles();
140      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, region.getRegionInfo(),
141        store.getColumnFamilyDescriptor().getName());
142      Path globalStoreArchiveDir = HFileArchiveUtil.getStoreArchivePathForArchivePath(
143        globalArchivePath, region.getRegionInfo(), store.getColumnFamilyDescriptor().getName());
144      try {
145        if (fs.exists(storeArchiveDir)) {
146          MasterRegionUtils.moveFilesUnderDir(fs, storeArchiveDir, globalStoreArchiveDir,
147            archivedHFileSuffix);
148        } else {
149          LOG.warn(
150            "Archived dir {} does not exist, there is no need to move archived hfiles from {} "
151              + "to global dir {} .",
152            storeArchiveDir, storeArchiveDir, globalStoreArchiveDir);
153        }
154      } catch (IOException e) {
155        LOG.warn("Failed to move archived hfiles from {} to global dir {}", storeArchiveDir,
156          globalStoreArchiveDir, e);
157      }
158    }
159  }
160
161  private void compact() {
162    try {
163      region.compact(true);
164      moveHFileToGlobalArchiveDir();
165    } catch (IOException e) {
166      LOG.error("Failed to compact master local region", e);
167    }
168    compactLock.lock();
169    try {
170      if (needCompaction()) {
171        compactExecutor.execute(this::compact);
172      } else {
173        compactRequest = false;
174      }
175    } finally {
176      compactLock.unlock();
177    }
178  }
179
180  private boolean needCompaction() {
181    for (Store store : region.getStores()) {
182      if (store.getStorefilesCount() >= compactMin) {
183        return true;
184      }
185    }
186    return false;
187  }
188
189  private void flushLoop() {
190    recordLastFlushTime();
191    while (!closed) {
192      flushLock.lock();
193      try {
194        while (!flushRequest) {
195          long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
196          if (waitTimeMs <= 0) {
197            flushRequest = true;
198            break;
199          }
200          flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
201          if (closed) {
202            return;
203          }
204        }
205      } catch (InterruptedException e) {
206        Thread.currentThread().interrupt();
207        continue;
208      } finally {
209        flushLock.unlock();
210      }
211      assert flushRequest;
212      resetChangesAfterLastFlush();
213      try {
214        region.flush(true);
215        recordLastFlushTime();
216      } catch (IOException e) {
217        LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
218        abortable.abort("Failed to flush master local region", e);
219        return;
220      }
221      compactLock.lock();
222      try {
223        if (!compactRequest && needCompaction()) {
224          compactRequest = true;
225          compactExecutor.execute(this::compact);
226        }
227      } finally {
228        compactLock.unlock();
229      }
230      flushLock.lock();
231      try {
232        // reset the flushRequest flag
233        if (!shouldFlush(changesAfterLastFlush.get())) {
234          flushRequest = false;
235        }
236      } finally {
237        flushLock.unlock();
238      }
239    }
240  }
241
242  private boolean shouldFlush(long changes) {
243    long heapSize = region.getMemStoreHeapSize();
244    long offHeapSize = region.getMemStoreOffHeapSize();
245    boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges;
246    if (flush && LOG.isTraceEnabled()) {
247      LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
248        heapSize + offHeapSize, flushSize, changes, flushPerChanges);
249    }
250    return flush;
251  }
252
253  void onUpdate() {
254    long changes = changesAfterLastFlush.incrementAndGet();
255    if (shouldFlush(changes)) {
256      requestFlush();
257    }
258  }
259
260  void requestFlush() {
261    flushLock.lock();
262    try {
263      if (flushRequest) {
264        return;
265      }
266      flushRequest = true;
267      flushCond.signalAll();
268    } finally {
269      flushLock.unlock();
270    }
271  }
272
273  void resetChangesAfterLastFlush() {
274    changesAfterLastFlush.set(0);
275  }
276
277  void recordLastFlushTime() {
278    lastFlushTime = EnvironmentEdgeManager.currentTime();
279  }
280
281  @Override
282  public void close() {
283    closed = true;
284    flushThread.interrupt();
285    compactExecutor.shutdown();
286  }
287}