View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.lang.management.MemoryUsage;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.ScheduledExecutorService;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
34  import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
35  import org.apache.hadoop.util.StringUtils;
36  
37  import com.google.common.annotations.VisibleForTesting;
38  import com.google.common.util.concurrent.ThreadFactoryBuilder;
39  
40  /**
41   * A pool of {@link HeapMemStoreLAB.Chunk} instances.
42   * 
43   * MemStoreChunkPool caches a number of retired chunks for reusing, it could
44   * decrease allocating bytes when writing, thereby optimizing the garbage
45   * collection on JVM.
46   * 
47   * The pool instance is globally unique and could be obtained through
48   * {@link MemStoreChunkPool#getPool(Configuration)}
49   * 
50   * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating
51   * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
52   * when MemStore clearing snapshot for flush
53   */
54  @SuppressWarnings("javadoc")
55  @InterfaceAudience.Private
56  public class MemStoreChunkPool {
57    private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
58    final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
59    final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
60    final static float POOL_MAX_SIZE_DEFAULT = 0.0f;
61    final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
62  
63    // Static reference to the MemStoreChunkPool
64    private static MemStoreChunkPool GLOBAL_INSTANCE;
65    /** Boolean whether we have disabled the memstore chunk pool entirely. */
66    static boolean chunkPoolDisabled = false;
67  
68    private final int maxCount;
69  
70    // A queue of reclaimed chunks
71    private final BlockingQueue<Chunk> reclaimedChunks;
72    private final int chunkSize;
73  
74    /** Statistics thread schedule pool */
75    private final ScheduledExecutorService scheduleThreadPool;
76    /** Statistics thread */
77    private static final int statThreadPeriod = 60 * 5;
78    private AtomicLong createdChunkCount = new AtomicLong();
79    private AtomicLong reusedChunkCount = new AtomicLong();
80  
81    MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
82        int initialCount) {
83      this.maxCount = maxCount;
84      this.chunkSize = chunkSize;
85      this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
86      for (int i = 0; i < initialCount; i++) {
87        Chunk chunk = new Chunk(chunkSize);
88        chunk.init();
89        reclaimedChunks.add(chunk);
90      }
91      final String n = Thread.currentThread().getName();
92      scheduleThreadPool = Executors.newScheduledThreadPool(1,
93          new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
94              .setDaemon(true).build());
95      this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
96          statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
97    }
98  
99    /**
100    * Poll a chunk from the pool, reset it if not null, else create a new chunk
101    * to return
102    * @return a chunk
103    */
104   Chunk getChunk() {
105     Chunk chunk = reclaimedChunks.poll();
106     if (chunk == null) {
107       chunk = new Chunk(chunkSize);
108       createdChunkCount.incrementAndGet();
109     } else {
110       chunk.reset();
111       reusedChunkCount.incrementAndGet();
112     }
113     return chunk;
114   }
115 
116   /**
117    * Add the chunks to the pool, when the pool achieves the max size, it will
118    * skip the remaining chunks
119    * @param chunks
120    */
121   void putbackChunks(BlockingQueue<Chunk> chunks) {
122     int maxNumToPutback = this.maxCount - reclaimedChunks.size();
123     if (maxNumToPutback <= 0) {
124       return;
125     }
126     chunks.drainTo(reclaimedChunks, maxNumToPutback);
127     // clear reference of any non-reclaimable chunks
128     if (chunks.size() > 0) {
129       if (LOG.isTraceEnabled()) {
130         LOG.trace("Left " + chunks.size() + " unreclaimable chunks, removing them from queue");
131       }
132       chunks.clear();
133     }
134   }
135 
136   /**
137    * Add the chunk to the pool, if the pool has achieved the max size, it will
138    * skip it
139    * @param chunk
140    */
141   void putbackChunk(Chunk chunk) {
142     if (reclaimedChunks.size() >= this.maxCount) {
143       return;
144     }
145     reclaimedChunks.add(chunk);
146   }
147 
148   int getPoolSize() {
149     return this.reclaimedChunks.size();
150   }
151 
152   /*
153    * Only used in testing
154    */
155   void clearChunks() {
156     this.reclaimedChunks.clear();
157   }
158 
159   private static class StatisticsThread extends Thread {
160     MemStoreChunkPool mcp;
161 
162     public StatisticsThread(MemStoreChunkPool mcp) {
163       super("MemStoreChunkPool.StatisticsThread");
164       setDaemon(true);
165       this.mcp = mcp;
166     }
167 
168     @Override
169     public void run() {
170       mcp.logStats();
171     }
172   }
173 
174   private void logStats() {
175     if (!LOG.isDebugEnabled()) return;
176     long created = createdChunkCount.get();
177     long reused = reusedChunkCount.get();
178     long total = created + reused;
179     LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
180         + ",created chunk count=" + created
181         + ",reused chunk count=" + reused
182         + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
183             (float) reused / (float) total, 2)));
184   }
185 
186   /**
187    * @param conf
188    * @return the global MemStoreChunkPool instance
189    */
190   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK",
191       justification="Intentional")
192   static MemStoreChunkPool getPool(Configuration conf) {
193     if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
194 
195     synchronized (MemStoreChunkPool.class) {
196       if (chunkPoolDisabled) return null;
197       if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
198       float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
199       if (poolSizePercentage <= 0) {
200         chunkPoolDisabled = true;
201         return null;
202       }
203       if (poolSizePercentage > 1.0) {
204         throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
205       }
206       long heapMax = -1L;
207       final MemoryUsage usage = HeapMemorySizeUtil.safeGetHeapMemoryUsage();
208       if (usage != null) {
209         heapMax = usage.getMax();
210       }
211       long globalMemStoreLimit = (long) (heapMax * HeapMemorySizeUtil.getGlobalMemStorePercent(conf,
212           false));
213       int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
214           HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
215       int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
216 
217       float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
218           POOL_INITIAL_SIZE_DEFAULT);
219       if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
220         throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
221             + " must be between 0.0 and 1.0");
222       }
223 
224       int initialCount = (int) (initialCountPercentage * maxCount);
225       LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
226           + ", max count " + maxCount + ", initial count " + initialCount);
227       GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
228       return GLOBAL_INSTANCE;
229     }
230   }
231 
232   int getMaxCount() {
233     return this.maxCount;
234   }
235 
236   @VisibleForTesting
237   static void clearDisableFlag() {
238     chunkPoolDisabled = false;
239   }
240 
241 }