View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.lang.management.ManagementFactory;
24  import java.util.ConcurrentModificationException;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.SortedMap;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.DelayQueue;
32  import java.util.concurrent.Delayed;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  import java.util.concurrent.locks.Condition;
36  import java.util.concurrent.locks.ReentrantLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.hbase.DroppedSnapshotException;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.RemoteExceptionHandler;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.cliffc.high_scale_lib.Counter;
49  
50  import com.google.common.base.Preconditions;
51  
52  /**
53   * Thread that flushes cache on request
54   *
55   * NOTE: This class extends Thread rather than Chore because the sleep time
56   * can be interrupted when there is something to do, rather than the Chore
57   * sleep time which is invariant.
58   *
59   * @see FlushRequester
60   */
61  class MemStoreFlusher extends HasThread implements FlushRequester {
  static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
  // These two data members go together.  Any entry in the one must have
  // a corresponding entry in the other.
  private final BlockingQueue<FlushQueueEntry> flushQueue =
    new DelayQueue<FlushQueueEntry>();
  private final Map<HRegion, FlushRegionEntry> regionsInQueue =
    new HashMap<HRegion, FlushRegionEntry>();
  // True while a WakeupFlushThread token is queued; cleared each time the
  // flusher thread wakes so only one token is outstanding at a time.
  private AtomicBoolean wakeupPending = new AtomicBoolean();

  private final long threadWakeFrequency;
  private final HRegionServer server;
  // Guards flushing; flushOccurred is signalled after each flush attempt so
  // callers blocked in reclaimMemStoreMemory() can recheck the water marks.
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition flushOccurred = lock.newCondition();

  // Global memstore bounds in bytes: upper blocks updates, lower triggers
  // flushing.  Both are computed from fractions of the max heap.
  protected final long globalMemStoreLimit;
  protected final long globalMemStoreLimitLowMark;

  private static final float DEFAULT_UPPER = 0.4f;
  private static final float DEFAULT_LOWER = 0.35f;
  private static final String UPPER_KEY =
    "hbase.regionserver.global.memstore.upperLimit";
  private static final String LOWER_KEY =
    "hbase.regionserver.global.memstore.lowerLimit";

  // Max time (ms) a flush is delayed while waiting for compactions to clear
  // a "too many store files" condition.
  private long blockingWaitTime;
  // Cumulative ms that updates have been blocked at the high water mark.
  private final Counter updatesBlockedMsHighWater = new Counter();
88  
  /**
   * Constructs the flusher, deriving the global memstore high and low water
   * marks from configured fractions of the JVM max heap.
   * @param conf configuration supplying limits and the wake frequency
   * @param server the region server this flusher works for
   */
  public MemStoreFlusher(final Configuration conf,
      final HRegionServer server) {
    super();
    this.server = server;
    this.threadWakeFrequency =
      conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // Both limits are fractions of the maximum heap reported by the JVM.
    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
      UPPER_KEY, conf);
    long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
    // The low water mark must never exceed the high water mark; clamp it.
    if (lower > this.globalMemStoreLimit) {
      lower = this.globalMemStoreLimit;
      LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
        "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
    }
    this.globalMemStoreLimitLowMark = lower;

    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
      90000);
    LOG.info("globalMemStoreLimit=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimit) +
      ", globalMemStoreLimitLowMark=" +
      StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
      ", maxHeap=" + StringUtils.humanReadableInt(max));
  }
