1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.lang.Thread.UncaughtExceptionHandler;
23  import java.lang.management.ManagementFactory;
24  import java.util.ConcurrentModificationException;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.SortedMap;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.DelayQueue;
32  import java.util.concurrent.Delayed;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.RemoteExceptionHandler;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.HasThread;
48  import org.apache.hadoop.hbase.util.Threads;
49  import org.apache.hadoop.util.StringUtils;
50  import org.cliffc.high_scale_lib.Counter;
51  
52  import com.google.common.base.Preconditions;
53  
/**
 * Flushes region memstores on request, using one or more flush handler threads.
 *
 * NOTE: The flush handlers are plain threads rather than Chores because their
 * sleep can be cut short when there is something to do, whereas the Chore
 * sleep time is invariant.
 *
 * @see FlushRequester
 */
63  @InterfaceAudience.Private
64  class MemStoreFlusher implements FlushRequester {
  // Logger shared by the flusher and its handler threads.
  static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  // NOTE(review): WakeupFlushThread tokens are added to flushQueue only, so the
  // pairing applies to FlushRegionEntry items. Every access to regionsInQueue in
  // this file happens while synchronized on regionsInQueue itself.
  private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
  private final Map<HRegion, FlushRegionEntry> regionsInQueue =
    new HashMap<HRegion, FlushRegionEntry>();
  // Set when a wakeup token has been enqueued (see wakeupFlushThread) so that
  // duplicates are not added; a handler resets it at the top of each loop pass.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  // Upper bound, in milliseconds, on how long a handler blocks polling flushQueue.
  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Flushes run under the read lock; interruptIfNecessary() takes the write
  // lock, so handlers are only interrupted while no flush holds the read lock.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Monitor that reclaimMemStoreMemory() waits on and wakeUpIfBlocking() notifies.
  private final Object blockSignal = new Object();

  // Global memstore size at or above which reclaimMemStoreMemory() blocks callers.
  protected final long globalMemStoreLimit;
  // Global memstore size at or above which handlers flush for global pressure.
  protected final long globalMemStoreLimitLowMark;

  // Default fractions of the maximum heap, and the configuration keys that
  // override them, for the upper and lower global memstore limits.
  static final float DEFAULT_UPPER = 0.4f;
  private static final float DEFAULT_LOWER = 0.35f;
  static final String UPPER_KEY =
    "hbase.regionserver.global.memstore.upperLimit";
  private static final String LOWER_KEY =
    "hbase.regionserver.global.memstore.lowerLimit";
  // Longest time, in milliseconds, that a flush is put off because the region
  // has too many store files (read from "hbase.hstore.blockingWaitTime").
  private long blockingWaitTime;
  // Accumulates the milliseconds callers spent blocked in reclaimMemStoreMemory().
  private final Counter updatesBlockedMsHighWater = new Counter();

  // One slot per flush handler; sized in the constructor, populated by start().
  private final FlushHandler[] flushHandlers;
92  
93    /**
94     * @param conf
95     * @param server
96     */
97    public MemStoreFlusher(final Configuration conf,
98        final HRegionServer server) {
99      super();
100     this.server = server;
101     this.threadWakeFrequency =
102       conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
103     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
104     this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
105       UPPER_KEY, conf);
106     long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
107     if (lower > this.globalMemStoreLimit) {
108       lower = this.globalMemStoreLimit;
109       LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
110         "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
111     }
112     this.globalMemStoreLimitLowMark = lower;
113     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
114       90000);
115     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
116     this.flushHandlers = new FlushHandler[handlerCount];
117     LOG.info("globalMemStoreLimit=" +
118       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
119       ", globalMemStoreLimitLowMark=" +
120       StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
121       ", maxHeap=" + StringUtils.humanReadableInt(max));
122   }
123 
124   /**
125    * Calculate size using passed <code>key</code> for configured
126    * percentage of <code>max</code>.
127    * @param max
128    * @param defaultLimit
129    * @param key
130    * @param c
131    * @return Limit.
132    */
133   static long globalMemStoreLimit(final long max,
134      final float defaultLimit, final String key, final Configuration c) {
135     float limit = c.getFloat(key, defaultLimit);
136     return getMemStoreLimit(max, limit, defaultLimit);
137   }
138 
139   static long getMemStoreLimit(final long max, final float limit,
140       final float defaultLimit) {
141     float effectiveLimit = limit;
142     if (limit >= 0.9f || limit < 0.1f) {
143       LOG.warn("Setting global memstore limit to default of " + defaultLimit +
144         " because supplied value outside allowed range of 0.1 -> 0.9");
145       effectiveLimit = defaultLimit;
146     }
147     return (long)(max * effectiveLimit);
148   }
149 
150   public Counter getUpdatesBlockedMsHighWater() {
151     return this.updatesBlockedMsHighWater;
152   }
153 
154   /**
155    * The memstore across all regions has exceeded the low water mark. Pick
156    * one region to flush and flush it synchronously (this is called from the
157    * flush thread)
158    * @return true if successful
159    */
160   private boolean flushOneForGlobalPressure() {
161     SortedMap<Long, HRegion> regionsBySize =
162         server.getCopyOfOnlineRegionsSortedBySize();
163 
164     Set<HRegion> excludedRegions = new HashSet<HRegion>();
165 
166     boolean flushedOne = false;
167     while (!flushedOne) {
168       // Find the biggest region that doesn't have too many storefiles
169       // (might be null!)
170       HRegion bestFlushableRegion = getBiggestMemstoreRegion(
171           regionsBySize, excludedRegions, true);
172       // Find the biggest region, total, even if it might have too many flushes.
173       HRegion bestAnyRegion = getBiggestMemstoreRegion(
174           regionsBySize, excludedRegions, false);
175 
176       if (bestAnyRegion == null) {
177         LOG.error("Above memory mark but there are no flushable regions!");
178         return false;
179       }
180 
181       HRegion regionToFlush;
182       if (bestFlushableRegion != null &&
183           bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
184         // Even if it's not supposed to be flushed, pick a region if it's more than twice
185         // as big as the best flushable one - otherwise when we're under pressure we make
186         // lots of little flushes and cause lots of compactions, etc, which just makes
187         // life worse!
188         if (LOG.isDebugEnabled()) {
189           LOG.debug("Under global heap pressure: " +
190             "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
191             "store files, but is " +
192             StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
193             " vs best flushable region's " +
194             StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
195             ". Choosing the bigger.");
196         }
197         regionToFlush = bestAnyRegion;
198       } else {
199         if (bestFlushableRegion == null) {
200           regionToFlush = bestAnyRegion;
201         } else {
202           regionToFlush = bestFlushableRegion;
203         }
204       }
205 
206       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
207 
208       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
209       flushedOne = flushRegion(regionToFlush, true);
210       if (!flushedOne) {
211         LOG.info("Excluding unflushable region " + regionToFlush +
212           " - trying to find a different region to flush.");
213         excludedRegions.add(regionToFlush);
214       }
215     }
216     return true;
217   }
218 
  /**
   * Worker that drains the flush queue until the server is stopped or a flush
   * reports a failure that ends the loop. All handlers share the enclosing
   * flusher's queue and bookkeeping map.
   */
  private class FlushHandler extends HasThread {
    /**
     * Repeatedly polls the flush queue. A timeout or a wakeup token triggers a
     * global-pressure check; a region entry triggers a flush of that region.
     * On exit the shared queue and map are cleared and blocked callers are
     * notified.
     */
    @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          // Wait at most threadWakeFrequency ms for an entry whose delay expired.
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            // Nothing specific to flush: only act if memory is above low water.
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          // Anything that is not a wakeup token is a region flush request.
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            // flushRegion reported a failure; stop this handler.
            break;
          }
        } catch (InterruptedException ex) {
          // Interrupted while polling or sleeping: re-check the stop flag.
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          // Keep running unless the filesystem check fails.
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      // NOTE(review): the queue and map are shared by all handlers, so the
      // first handler to exit clears pending requests for the others as well.
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }
270 
271 
272   private void wakeupFlushThread() {
273     if (wakeupPending.compareAndSet(false, true)) {
274       flushQueue.add(new WakeupFlushThread());
275     }
276   }
277 
278   private HRegion getBiggestMemstoreRegion(
279       SortedMap<Long, HRegion> regionsBySize,
280       Set<HRegion> excludedRegions,
281       boolean checkStoreFileCount) {
282     synchronized (regionsInQueue) {
283       for (HRegion region : regionsBySize.values()) {
284         if (excludedRegions.contains(region)) {
285           continue;
286         }
287 
288         if (region.writestate.flushing || !region.writestate.writesEnabled) {
289           continue;
290         }
291 
292         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
293           continue;
294         }
295         return region;
296       }
297     }
298     return null;
299   }
300 
301   /**
302    * Return true if global memory usage is above the high watermark
303    */
304   private boolean isAboveHighWaterMark() {
305     return server.getRegionServerAccounting().
306       getGlobalMemstoreSize() >= globalMemStoreLimit;
307   }
308 
309   /**
310    * Return true if we're above the high watermark
311    */
312   private boolean isAboveLowWaterMark() {
313     return server.getRegionServerAccounting().
314       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
315   }
316 
317   public void requestFlush(HRegion r) {
318     synchronized (regionsInQueue) {
319       if (!regionsInQueue.containsKey(r)) {
320         // This entry has no delay so it will be added at the top of the flush
321         // queue.  It'll come out near immediately.
322         FlushRegionEntry fqe = new FlushRegionEntry(r);
323         this.regionsInQueue.put(r, fqe);
324         this.flushQueue.add(fqe);
325       }
326     }
327   }
328 
329   public void requestDelayedFlush(HRegion r, long delay) {
330     synchronized (regionsInQueue) {
331       if (!regionsInQueue.containsKey(r)) {
332         // This entry has some delay
333         FlushRegionEntry fqe = new FlushRegionEntry(r);
334         fqe.requeue(delay);
335         this.regionsInQueue.put(r, fqe);
336         this.flushQueue.add(fqe);
337       }
338     }
339   }
340 
341   public int getFlushQueueSize() {
342     return flushQueue.size();
343   }
344 
345   /**
346    * Only interrupt once it's done with a run through the work loop.
347    */
348   void interruptIfNecessary() {
349     lock.writeLock().lock();
350     try {
351       for (FlushHandler flushHander : flushHandlers) {
352         if (flushHander != null) flushHander.interrupt();
353       }
354     } finally {
355       lock.writeLock().unlock();
356     }
357   }
358 
359   synchronized void start(UncaughtExceptionHandler eh) {
360     ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
361         server.getServerName().toString() + "-MemStoreFlusher", eh);
362     for (int i = 0; i < flushHandlers.length; i++) {
363       flushHandlers[i] = new FlushHandler();
364       flusherThreadFactory.newThread(flushHandlers[i]);
365       flushHandlers[i].start();
366     }
367   }
368 
369   boolean isAlive() {
370     for (FlushHandler flushHander : flushHandlers) {
371       if (flushHander != null && flushHander.isAlive()) {
372         return true;
373       }
374     }
375     return false;
376   }
377 
378   void join() {
379     for (FlushHandler flushHander : flushHandlers) {
380       if (flushHander != null) {
381         Threads.shutdown(flushHander.getThread());
382       }
383     }
384   }
385 
386   /*
387    * A flushRegion that checks store file count.  If too many, puts the flush
388    * on delay queue to retry later.
389    * @param fqe
390    * @return true if the region was successfully flushed, false otherwise. If
391    * false, there will be accompanying log messages explaining why the log was
392    * not flushed.
393    */
394   private boolean flushRegion(final FlushRegionEntry fqe) {
395     HRegion region = fqe.region;
396     if (!region.getRegionInfo().isMetaRegion() &&
397         isTooManyStoreFiles(region)) {
398       if (fqe.isMaximumWait(this.blockingWaitTime)) {
399         LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
400           "ms on a compaction to clean up 'too many store files'; waited " +
401           "long enough... proceeding with flush of " +
402           region.getRegionNameAsString());
403       } else {
404         // If this is first time we've been put off, then emit a log message.
405         if (fqe.getRequeueCount() <= 0) {
406           // Note: We don't impose blockingStoreFiles constraint on meta regions
407           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
408             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
409           if (!this.server.compactSplitThread.requestSplit(region)) {
410             try {
411               this.server.compactSplitThread.requestCompaction(region, Thread
412                   .currentThread().getName());
413             } catch (IOException e) {
414               LOG.error(
415                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
416                 RemoteExceptionHandler.checkIOException(e));
417             }
418           }
419         }
420 
421         // Put back on the queue.  Have it come back out of the queue
422         // after a delay of this.blockingWaitTime / 100 ms.
423         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
424         // Tell a lie, it's not flushed but it's ok
425         return true;
426       }
427     }
428     return flushRegion(region, false);
429   }
430 
  /*
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   *
   * @return false if a DroppedSnapshotException caused a server abort, or if
   * an IOException was followed by a failed filesystem check; true otherwise.
   * Note that true is also returned when an IOException occurred but the
   * filesystem check passed, so true does not guarantee the flush completed.
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    // Drop the bookkeeping entry first so a new flush request can be queued
    // for this region while the flush below is running.
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    // Hold the read lock for the duration of the flush; interruptIfNecessary()
    // needs the write lock and therefore waits for the flush to finish.
    lock.readLock().lock();
    try {
      boolean shouldCompact = region.flushcache();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      // A split request takes precedence over a compaction request.
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
      }

    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of hlog
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of HLog required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      // Only report failure when the filesystem itself looks unhealthy.
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      // Whatever the outcome, let blocked writers re-check the memstore size.
      wakeUpIfBlocking();
    }
    return true;
  }
484 
485   private void wakeUpIfBlocking() {
486     synchronized (blockSignal) {
487       blockSignal.notifyAll();
488     }
489   }
490 
491   private boolean isTooManyStoreFiles(HRegion region) {
492     for (Store store : region.stores.values()) {
493       if (store.hasTooManyStoreFiles()) {
494         return true;
495       }
496     }
497     return false;
498   }
499 
500   /**
501    * Check if the regionserver's memstore memory usage is greater than the
502    * limit. If so, flush regions with the biggest memstores until we're down
503    * to the lower limit. This method blocks callers until we're down to a safe
504    * amount of memstore consumption.
505    */
506   public void reclaimMemStoreMemory() {
507     if (isAboveHighWaterMark()) {
508       long start = System.currentTimeMillis();
509       synchronized (this.blockSignal) {
510         boolean blocked = false;
511         long startTime = 0;
512         while (isAboveHighWaterMark() && !server.isStopped()) {
513           if (!blocked) {
514             startTime = EnvironmentEdgeManager.currentTimeMillis();
515             LOG.info("Blocking updates on " + server.toString() +
516             ": the global memstore size " +
517             StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
518             " is >= than blocking " +
519             StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
520           }
521           blocked = true;
522           wakeupFlushThread();
523           try {
524             // we should be able to wait forever, but we've seen a bug where
525             // we miss a notify, so put a 5 second bound on it at least.
526             blockSignal.wait(5 * 1000);
527           } catch (InterruptedException ie) {
528             Thread.currentThread().interrupt();
529           }
530           long took = System.currentTimeMillis() - start;
531           LOG.warn("Memstore is above high water mark and block " + took + "ms");
532         }
533         if(blocked){
534           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
535           if(totalTime > 0){
536             this.updatesBlockedMsHighWater.add(totalTime);
537           }
538           LOG.info("Unblocking updates for server " + server.toString());
539         }
540       }
541     } else if (isAboveLowWaterMark()) {
542       wakeupFlushThread();
543     }
544   }
545   @Override
546   public String toString() {
547     return "flush_queue="
548         + flushQueue.size();
549   }
550 
551   public String dumpQueue() {
552     StringBuilder queueList = new StringBuilder();
553     queueList.append("Flush Queue Queue dump:\n");
554     queueList.append("  Flush Queue:\n");
555     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
556 
557     while(it.hasNext()){
558       queueList.append("    "+it.next().toString());
559       queueList.append("\n");
560     }
561 
562     return queueList.toString();
563   }
564 
  /**
   * Common type of everything placed on the flush queue; adds nothing to
   * {@link Delayed}. Implemented in this file by {@link WakeupFlushThread}
   * and {@link FlushRegionEntry}.
   */
  interface FlushQueueEntry extends Delayed {}
566 
567   /**
568    * Token to insert into the flush queue that ensures that the flusher does not sleep
569    */
570   static class WakeupFlushThread implements FlushQueueEntry {
571     @Override
572     public long getDelay(TimeUnit unit) {
573       return 0;
574     }
575 
576     @Override
577     public int compareTo(Delayed o) {
578       return -1;
579     }
580 
581     @Override
582     public boolean equals(Object obj) {
583       return (this == obj);
584     }
585 
586   }
587 
588   /**
589    * Datastructure used in the flush queue.  Holds region and retry count.
590    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
591    * construction, the delay is zero. When added to a delay queue, we'll come
592    * out near immediately.  Call {@link #requeue(long)} passing delay in
593    * milliseconds before readding to delay queue if you want it to stay there
594    * a while.
595    */
596   static class FlushRegionEntry implements FlushQueueEntry {
597     private final HRegion region;
598 
599     private final long createTime;
600     private long whenToExpire;
601     private int requeueCount = 0;
602 
603     FlushRegionEntry(final HRegion r) {
604       this.region = r;
605       this.createTime = System.currentTimeMillis();
606       this.whenToExpire = this.createTime;
607     }
608 
609     /**
610      * @param maximumWait
611      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
612      */
613     public boolean isMaximumWait(final long maximumWait) {
614       return (System.currentTimeMillis() - this.createTime) > maximumWait;
615     }
616 
617     /**
618      * @return Count of times {@link #requeue(long)} was called; i.e this is
619      * number of times we've been requeued.
620      */
621     public int getRequeueCount() {
622       return this.requeueCount;
623     }
624 
625     /**
626      * @param when When to expire, when to come up out of the queue.
627      * Specify in milliseconds.  This method adds System.currentTimeMillis()
628      * to whatever you pass.
629      * @return This.
630      */
631     public FlushRegionEntry requeue(final long when) {
632       this.whenToExpire = System.currentTimeMillis() + when;
633       this.requeueCount++;
634       return this;
635     }
636 
637     @Override
638     public long getDelay(TimeUnit unit) {
639       return unit.convert(this.whenToExpire - System.currentTimeMillis(),
640           TimeUnit.MILLISECONDS);
641     }
642 
643     @Override
644     public int compareTo(Delayed other) {
645       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
646         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
647     }
648 
649     @Override
650     public String toString() {
651       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
652     }
653 
654     @Override
655     public boolean equals(Object obj) {
656       if (this == obj) {
657         return true;
658       }
659       if (obj == null || getClass() != obj.getClass()) {
660         return false;
661       }
662       Delayed other = (Delayed) obj;
663       return compareTo(other) == 0;
664     }
665   }
666 }