/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collection;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.apache.commons.lang.StringUtils;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.io.hfile.BlockCache;
34  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
35  import org.apache.hadoop.hbase.io.hfile.CacheStats;
36  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
37  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
38  import org.apache.hadoop.hbase.util.FSUtils;
39  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
40  import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
41  import org.apache.hadoop.metrics2.MetricsExecutor;
42  
43  /**
44   * Impl for exposing HRegionServer Information through Hadoop's metrics 2 system.
45   */
46  @InterfaceAudience.Private
47  class MetricsRegionServerWrapperImpl
48      implements MetricsRegionServerWrapper {
49  
50    public static final Log LOG = LogFactory.getLog(MetricsRegionServerWrapperImpl.class);
51  
52    private final HRegionServer regionServer;
53  
54    private BlockCache blockCache;
55  
56    private volatile long numStores = 0;
57    private volatile long numWALFiles = 0;
58    private volatile long walFileSize = 0;
59    private volatile long numStoreFiles = 0;
60    private volatile long memstoreSize = 0;
61    private volatile long storeFileSize = 0;
62    private volatile double requestsPerSecond = 0.0;
63    private volatile long readRequestsCount = 0;
64    private volatile long writeRequestsCount = 0;
65    private volatile long checkAndMutateChecksFailed = 0;
66    private volatile long checkAndMutateChecksPassed = 0;
67    private volatile long storefileIndexSize = 0;
68    private volatile long totalStaticIndexSize = 0;
69    private volatile long totalStaticBloomSize = 0;
70    private volatile long numMutationsWithoutWAL = 0;
71    private volatile long dataInMemoryWithoutWAL = 0;
72    private volatile int percentFileLocal = 0;
73    private volatile long flushedCellsCount = 0;
74    private volatile long compactedCellsCount = 0;
75    private volatile long majorCompactedCellsCount = 0;
76    private volatile long flushedCellsSize = 0;
77    private volatile long compactedCellsSize = 0;
78    private volatile long majorCompactedCellsSize = 0;
79  
80    private CacheStats cacheStats;
81    private ScheduledExecutorService executor;
82    private Runnable runnable;
83    private long period;
84  
85    /**
86     * Can be null if not on hdfs.
87     */
88    private DFSHedgedReadMetrics dfsHedgedReadMetrics;
89  
90    public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
91      this.regionServer = regionServer;
92      initBlockCache();
93  
94      this.period =
95          regionServer.conf.getLong(HConstants.REGIONSERVER_METRICS_PERIOD,
96            HConstants.DEFAULT_REGIONSERVER_METRICS_PERIOD);
97  
98      this.executor = CompatibilitySingletonFactory.getInstance(MetricsExecutor.class).getExecutor();
99      this.runnable = new RegionServerMetricsWrapperRunnable();
100     this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
101       TimeUnit.MILLISECONDS);
102 
103     try {
104       this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
105     } catch (IOException e) {
106       LOG.warn("Failed to get hedged metrics", e);
107     }
108     if (LOG.isInfoEnabled()) {
109       LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
110     }
111   }
112 
113   /**
114    * It's possible that due to threading the block cache could not be initialized
115    * yet (testing multiple region servers in one jvm).  So we need to try and initialize
116    * the blockCache and cacheStats reference multiple times until we succeed.
117    */
118   private synchronized  void initBlockCache() {
119     CacheConfig cacheConfig = this.regionServer.cacheConfig;
120     if (cacheConfig != null && this.blockCache == null) {
121       this.blockCache = cacheConfig.getBlockCache();
122     }
123 
124     if (this.blockCache != null && this.cacheStats == null) {
125       this.cacheStats = blockCache.getStats();
126     }
127   }
128 
129   @Override
130   public String getClusterId() {
131     return regionServer.getClusterId();
132   }
133 
134   @Override
135   public long getStartCode() {
136     return regionServer.getStartcode();
137   }
138 
139   @Override
140   public String getZookeeperQuorum() {
141     ZooKeeperWatcher zk = regionServer.getZooKeeper();
142     if (zk == null) {
143       return "";
144     }
145     return zk.getQuorum();
146   }
147 
148   @Override
149   public String getCoprocessors() {
150     String[] coprocessors = regionServer.getRegionServerCoprocessors();
151     if (coprocessors == null || coprocessors.length == 0) {
152       return "";
153     }
154     return StringUtils.join(coprocessors, ", ");
155   }
156 
157   @Override
158   public String getServerName() {
159     ServerName serverName = regionServer.getServerName();
160     if (serverName == null) {
161       return "";
162     }
163     return serverName.getServerName();
164   }
165 
166   @Override
167   public long getNumOnlineRegions() {
168     Collection<HRegion> onlineRegionsLocalContext = regionServer.getOnlineRegionsLocalContext();
169     if (onlineRegionsLocalContext == null) {
170       return 0;
171     }
172     return onlineRegionsLocalContext.size();
173   }
174 
175   @Override
176   public long getTotalRequestCount() {
177     return regionServer.rpcServices.requestCount.get();
178   }
179 
180   @Override
181   public int getCompactionQueueSize() {
182     //The thread could be zero.  if so assume there is no queue.
183     if (this.regionServer.compactSplitThread == null) {
184       return 0;
185     }
186     return this.regionServer.compactSplitThread.getCompactionQueueSize();
187   }
188 
189   @Override
190   public int getSmallCompactionQueueSize() {
191     //The thread could be zero.  if so assume there is no queue.
192     if (this.regionServer.compactSplitThread == null) {
193       return 0;
194     }
195     return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
196   }
197 
198   @Override
199   public int getLargeCompactionQueueSize() {
200     //The thread could be zero.  if so assume there is no queue.
201     if (this.regionServer.compactSplitThread == null) {
202       return 0;
203     }
204     return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
205   }
206 
207   @Override
208   public int getFlushQueueSize() {
209     //If there is no flusher there should be no queue.
210     if (this.regionServer.cacheFlusher == null) {
211       return 0;
212     }
213     return this.regionServer.cacheFlusher.getFlushQueueSize();
214   }
215 
216   @Override
217   public long getBlockCacheCount() {
218     if (this.blockCache == null) {
219       return 0;
220     }
221     return this.blockCache.getBlockCount();
222   }
223 
224   @Override
225   public long getBlockCacheSize() {
226     if (this.blockCache == null) {
227       return 0;
228     }
229     return this.blockCache.getCurrentSize();
230   }
231 
232   @Override
233   public long getBlockCacheFreeSize() {
234     if (this.blockCache == null) {
235       return 0;
236     }
237     return this.blockCache.getFreeSize();
238   }
239 
240   @Override
241   public long getBlockCacheHitCount() {
242     if (this.cacheStats == null) {
243       return 0;
244     }
245     return this.cacheStats.getHitCount();
246   }
247 
248   @Override
249   public long getBlockCacheMissCount() {
250     if (this.cacheStats == null) {
251       return 0;
252     }
253     return this.cacheStats.getMissCount();
254   }
255 
256   @Override
257   public long getBlockCacheEvictedCount() {
258     if (this.cacheStats == null) {
259       return 0;
260     }
261     return this.cacheStats.getEvictedCount();
262   }
263 
264   @Override
265   public double getBlockCacheHitPercent() {
266     if (this.cacheStats == null) {
267       return 0;
268     }
269     return (int) (this.cacheStats.getHitRatio() * 100);
270   }
271 
272   @Override
273   public int getBlockCacheHitCachingPercent() {
274     if (this.cacheStats == null) {
275       return 0;
276     }
277     return (int) (this.cacheStats.getHitCachingRatio() * 100);
278   }
279 
280   @Override public void forceRecompute() {
281     this.runnable.run();
282   }
283 
284   @Override
285   public long getNumStores() {
286     return numStores;
287   }
288   
289   @Override
290   public long getNumWALFiles() {
291     return numWALFiles;
292   }
293 
294   @Override
295   public long getWALFileSize() {
296     return walFileSize;
297   }
298   
299   @Override
300   public long getNumStoreFiles() {
301     return numStoreFiles;
302   }
303 
304   @Override
305   public long getMemstoreSize() {
306     return memstoreSize;
307   }
308 
309   @Override
310   public long getStoreFileSize() {
311     return storeFileSize;
312   }
313 
314   @Override public double getRequestsPerSecond() {
315     return requestsPerSecond;
316   }
317 
318   @Override
319   public long getReadRequestsCount() {
320     return readRequestsCount;
321   }
322 
323   @Override
324   public long getWriteRequestsCount() {
325     return writeRequestsCount;
326   }
327 
328   @Override
329   public long getCheckAndMutateChecksFailed() {
330     return checkAndMutateChecksFailed;
331   }
332 
333   @Override
334   public long getCheckAndMutateChecksPassed() {
335     return checkAndMutateChecksPassed;
336   }
337 
338   @Override
339   public long getStoreFileIndexSize() {
340     return storefileIndexSize;
341   }
342 
343   @Override
344   public long getTotalStaticIndexSize() {
345     return totalStaticIndexSize;
346   }
347 
348   @Override
349   public long getTotalStaticBloomSize() {
350     return totalStaticBloomSize;
351   }
352 
353   @Override
354   public long getNumMutationsWithoutWAL() {
355     return numMutationsWithoutWAL;
356   }
357 
358   @Override
359   public long getDataInMemoryWithoutWAL() {
360     return dataInMemoryWithoutWAL;
361   }
362 
363   @Override
364   public int getPercentFileLocal() {
365     return percentFileLocal;
366   }
367 
368   @Override
369   public long getUpdatesBlockedTime() {
370     if (this.regionServer.cacheFlusher == null) {
371       return 0;
372     }
373     return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get();
374   }
375 
376   @Override
377   public long getFlushedCellsCount() {
378     return flushedCellsCount;
379   }
380 
381   @Override
382   public long getCompactedCellsCount() {
383     return compactedCellsCount;
384   }
385 
386   @Override
387   public long getMajorCompactedCellsCount() {
388     return majorCompactedCellsCount;
389   }
390 
391   @Override
392   public long getFlushedCellsSize() {
393     return flushedCellsSize;
394   }
395 
396   @Override
397   public long getCompactedCellsSize() {
398     return compactedCellsSize;
399   }
400 
401   @Override
402   public long getMajorCompactedCellsSize() {
403     return majorCompactedCellsSize;
404   }
405 
406   /**
407    * This is the runnable that will be executed on the executor every PERIOD number of seconds
408    * It will take metrics/numbers from all of the regions and use them to compute point in
409    * time metrics.
410    */
411   public class RegionServerMetricsWrapperRunnable implements Runnable {
412 
413     private long lastRan = 0;
414     private long lastRequestCount = 0;
415 
416     @Override
417     synchronized public void run() {
418       initBlockCache();
419       cacheStats = blockCache.getStats();
420 
421       HDFSBlocksDistribution hdfsBlocksDistribution =
422           new HDFSBlocksDistribution();
423 
424       long tempNumStores = 0;
425       long tempNumStoreFiles = 0;
426       long tempMemstoreSize = 0;
427       long tempStoreFileSize = 0;
428       long tempReadRequestsCount = 0;
429       long tempWriteRequestsCount = 0;
430       long tempCheckAndMutateChecksFailed = 0;
431       long tempCheckAndMutateChecksPassed = 0;
432       long tempStorefileIndexSize = 0;
433       long tempTotalStaticIndexSize = 0;
434       long tempTotalStaticBloomSize = 0;
435       long tempNumMutationsWithoutWAL = 0;
436       long tempDataInMemoryWithoutWAL = 0;
437       int tempPercentFileLocal = 0;
438       long tempFlushedCellsCount = 0;
439       long tempCompactedCellsCount = 0;
440       long tempMajorCompactedCellsCount = 0;
441       long tempFlushedCellsSize = 0;
442       long tempCompactedCellsSize = 0;
443       long tempMajorCompactedCellsSize = 0;
444 
445       for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
446         tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get();
447         tempDataInMemoryWithoutWAL += r.dataInMemoryWithoutWAL.get();
448         tempReadRequestsCount += r.readRequestsCount.get();
449         tempWriteRequestsCount += r.writeRequestsCount.get();
450         tempCheckAndMutateChecksFailed += r.checkAndMutateChecksFailed.get();
451         tempCheckAndMutateChecksPassed += r.checkAndMutateChecksPassed.get();
452         tempNumStores += r.stores.size();
453         for (Store store : r.stores.values()) {
454           tempNumStoreFiles += store.getStorefilesCount();
455           tempMemstoreSize += store.getMemStoreSize();
456           tempStoreFileSize += store.getStorefilesSize();
457           tempStorefileIndexSize += store.getStorefilesIndexSize();
458           tempTotalStaticBloomSize += store.getTotalStaticBloomSize();
459           tempTotalStaticIndexSize += store.getTotalStaticIndexSize();
460           tempFlushedCellsCount += store.getFlushedCellsCount();
461           tempCompactedCellsCount += store.getCompactedCellsCount();
462           tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount();
463           tempFlushedCellsSize += store.getFlushedCellsSize();
464           tempCompactedCellsSize += store.getCompactedCellsSize();
465           tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize();
466         }
467 
468         hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
469       }
470 
471       float localityIndex = hdfsBlocksDistribution.getBlockLocalityIndex(
472           regionServer.getServerName().getHostname());
473       tempPercentFileLocal = (int) (localityIndex * 100);
474 
475 
476       //Compute the number of requests per second
477       long currentTime = EnvironmentEdgeManager.currentTime();
478 
479       // assume that it took PERIOD seconds to start the executor.
480       // this is a guess but it's a pretty good one.
481       if (lastRan == 0) {
482         lastRan = currentTime - period;
483       }
484 
485 
486       //If we've time traveled keep the last requests per second.
487       if ((currentTime - lastRan) > 0) {
488         long currentRequestCount = getTotalRequestCount();
489         requestsPerSecond = (currentRequestCount - lastRequestCount) / ((currentTime - lastRan) / 1000.0);
490         lastRequestCount = currentRequestCount;
491       }
492       lastRan = currentTime;
493 
494       numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory);
495       walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory);
496 
497       //Copy over computed values so that no thread sees half computed values.
498       numStores = tempNumStores;
499       numStoreFiles = tempNumStoreFiles;
500       memstoreSize = tempMemstoreSize;
501       storeFileSize = tempStoreFileSize;
502       readRequestsCount = tempReadRequestsCount;
503       writeRequestsCount = tempWriteRequestsCount;
504       checkAndMutateChecksFailed = tempCheckAndMutateChecksFailed;
505       checkAndMutateChecksPassed = tempCheckAndMutateChecksPassed;
506       storefileIndexSize = tempStorefileIndexSize;
507       totalStaticIndexSize = tempTotalStaticIndexSize;
508       totalStaticBloomSize = tempTotalStaticBloomSize;
509       numMutationsWithoutWAL = tempNumMutationsWithoutWAL;
510       dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL;
511       percentFileLocal = tempPercentFileLocal;
512       flushedCellsCount = tempFlushedCellsCount;
513       compactedCellsCount = tempCompactedCellsCount;
514       majorCompactedCellsCount = tempMajorCompactedCellsCount;
515       flushedCellsSize = tempFlushedCellsSize;
516       compactedCellsSize = tempCompactedCellsSize;
517       majorCompactedCellsSize = tempMajorCompactedCellsSize;
518     }
519   }
520 
521   @Override
522   public long getHedgedReadOps() {
523     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadOps();
524   }
525 
526   @Override
527   public long getHedgedReadWins() {
528     return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
529   }
530 }