118 
119   /**
120    * Calculate size using passed <code>key</code> for configured
121    * percentage of <code>max</code>.
122    * @param max
123    * @param defaultLimit
124    * @param key
125    * @param c
126    * @return Limit.
127    */
128   static long globalMemStoreLimit(final long max,
129      final float defaultLimit, final String key, final Configuration c) {
130     float limit = c.getFloat(key, defaultLimit);
131     return getMemStoreLimit(max, limit, defaultLimit);
132   }
133 
134   static long getMemStoreLimit(final long max, final float limit,
135       final float defaultLimit) {
136     float effectiveLimit = limit;
137     if (limit >= 0.9f || limit < 0.1f) {
138       LOG.warn("Setting global memstore limit to default of " + defaultLimit +
139         " because supplied value outside allowed range of 0.1 -> 0.9");
140       effectiveLimit = defaultLimit;
141     }
142     return (long)(max * effectiveLimit);
143   }
144 
  /** @return counter of total ms updates have been blocked at the high water mark */
  public Counter getUpdatesBlockedMsHighWater() {
    return this.updatesBlockedMsHighWater;
  }
148 
  /**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();

    // Regions whose flush failed this round; skipped on subsequent passes.
    Set<HRegion> excludedRegions = new HashSet<HRegion>();

    boolean flushedOne = false;
    while (!flushedOne) {
      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);
      // Find the biggest region, total, even if it might have too many flushes.
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);

      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      // A region with an empty memstore should never be selected here.
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
      // Emergency flush: flushRegion also removes the region's queued entry.
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) {
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }
213 
  /**
   * Main flusher loop: poll the delay queue for region flush requests or
   * wakeup tokens until the server stops, flushing for global heap pressure
   * when woken with nothing queued.
   */
  @Override
  public void run() {
    while (!this.server.isStopped()) {
      FlushQueueEntry fqe = null;
      try {
        wakeupPending.set(false); // allow someone to wake us up again
        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
        if (fqe == null || fqe instanceof WakeupFlushThread) {
          // Timed out or explicitly woken: check whether global memory
          // pressure warrants flushing the biggest region.
          if (isAboveLowWaterMark()) {
            LOG.debug("Flush thread woke up because memory above low water=" +
              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
            if (!flushOneForGlobalPressure()) {
              // Wasn't able to flush any region, but we're above low water mark
              // This is unlikely to happen, but might happen when closing the
              // entire server - another thread is flushing regions. We'll just
              // sleep a little bit to avoid spinning, and then pretend that
              // we flushed one, so anyone blocked will check again
              lock.lock();
              try {
                Thread.sleep(1000);
                flushOccurred.signalAll();
              } finally {
                lock.unlock();
              }
            }
            // Enqueue another one of these tokens so we'll wake up again
            wakeupFlushThread();
          }
          continue;
        }
        FlushRegionEntry fre = (FlushRegionEntry)fqe;
        if (!flushRegion(fre)) {
          // False only on abort-worthy failures (dropped snapshot or a bad
          // filesystem check); stop the flusher thread.
          break;
        }
      } catch (InterruptedException ex) {
        continue;
      } catch (ConcurrentModificationException ex) {
        continue;
      } catch (Exception ex) {
        LOG.error("Cache flusher failed for entry " + fqe, ex);
        if (!server.checkFileSystem()) {
          break;
        }
      }
    }
    // Discard remaining bookkeeping on exit; those entries will not flush.
    this.regionsInQueue.clear();
    this.flushQueue.clear();

    // Signal anyone waiting, so they see the close flag
    lock.lock();
    try {
      flushOccurred.signalAll();
    } finally {
      lock.unlock();
    }
    LOG.info(getName() + " exiting");
  }
271 
272   private void wakeupFlushThread() {
273     if (wakeupPending.compareAndSet(false, true)) {
274       flushQueue.add(new WakeupFlushThread());
275     }
276   }
277 
278   private HRegion getBiggestMemstoreRegion(
279       SortedMap<Long, HRegion> regionsBySize,
280       Set<HRegion> excludedRegions,
281       boolean checkStoreFileCount) {
282     synchronized (regionsInQueue) {
283       for (HRegion region : regionsBySize.values()) {
284         if (excludedRegions.contains(region)) {
285           continue;
286         }
287 
288         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
289           continue;
290         }
291         return region;
292       }
293     }
294     return null;
295   }
296 
297   /**
298    * Return true if global memory usage is above the high watermark
299    */
300   private boolean isAboveHighWaterMark() {
301     return server.getRegionServerAccounting().
302       getGlobalMemstoreSize() >= globalMemStoreLimit;
303   }
304 
  /**
   * Return true if global memory usage is above the low watermark
   * (the flush-trigger level, not the blocking level).
   */
  private boolean isAboveLowWaterMark() {
    return server.getRegionServerAccounting().
      getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
  }
