1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.lang.management.ManagementFactory;
24 import java.util.ConcurrentModificationException;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.SortedMap;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.DelayQueue;
32 import java.util.concurrent.Delayed;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReentrantReadWriteLock;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.classification.InterfaceAudience;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.RemoteExceptionHandler;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47 import org.apache.hadoop.hbase.util.HasThread;
48 import org.apache.hadoop.hbase.util.Threads;
49 import org.apache.hadoop.util.StringUtils;
50 import org.cliffc.high_scale_lib.Counter;
51
52 import com.google.common.base.Preconditions;
53
54
55
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 class MemStoreFlusher implements FlushRequester {
65 static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
66
67
68 private final BlockingQueue<FlushQueueEntry> flushQueue =
69 new DelayQueue<FlushQueueEntry>();
70 private final Map<HRegion, FlushRegionEntry> regionsInQueue =
71 new HashMap<HRegion, FlushRegionEntry>();
72 private AtomicBoolean wakeupPending = new AtomicBoolean();
73
74 private final long threadWakeFrequency;
75 private final HRegionServer server;
76 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
77 private final Object blockSignal = new Object();
78
79 protected final long globalMemStoreLimit;
80 protected final long globalMemStoreLimitLowMark;
81
82 static final float DEFAULT_UPPER = 0.4f;
83 private static final float DEFAULT_LOWER = 0.35f;
84 static final String UPPER_KEY =
85 "hbase.regionserver.global.memstore.upperLimit";
86 private static final String LOWER_KEY =
87 "hbase.regionserver.global.memstore.lowerLimit";
88 private long blockingWaitTime;
89 private final Counter updatesBlockedMsHighWater = new Counter();
90
91 private final FlushHandler[] flushHandlers;
92
93
94
95
96
97 public MemStoreFlusher(final Configuration conf,
98 final HRegionServer server) {
99 super();
100 this.server = server;
101 this.threadWakeFrequency =
102 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
103 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
104 this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
105 UPPER_KEY, conf);
106 long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
107 if (lower > this.globalMemStoreLimit) {
108 lower = this.globalMemStoreLimit;
109 LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
110 "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
111 }
112 this.globalMemStoreLimitLowMark = lower;
113 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
114 90000);
115 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
116 this.flushHandlers = new FlushHandler[handlerCount];
117 LOG.info("globalMemStoreLimit=" +
118 StringUtils.humanReadableInt(this.globalMemStoreLimit) +
119 ", globalMemStoreLimitLowMark=" +
120 StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
121 ", maxHeap=" + StringUtils.humanReadableInt(max));
122 }
123
124
125
126
127
128
129
130
131
132
133 static long globalMemStoreLimit(final long max,
134 final float defaultLimit, final String key, final Configuration c) {
135 float limit = c.getFloat(key, defaultLimit);
136 return getMemStoreLimit(max, limit, defaultLimit);
137 }
138
139 static long getMemStoreLimit(final long max, final float limit,
140 final float defaultLimit) {
141 float effectiveLimit = limit;
142 if (limit >= 0.9f || limit < 0.1f) {
143 LOG.warn("Setting global memstore limit to default of " + defaultLimit +
144 " because supplied value outside allowed range of 0.1 -> 0.9");
145 effectiveLimit = defaultLimit;
146 }
147 return (long)(max * effectiveLimit);
148 }
149
150 public Counter getUpdatesBlockedMsHighWater() {
151 return this.updatesBlockedMsHighWater;
152 }
153
154
155
156
157
158
159
160 private boolean flushOneForGlobalPressure() {
161 SortedMap<Long, HRegion> regionsBySize =
162 server.getCopyOfOnlineRegionsSortedBySize();
163
164 Set<HRegion> excludedRegions = new HashSet<HRegion>();
165
166 boolean flushedOne = false;
167 while (!flushedOne) {
168
169
170 HRegion bestFlushableRegion = getBiggestMemstoreRegion(
171 regionsBySize, excludedRegions, true);
172
173 HRegion bestAnyRegion = getBiggestMemstoreRegion(
174 regionsBySize, excludedRegions, false);
175
176 if (bestAnyRegion == null) {
177 LOG.error("Above memory mark but there are no flushable regions!");
178 return false;
179 }
180
181 HRegion regionToFlush;
182 if (bestFlushableRegion != null &&
183 bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
184
185
186
187
188 if (LOG.isDebugEnabled()) {
189 LOG.debug("Under global heap pressure: " +
190 "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
191 "store files, but is " +
192 StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
193 " vs best flushable region's " +
194 StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
195 ". Choosing the bigger.");
196 }
197 regionToFlush = bestAnyRegion;
198 } else {
199 if (bestFlushableRegion == null) {
200 regionToFlush = bestAnyRegion;
201 } else {
202 regionToFlush = bestFlushableRegion;
203 }
204 }
205
206 Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
207
208 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
209 flushedOne = flushRegion(regionToFlush, true);
210 if (!flushedOne) {
211 LOG.info("Excluding unflushable region " + regionToFlush +
212 " - trying to find a different region to flush.");
213 excludedRegions.add(regionToFlush);
214 }
215 }
216 return true;
217 }
218
219 private class FlushHandler extends HasThread {
220 @Override
221 public void run() {
222 while (!server.isStopped()) {
223 FlushQueueEntry fqe = null;
224 try {
225 wakeupPending.set(false);
226 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
227 if (fqe == null || fqe instanceof WakeupFlushThread) {
228 if (isAboveLowWaterMark()) {
229 LOG.debug("Flush thread woke up because memory above low water="
230 + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
231 if (!flushOneForGlobalPressure()) {
232
233
234
235
236
237 Thread.sleep(1000);
238 wakeUpIfBlocking();
239 }
240
241 wakeupFlushThread();
242 }
243 continue;
244 }
245 FlushRegionEntry fre = (FlushRegionEntry) fqe;
246 if (!flushRegion(fre)) {
247 break;
248 }
249 } catch (InterruptedException ex) {
250 continue;
251 } catch (ConcurrentModificationException ex) {
252 continue;
253 } catch (Exception ex) {
254 LOG.error("Cache flusher failed for entry " + fqe, ex);
255 if (!server.checkFileSystem()) {
256 break;
257 }
258 }
259 }
260 synchronized (regionsInQueue) {
261 regionsInQueue.clear();
262 flushQueue.clear();
263 }
264
265
266 wakeUpIfBlocking();
267 LOG.info(getName() + " exiting");
268 }
269 }
270
271
272 private void wakeupFlushThread() {
273 if (wakeupPending.compareAndSet(false, true)) {
274 flushQueue.add(new WakeupFlushThread());
275 }
276 }
277
278 private HRegion getBiggestMemstoreRegion(
279 SortedMap<Long, HRegion> regionsBySize,
280 Set<HRegion> excludedRegions,
281 boolean checkStoreFileCount) {
282 synchronized (regionsInQueue) {
283 for (HRegion region : regionsBySize.values()) {
284 if (excludedRegions.contains(region)) {
285 continue;
286 }
287
288 if (region.writestate.flushing || !region.writestate.writesEnabled) {
289 continue;
290 }
291
292 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
293 continue;
294 }
295 return region;
296 }
297 }
298 return null;
299 }
300
301
302
303
304 private boolean isAboveHighWaterMark() {
305 return server.getRegionServerAccounting().
306 getGlobalMemstoreSize() >= globalMemStoreLimit;
307 }
308
309
310
311
312 private boolean isAboveLowWaterMark() {
313 return server.getRegionServerAccounting().
314 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
315 }
316
317 public void requestFlush(HRegion r) {
318 synchronized (regionsInQueue) {
319 if (!regionsInQueue.containsKey(r)) {
320
321
322 FlushRegionEntry fqe = new FlushRegionEntry(r);
323 this.regionsInQueue.put(r, fqe);
324 this.flushQueue.add(fqe);
325 }
326 }
327 }
328
329 public void requestDelayedFlush(HRegion r, long delay) {
330 synchronized (regionsInQueue) {
331 if (!regionsInQueue.containsKey(r)) {
332
333 FlushRegionEntry fqe = new FlushRegionEntry(r);
334 fqe.requeue(delay);
335 this.regionsInQueue.put(r, fqe);
336 this.flushQueue.add(fqe);
337 }
338 }
339 }
340
341 public int getFlushQueueSize() {
342 return flushQueue.size();
343 }
344
345
346
347
348 void interruptIfNecessary() {
349 lock.writeLock().lock();
350 try {
351 for (FlushHandler flushHander : flushHandlers) {
352 if (flushHander != null) flushHander.interrupt();
353 }
354 } finally {
355 lock.writeLock().unlock();
356 }
357 }
358
359 synchronized void start(UncaughtExceptionHandler eh) {
360 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
361 server.getServerName().toString() + "-MemStoreFlusher", eh);
362 for (int i = 0; i < flushHandlers.length; i++) {
363 flushHandlers[i] = new FlushHandler();
364 flusherThreadFactory.newThread(flushHandlers[i]);
365 flushHandlers[i].start();
366 }
367 }
368
369 boolean isAlive() {
370 for (FlushHandler flushHander : flushHandlers) {
371 if (flushHander != null && flushHander.isAlive()) {
372 return true;
373 }
374 }
375 return false;
376 }
377
378 void join() {
379 for (FlushHandler flushHander : flushHandlers) {
380 if (flushHander != null) {
381 Threads.shutdown(flushHander.getThread());
382 }
383 }
384 }
385
386
387
388
389
390
391
392
393
394 private boolean flushRegion(final FlushRegionEntry fqe) {
395 HRegion region = fqe.region;
396 if (!region.getRegionInfo().isMetaRegion() &&
397 isTooManyStoreFiles(region)) {
398 if (fqe.isMaximumWait(this.blockingWaitTime)) {
399 LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
400 "ms on a compaction to clean up 'too many store files'; waited " +
401 "long enough... proceeding with flush of " +
402 region.getRegionNameAsString());
403 } else {
404
405 if (fqe.getRequeueCount() <= 0) {
406
407 LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
408 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
409 if (!this.server.compactSplitThread.requestSplit(region)) {
410 try {
411 this.server.compactSplitThread.requestCompaction(region, Thread
412 .currentThread().getName());
413 } catch (IOException e) {
414 LOG.error(
415 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
416 RemoteExceptionHandler.checkIOException(e));
417 }
418 }
419 }
420
421
422
423 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
424
425 return true;
426 }
427 }
428 return flushRegion(region, false);
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442
443 private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
444 synchronized (this.regionsInQueue) {
445 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
446 if (fqe != null && emergencyFlush) {
447
448
449 flushQueue.remove(fqe);
450 }
451 }
452 lock.readLock().lock();
453 try {
454 boolean shouldCompact = region.flushcache();
455
456 boolean shouldSplit = region.checkSplit() != null;
457 if (shouldSplit) {
458 this.server.compactSplitThread.requestSplit(region);
459 } else if (shouldCompact) {
460 server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
461 }
462
463 } catch (DroppedSnapshotException ex) {
464
465
466
467
468
469 server.abort("Replay of HLog required. Forcing server shutdown", ex);
470 return false;
471 } catch (IOException ex) {
472 LOG.error("Cache flush failed" +
473 (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
474 RemoteExceptionHandler.checkIOException(ex));
475 if (!server.checkFileSystem()) {
476 return false;
477 }
478 } finally {
479 lock.readLock().unlock();
480 wakeUpIfBlocking();
481 }
482 return true;
483 }
484
485 private void wakeUpIfBlocking() {
486 synchronized (blockSignal) {
487 blockSignal.notifyAll();
488 }
489 }
490
491 private boolean isTooManyStoreFiles(HRegion region) {
492 for (Store store : region.stores.values()) {
493 if (store.hasTooManyStoreFiles()) {
494 return true;
495 }
496 }
497 return false;
498 }
499
500
501
502
503
504
505
506 public void reclaimMemStoreMemory() {
507 if (isAboveHighWaterMark()) {
508 long start = System.currentTimeMillis();
509 synchronized (this.blockSignal) {
510 boolean blocked = false;
511 long startTime = 0;
512 while (isAboveHighWaterMark() && !server.isStopped()) {
513 if (!blocked) {
514 startTime = EnvironmentEdgeManager.currentTimeMillis();
515 LOG.info("Blocking updates on " + server.toString() +
516 ": the global memstore size " +
517 StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
518 " is >= than blocking " +
519 StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
520 }
521 blocked = true;
522 wakeupFlushThread();
523 try {
524
525
526 blockSignal.wait(5 * 1000);
527 } catch (InterruptedException ie) {
528 Thread.currentThread().interrupt();
529 }
530 long took = System.currentTimeMillis() - start;
531 LOG.warn("Memstore is above high water mark and block " + took + "ms");
532 }
533 if(blocked){
534 final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
535 if(totalTime > 0){
536 this.updatesBlockedMsHighWater.add(totalTime);
537 }
538 LOG.info("Unblocking updates for server " + server.toString());
539 }
540 }
541 } else if (isAboveLowWaterMark()) {
542 wakeupFlushThread();
543 }
544 }
545 @Override
546 public String toString() {
547 return "flush_queue="
548 + flushQueue.size();
549 }
550
551 public String dumpQueue() {
552 StringBuilder queueList = new StringBuilder();
553 queueList.append("Flush Queue Queue dump:\n");
554 queueList.append(" Flush Queue:\n");
555 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
556
557 while(it.hasNext()){
558 queueList.append(" "+it.next().toString());
559 queueList.append("\n");
560 }
561
562 return queueList.toString();
563 }
564
/** Common type of everything that is placed on the flush queue. */
interface FlushQueueEntry extends Delayed {
}
566
567
568
569
570 static class WakeupFlushThread implements FlushQueueEntry {
571 @Override
572 public long getDelay(TimeUnit unit) {
573 return 0;
574 }
575
576 @Override
577 public int compareTo(Delayed o) {
578 return -1;
579 }
580
581 @Override
582 public boolean equals(Object obj) {
583 return (this == obj);
584 }
585
586 }
587
588
589
590
591
592
593
594
595
596 static class FlushRegionEntry implements FlushQueueEntry {
597 private final HRegion region;
598
599 private final long createTime;
600 private long whenToExpire;
601 private int requeueCount = 0;
602
603 FlushRegionEntry(final HRegion r) {
604 this.region = r;
605 this.createTime = System.currentTimeMillis();
606 this.whenToExpire = this.createTime;
607 }
608
609
610
611
612
613 public boolean isMaximumWait(final long maximumWait) {
614 return (System.currentTimeMillis() - this.createTime) > maximumWait;
615 }
616
617
618
619
620
621 public int getRequeueCount() {
622 return this.requeueCount;
623 }
624
625
626
627
628
629
630
631 public FlushRegionEntry requeue(final long when) {
632 this.whenToExpire = System.currentTimeMillis() + when;
633 this.requeueCount++;
634 return this;
635 }
636
637 @Override
638 public long getDelay(TimeUnit unit) {
639 return unit.convert(this.whenToExpire - System.currentTimeMillis(),
640 TimeUnit.MILLISECONDS);
641 }
642
643 @Override
644 public int compareTo(Delayed other) {
645 return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
646 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
647 }
648
649 @Override
650 public String toString() {
651 return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
652 }
653
654 @Override
655 public boolean equals(Object obj) {
656 if (this == obj) {
657 return true;
658 }
659 if (obj == null || getClass() != obj.getClass()) {
660 return false;
661 }
662 Delayed other = (Delayed) obj;
663 return compareTo(other) == 0;
664 }
665 }
666 }