View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.List;
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.commons.lang.StringUtils;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.io.hfile.BlockCache;
36  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
37  import org.apache.hadoop.hbase.io.hfile.CacheStats;
38  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.util.FSUtils;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
43  import org.apache.hadoop.metrics2.MetricsExecutor;
44  
45  /**
46   * Impl for exposing HRegionServer Information through Hadoop's metrics 2 system.
47   */
48  @InterfaceAudience.Private
49  class MetricsRegionServerWrapperImpl
50      implements MetricsRegionServerWrapper {
51  
52    public static final Log LOG = LogFactory.getLog(MetricsRegionServerWrapperImpl.class);
53  
54    private final HRegionServer regionServer;
55  
56    private BlockCache blockCache;
57  
58    private volatile long numStores = 0;
59    private volatile long numWALFiles = 0;
60    private volatile long walFileSize = 0;
61    private volatile long numStoreFiles = 0;
62    private volatile long memstoreSize = 0;
63    private volatile long storeFileSize = 0;
64    private volatile double requestsPerSecond = 0.0;
65    private volatile long readRequestsCount = 0;
66    private volatile long writeRequestsCount = 0;
67    private volatile long checkAndMutateChecksFailed = 0;
68    private volatile long checkAndMutateChecksPassed = 0;
69    private volatile long storefileIndexSize = 0;
70    private volatile long totalStaticIndexSize = 0;
71    private volatile long totalStaticBloomSize = 0;
72    private volatile long numMutationsWithoutWAL = 0;
73    private volatile long dataInMemoryWithoutWAL = 0;
74    private volatile int percentFileLocal = 0;
75    private volatile int percentFileLocalSecondaryRegions = 0;
76    private volatile long flushedCellsCount = 0;
77    private volatile long compactedCellsCount = 0;
78    private volatile long majorCompactedCellsCount = 0;
79    private volatile long flushedCellsSize = 0;
80    private volatile long compactedCellsSize = 0;
81    private volatile long majorCompactedCellsSize = 0;
82    private volatile long blockedRequestsCount = 0L;
83  
84    private CacheStats cacheStats;
85    private ScheduledExecutorService executor;
86    private Runnable runnable;
87    private long period;
88  
89    /**
90     * Can be null if not on hdfs.
91     */
92    private DFSHedgedReadMetrics dfsHedgedReadMetrics;
93  
94    public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
95      this.regionServer = regionServer;
96      initBlockCache();
97  
98      this.period =
99          regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
100           HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD);
101 
102     this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
103     this.runnable = new RegionServerMetricsWrapperRunnable();
104     this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
105       TimeUnit.MILLISECONDS);
106 
107     try {
108       this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
109     } catch (IOException e) {
110       LOG.warn("Failed to get hedged metrics", e);
111     }
112     if (LOG.isInfoEnabled()) {
113       LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
114     }
115   }
116 
117   /**
118    * It's possible that due to threading the block cache could not be initialized
119    * yet (testing multiple region servers in one jvm).  So we need to try and initialize
120    * the blockCache and cacheStats reference multiple times until we succeed.
121    */
122   private synchronized  void initBlockCache() {
123     CacheConfig cacheConfig = this.regionServer.cacheConfig;
124     if (cacheConfig != null && this.blockCache == null) {
125       this.blockCache = cacheConfig.getBlockCache();
126     }
127 
128     if (this.blockCache != null && this.cacheStats == null) {
129       this.cacheStats = blockCache.getStats();
130     }
131   }
132 
133   @Override
134   public String getClusterId() {
135     return regionServer.getClusterId();
136   }
137 
138   @Override
139   public long getStartCode() {
140     return regionServer.getStartcode();
141   }
142 
143   @Override
144   public String getZookeeperQuorum() {
145     ZooKeeperWatcher zk = regionServer.getZooKeeper();
146     if (zk == null) {
147       return "";
148     }
149     return zk.getQuorum();
150   }
151 
152   @Override
153   public String getCoprocessors() {
154     String[] coprocessors = regionServer.getRegionServerCoprocessors();
155     if (coprocessors == null || coprocessors.length == 0) {
156       return "";
157     }
158     return StringUtils.join(coprocessors, ", ");
159   }
160 
161   @Override
162   public String getServerName() {
163     ServerName serverName = regionServer.getServerName();
164     if (serverName == null) {
165       return "";
166     }
167     return serverName.getServerName();
168   }
169 
170   @Override
171   public long getNumOnlineRegions() {
172     Collection<Region> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext();
173     if (onlineRegionsLocalContext == null) {
174       return 0;
175     }
176     return onlineRegionsLocalContext.size();
177   }
178 
179   @Override
180   public long getTotalRequestCount() {
181     return regionServer.rpcServices.requestCount.get();
182   }
183 
184   @Override
185   public int getSplitQueueSize() {
186     if (this.regionServer.compactSplitThread == null) {
187       return 0;
188     }
189     return this.regionServer.compactSplitThread.getSplitQueueSize();
190   }
191 
192   @Override
193   public int getCompactionQueueSize() {
194     //The thread could be zero.  if so assume there is no queue.
195     if (this.regionServer.compactSplitThread == null) {
196       return 0;
197     }
198     return this.regionServer.compactSplitThread.getCompactionQueueSize();
199   }
200 
201   @Override
202   public int getSmallCompactionQueueSize() {
203     //The thread could be zero.  if so assume there is no queue.
204     if (this.regionServer.compactSplitThread == null) {
205       return 0;
206     }
207     return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
208   }
209 
210   @Override
211   public int getLargeCompactionQueueSize() {
212     //The thread could be zero.  if so assume there is no queue.
213     if (this.regionServer.compactSplitThread == null) {
214       return 0;
215     }
216     return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
217   }
218 
219   @Override
220   public int getFlushQueueSize() {
221     //If there is no flusher there should be no queue.
222     if (this.regionServer.cacheFlusher == null) {
223       return 0;
224     }
225     return this.regionServer.cacheFlusher.getFlushQueueSize();
226   }
227 
228   @Override
229   public long getBlockCacheCount() {
230     if (this.blockCache == null) {
231       return 0;
232     }
233     return this.blockCache.getBlockCount();
234   }
235 
236   @Override
237   public long getBlockCacheSize() {
238     if (this.blockCache == null) {
239       return 0;
240     }
241     return this.blockCache.getCurrentSize();
242   }
243 
244   @Override
245   public long getBlockCacheFreeSize() {
246     if (this.blockCache == null) {
247       return 0;
248     }
249     return this.blockCache.getFreeSize();
250   }
251 
252   @Override
253   public long getBlockCacheHitCount() {
254     if (this.cacheStats == null) {
255       return 0;
256     }
257     return this.cacheStats.getHitCount();
258   }
259 
260   @Override
261   public long getBlockCacheMissCount() {
262     if (this.cacheStats == null) {
263       return 0;
264     }
265     return this.cacheStats.getMissCount();
266   }
267 
268   @Override
269   public long getBlockCacheEvictedCount() {
270     if (this.cacheStats == null) {
271       return 0;
272     }
273     return this.cacheStats.getEvictedCount();
274   }
275 
276   @Override
277   public double getBlockCacheHitPercent() {
278     if (this.cacheStats == null) {
279       return 0;
280     }
281     return (int) (this.cacheStats.getHitRatio() * 100);
282   }
283 
284   @Override
285   public int getBlockCacheHitCachingPercent() {
286     if (this.cacheStats == null) {
287       return 0;
288     }
289     return (int) (this.cacheStats.getHitCachingRatio() * 100);
290   }
291 
292   @Override public void forceRecompute() {
293     this.runnable.run();
294   }
295 
296   @Override
297   public long getNumStores() {
298     return numStores;
299   }
300   
301   @Override
302   public long getNumWALFiles() {
303     return numWALFiles;
304   }
305 
306   @Override
307   public long getWALFileSize() {
308     return walFileSize;
309   }
310   
311   @Override
312   public long getNumStoreFiles() {
313     return numStoreFiles;
314   }
315 
316   @Override
317   public long getMemstoreSize() {
318     return memstoreSize;
319   }
320 
321   @Override
322   public long getStoreFileSize() {
323     return storeFileSize;
324   }
325 
326   @Override public double getRequestsPerSecond() {
327     return requestsPerSecond;
328   }
329 
330   @Override
331   public long getReadRequestsCount() {
332     return readRequestsCount;
333   }
334 
335   @Override
336   public long getWriteRequestsCount() {
337     return writeRequestsCount;
338   }
339 
340   @Override
341   public long getCheckAndMutateChecksFailed() {
342     return checkAndMutateChecksFailed;
343   }
344 
345   @Override
346   public long getCheckAndMutateChecksPassed() {
347     return checkAndMutateChecksPassed;
348   }
349 
350   @Override
351   public long getStoreFileIndexSize() {
352     return storefileIndexSize;
353   }
354 
355   @Override
356   public long getTotalStaticIndexSize() {
357     return totalStaticIndexSize;
358   }
359 
360   @Override
361   public long getTotalStaticBloomSize() {
362     return totalStaticBloomSize;
363   }
364 
365   @Override
366   public long getNumMutationsWithoutWAL() {
367     return numMutationsWithoutWAL;
368   }
369 
370   @Override
371   public long getDataInMemoryWithoutWAL() {
372     return dataInMemoryWithoutWAL;
373   }
374 
375   @Override
376   public int getPercentFileLocal() {
377     return percentFileLocal;
378   }
379 
380   @Override
381   public int getPercentFileLocalSecondaryRegions() {
382     return percentFileLocalSecondaryRegions;
383   }
384 
385   @Override
386   public long getUpdatesBlockedTime() {
387     if (this.regionServer.cacheFlusher == null) {
388       return 0;
389     }
390     return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get();
391   }
392 
393   @Override
394   public long getFlushedCellsCount() {
395     return flushedCellsCount;
396   }
397 
398   @Override
399   public long getCompactedCellsCount() {
400     return compactedCellsCount;
401   }
402 
403   @Override
404   public long getMajorCompactedCellsCount() {
405     return majorCompactedCellsCount;
406   }
407 
408   @Override
409   public long getFlushedCellsSize() {
410     return flushedCellsSize;
411   }
412 
413   @Override
414   public long getCompactedCellsSize() {
415     return compactedCellsSize;
416   }
417 
418   @Override
419   public long getMajorCompactedCellsSize() {
420     return majorCompactedCellsSize;
421   }
422 
423   /**
424    * This is the runnable that will be executed on the executor every PERIOD number of seconds
425    * It will take metrics/numbers from all of the regions and use them to compute point in
426    * time metrics.
427    */
428   public class RegionServerMetricsWrapperRunnable implements Runnable {
429 
430     private long lastRan = 0;
431     private long lastRequestCount = 0;
432 
433     @Override
434     synchronized public void run() {
435       initBlockCache();
436       cacheStats = blockCache.getStats();
437 
438       HDFSBlocksDistribution hdfsBlocksDistribution =
439           new HDFSBlocksDistribution();
440       HDFSBlocksDistribution hdfsBlocksDistributionSecondaryRegions =
441           new HDFSBlocksDistribution();
442 
443       long tempNumStores = 0;
444       long tempNumStoreFiles = 0;
445       long tempMemstoreSize = 0;
446       long tempStoreFileSize = 0;
447       long tempReadRequestsCount = 0;
448       long tempWriteRequestsCount = 0;
449       long tempCheckAndMutateChecksFailed = 0;
450       long tempCheckAndMutateChecksPassed = 0;
451       long tempStorefileIndexSize = 0;
452       long tempTotalStaticIndexSize = 0;
453       long tempTotalStaticBloomSize = 0;
454       long tempNumMutationsWithoutWAL = 0;
455       long tempDataInMemoryWithoutWAL = 0;
456       int tempPercentFileLocal = 0;
457       int tempPercentFileLocalSecondaryRegions = 0;
458       long tempFlushedCellsCount = 0;
459       long tempCompactedCellsCount = 0;
460       long tempMajorCompactedCellsCount = 0;
461       long tempFlushedCellsSize = 0;
462       long tempCompactedCellsSize = 0;
463       long tempMajorCompactedCellsSize = 0;
464       long tempBlockedRequestsCount = 0L;
465 
466       for (Region r : regionServer.getOnlineRegionsLocalContext()) {
467         tempNumMutationsWithoutWAL += r.getNumMutationsWithoutWAL();
468         tempDataInMemoryWithoutWAL += r.getDataInMemoryWithoutWAL();
469         tempReadRequestsCount += r.getReadRequestsCount();
470         tempWriteRequestsCount += r.getWriteRequestsCount();
471         tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
472         tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
473         tempBlockedRequestsCount += r.getBlockedRequestsCount();
474         List<Store> storeList = r.getStores();
475         tempNumStores += storeList.size();
476         for (Store store : storeList) {
477           tempNumStoreFiles += store.getStorefilesCount();
478           tempMemstoreSize += store.getMemStoreSize();
479           tempStoreFileSize += store.getStorefilesSize();
480           tempStorefileIndexSize += store.getStorefilesIndexSize();
481           tempTotalStaticBloomSize += store.getTotalStaticBloomSize();
482           tempTotalStaticIndexSize += store.getTotalStaticIndexSize();
483           tempFlushedCellsCount += store.getFlushedCellsCount();
484           tempCompactedCellsCount += store.getCompactedCellsCount();
485           tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount();
486           tempFlushedCellsSize += store.getFlushedCellsSize();
487           tempCompactedCellsSize += store.getCompactedCellsSize();
488           tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
489         }
490 
491         HDFSBlocksDistribution distro = r.getHDFSBlocksDistribution();
492         hdfsBlocksDistribution.add(distro);
493         if (r.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
494           hdfsBlocksDistributionSecondaryRegions.add(distro);
495         }
496       }
497 
498       float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
499           regionServer.getServerName().getHostname());
500       tempPercentFileLocal = (int) (localityIndex * 100);
501 
502       float localityIndexSecondaryRegions = hdfsBlocksDistributionSecondaryRegions
503           .getBlockLocalityIndex(regionServer.getServerName().getHostname());
504       tempPercentFileLocalSecondaryRegions = (int) (localityIndexSecondaryRegions * 100);
505 
506       //Compute the number of requests per second
507       long currentTime = EnvironmentEdgeManager.currentTime();
508 
509       // assume that it took PERIOD seconds to start the executor.
510       // this is a guess but it's a pretty good one.
511       if (lastRan == 0) {
512         lastRan = currentTime - period;
513       }
514 
515 
516       //If we've time traveled keep the last requests per second.
517       if ((currentTime - lastRan) > 0) {
518         long currentRequestCount = getTotalRequestCount();
519         requestsPerSecond = (currentRequestCount - lastRequestCount) / ((currentTime - lastRan) / 1000.0);
520         lastRequestCount = currentRequestCount;
521       }
522       lastRan = currentTime;
523 
524       numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory);
525       walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory);
526 
527       //Copy over computed values so that no thread sees half computed values.
528       numStores = tempNumStores;
529       numStoreFiles = tempNumStoreFiles;
530       memstoreSize = tempMemstoreSize;
531       storeFileSize = tempStoreFileSize;
532       readRequestsCount = tempReadRequestsCount;
533       writeRequestsCount = tempWriteRequestsCount;
534       checkAndMutateChecksFailed = tempCheckAndMutateChecksFailed;
535       checkAndMutateChecksPassed = tempCheckAndMutateChecksPassed;
536       storefileIndexSize = tempStorefileIndexSize;
537       totalStaticIndexSize = tempTotalStaticIndexSize;
538       totalStaticBloomSize = tempTotalStaticBloomSize;
539       numMutationsWithoutWAL = tempNumMutationsWithoutWAL;
540       dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL;
541       percentFileLocal = tempPercentFileLocal;
542       percentFileLocalSecondaryRegions = tempPercentFileLocalSecondaryRegions;
543       flushedCellsCount = tempFlushedCellsCount;
544       compactedCellsCount = tempCompactedCellsCount;
545       majorCompactedCellsCount = tempMajorCompactedCellsCount;
546       flushedCellsSize = tempFlushedCellsSize;
547       compactedCellsSize = tempCompactedCellsSize;
548       majorCompactedCellsSize = tempMajorCompactedCellsSize;
549       blockedRequestsCount = tempBlockedRequestsCount;
550     }
551   }
552 
553   @Override
554   public long getHedgedReadOps() {
555     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
556   }
557 
558   @Override
559   public long getHedgedReadWins() {
560     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
561   }
562 
563   @Override
564   public long getBlockedRequestsCount() {
565     return blockedRequestsCount;
566   }
567 }