312 
313   public void requestFlush(HRegion r) {
314     synchronized (regionsInQueue) {
315       if (!regionsInQueue.containsKey(r)) {
316         // This entry has no delay so it will be added at the top of the flush
317         // queue.  It'll come out near immediately.
318         FlushRegionEntry fqe = new FlushRegionEntry(r);
319         this.regionsInQueue.put(r, fqe);
320         this.flushQueue.add(fqe);
321       }
322     }
323   }
324 
325   public void requestDelayedFlush(HRegion r, long delay) {
326     synchronized (regionsInQueue) {
327       if (!regionsInQueue.containsKey(r)) {
328         // This entry has some delay
329         FlushRegionEntry fqe = new FlushRegionEntry(r);
330         fqe.requeue(delay);
331         this.regionsInQueue.put(r, fqe);
332         this.flushQueue.add(fqe);
333       }
334     }
335   }
336 
  /** @return number of entries (region flushes plus wakeup tokens) queued */
  public int getFlushQueueSize() {
    return flushQueue.size();
  }
340 
  /**
   * Only interrupt once it's done with a run through the work loop.
   * Taking <code>lock</code> first prevents the interrupt from landing in
   * the middle of an in-progress flush, since flushes hold the same lock.
   */
  void interruptIfNecessary() {
    lock.lock();
    try {
      this.interrupt();
    } finally {
      lock.unlock();
    }
  }
352 
  /*
   * A flushRegion that checks store file count.  If too many, puts the flush
   * on delay queue to retry later.
   * @param fqe
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    // Meta regions are exempt from the blocking-store-files constraint.
    if (!fqe.region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {
      if (fqe.isMaximumWait(this.blockingWaitTime)) {
        // We've delayed as long as allowed waiting for a compaction; flush
        // regardless of the store file count.
        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
          // Prefer a split; fall back to requesting a compaction.
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestCompaction(region, getName());
            }  catch (IOException e) {
              LOG.error("Cache flush failed" +
                (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        return true;
      }
    }
    return flushRegion(region, false);
  }
396 
  /*
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   *
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
      }
      // Acquire the flush lock while still holding the queue monitor so
      // queue bookkeeping and the start of the flush can't interleave with
      // another flush; released in the finally block below.
      lock.lock();
    }
    try {
      boolean shouldCompact = region.flushcache().isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestCompaction(region, getName());
      }

      server.getMetrics().addFlush(region.getRecentFlushInfo());
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of hlog
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of HLog required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      // Wake any updaters blocked in reclaimMemStoreMemory() before
      // releasing the lock.
      flushOccurred.signalAll();
      lock.unlock();
    }
    return true;
  }
451 
452   private boolean isTooManyStoreFiles(HRegion region) {
453     for (Store hstore: region.stores.values()) {
454       if (hstore.hasTooManyStoreFiles()) {
455         return true;
456       }
457     }
458     return false;
459   }
460 
  /**
   * Check if the regionserver's memstore memory usage is greater than the
   * limit. If so, flush regions with the biggest memstores until we're down
   * to the lower limit. This method blocks callers until we're down to a safe
   * amount of memstore consumption.
   */
  public void reclaimMemStoreMemory() {
    if (isAboveHighWaterMark()) {
      lock.lock();
      try {
        boolean blocked = false;
        long startTime = 0;
        // Block until usage drops below the high water mark or the server
        // stops; each iteration nudges the flusher and waits for a signal.
        while (isAboveHighWaterMark() && !server.isStopped()) {
          if(!blocked){
            startTime = EnvironmentEdgeManager.currentTimeMillis();
            LOG.info("Blocking updates on " + server.toString() +
            ": the global memstore size " +
            StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
            " is >= than blocking " +
            StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
          }
          blocked = true;
          wakeupFlushThread();
          try {
            // we should be able to wait forever, but we've seen a bug where
            // we miss a notify, so put a 5 second bound on it at least.
            flushOccurred.await(5, TimeUnit.SECONDS);
          } catch (InterruptedException ie) {
            // Restore the interrupt status for the caller; keep waiting.
            Thread.currentThread().interrupt();
          }
        }
        if(blocked){
          // Account the total blocked time in the metrics counter.
          final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
          if(totalTime > 0){
            this.updatesBlockedMsHighWater.add(totalTime);
          }
          LOG.info("Unblocking updates for server " + server.toString());
        }
      } finally {
        lock.unlock();
      }
    } else if (isAboveLowWaterMark()) {
      // Above the flush-trigger level but below the blocking level: just
      // nudge the flusher without blocking the caller.
      wakeupFlushThread();
    }
  }
506 
507   @Override
508   public String toString() {
509     return "flush_queue="
510         + flushQueue.size();
511   }
512   
513   public String dumpQueue() {
514     StringBuilder queueList = new StringBuilder();
515     queueList.append("Flush Queue Queue dump:\n");
516     queueList.append("  Flush Queue:\n");
517     java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
518     
519     while(it.hasNext()){
520       queueList.append("    "+it.next().toString());
521       queueList.append("\n");
522     }
523     
524     return queueList.toString();
525   }
526   
527   interface FlushQueueEntry extends Delayed {}
528 
529   /**
530    * Token to insert into the flush queue that ensures that the flusher does not sleep
531    */
532   static class WakeupFlushThread implements FlushQueueEntry {
533     @Override
534     public long getDelay(TimeUnit unit) {
535       return 0;
536     }
537 
538     @Override
539     public int compareTo(Delayed o) {
540       return -1;
541     }
542   }
543 
544   /**
545    * Datastructure used in the flush queue.  Holds region and retry count.
546    * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
547    * construction, the delay is zero. When added to a delay queue, we'll come
548    * out near immediately.  Call {@link #requeue(long)} passing delay in
549    * milliseconds before readding to delay queue if you want it to stay there
550    * a while.
551    */
552   static class FlushRegionEntry implements FlushQueueEntry {
553     private final HRegion region;
554 
555     private final long createTime;
556     private long whenToExpire;
557     private int requeueCount = 0;
558 
559     FlushRegionEntry(final HRegion r) {
560       this.region = r;
561       this.createTime = System.currentTimeMillis();
562       this.whenToExpire = this.createTime;
563     }
564 
565     /**
566      * @param maximumWait
567      * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
568      */
569     public boolean isMaximumWait(final long maximumWait) {
570       return (System.currentTimeMillis() - this.createTime) > maximumWait;
571     }
572 
573     /**
574      * @return Count of times {@link #resetDelay()} was called; i.e this is
575      * number of times we've been requeued.
576      */
577     public int getRequeueCount() {
578       return this.requeueCount;
579     }
580 
581     /**
582      * @param when When to expire, when to come up out of the queue.
583      * Specify in milliseconds.  This method adds System.currentTimeMillis()
584      * to whatever you pass.
585      * @return This.
586      */
587     public FlushRegionEntry requeue(final long when) {
588       this.whenToExpire = System.currentTimeMillis() + when;
589       this.requeueCount++;
590       return this;
591     }
592 
593     @Override
594     public long getDelay(TimeUnit unit) {
595       return unit.convert(this.whenToExpire - System.currentTimeMillis(),
596           TimeUnit.MILLISECONDS);
597     }
598 
599     @Override
600     public int compareTo(Delayed other) {
601       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
602         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
603     }
604 
605     @Override
606     public String toString() {
607       return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
608     }
609   }
610 }