View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.io.hfile;
21  
22  import net.spy.memcached.CachedData;
23  import net.spy.memcached.ConnectionFactoryBuilder;
24  import net.spy.memcached.FailureMode;
25  import net.spy.memcached.MemcachedClient;
26  import net.spy.memcached.transcoders.Transcoder;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.Addressing;
33  import org.apache.htrace.Trace;
34  import org.apache.htrace.TraceScope;
35  
36  import java.io.IOException;
37  import java.net.InetSocketAddress;
38  import java.nio.ByteBuffer;
39  import java.util.ArrayList;
40  import java.util.Iterator;
41  import java.util.List;
42  import java.util.NoSuchElementException;
43  import java.util.concurrent.ExecutionException;
44  
45  /**
46   * Class to store blocks into memcached.
47   * This should only be used on a cluster of Memcached daemons that are tuned well and have a
48   * good network connection to the HBase regionservers. Any other use will likely slow down HBase
49   * greatly.
50   */
51  @InterfaceAudience.Private
52  public class MemcachedBlockCache implements BlockCache {
53    private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());
54  
55    // Some memcache versions won't take more than 1024 * 1024. So set the limit below
56    // that just in case this client is used with those versions.
57    public static final int MAX_SIZE = 1020 * 1024;
58  
59    // Config key for what memcached servers to use.
60    // They should be specified in a comma sperated list with ports.
61    // like:
62    //
63    // host1:11211,host3:8080,host4:11211
64    public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
65    public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
66    public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
67    public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
68  
69    private final MemcachedClient client;
70    private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
71    private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
72  
73    public MemcachedBlockCache(Configuration c) throws IOException {
74      LOG.info("Creating MemcachedBlockCache");
75  
76      long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
77      long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
78  
79      ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
80          .setOpTimeout(opTimeout)
81          .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
82          .setFailureMode(FailureMode.Redistribute)
83          .setShouldOptimize(true)              // When regions move lots of reads happen together
84                                                // So combining them into single requests is nice.
85          .setDaemon(true)                      // Don't keep threads around past the end of days.
86          .setUseNagleAlgorithm(false)          // Ain't nobody got time for that
87          .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);  // 4 times larger than the
88                                                                        // default block just in case
89  
90  
91      // Assume only the localhost is serving memecached.
92      // A la mcrouter or co-locating memcached with split regionservers.
93      //
94      // If this config is a pool of memecached servers they will all be used according to the
95      // default hashing scheme defined by the memcache client. Spy Memecache client in this
96      // case.
97      String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
98      String[] servers = serverListString.split(",");
99      List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
100     for (String s:servers) {
101       serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
102     }
103 
104     client = new MemcachedClient(builder.build(), serverAddresses);
105   }
106 
107   @Override
108   public void cacheBlock(BlockCacheKey cacheKey,
109                          Cacheable buf,
110                          boolean inMemory,
111                          boolean cacheDataInL1) {
112     cacheBlock(cacheKey, buf);
113   }
114 
115   @Override
116   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
117     if (buf instanceof HFileBlock) {
118       client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
119     } else {
120       if (LOG.isDebugEnabled()) {
121         LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
122             + buf.getClass().toString());
123       }
124     }
125   }
126 
127   @Override
128   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
129                             boolean repeat, boolean updateCacheMetrics) {
130     // Assume that nothing is the block cache
131     HFileBlock result = null;
132 
133     try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
134       result = client.get(cacheKey.toString(), tc);
135     } catch (Exception e) {
136       // Catch a pretty broad set of exceptions to limit any changes in the memecache client
137       // and how it handles failures from leaking into the read path.
138       if (LOG.isDebugEnabled()) {
139         LOG.debug("Exception pulling from memcached [ "
140             + cacheKey.toString()
141             + " ]. Treating as a miss.", e);
142       }
143       result = null;
144     } finally {
145       // Update stats if this request doesn't have it turned off 100% of the time
146       if (updateCacheMetrics) {
147         if (result == null) {
148           cacheStats.miss(caching);
149         } else {
150           cacheStats.hit(caching);
151         }
152       }
153     }
154 
155 
156     return result;
157   }
158 
159   @Override
160   public boolean evictBlock(BlockCacheKey cacheKey) {
161     try {
162       cacheStats.evict();
163       return client.delete(cacheKey.toString()).get();
164     } catch (InterruptedException e) {
165       LOG.warn("Error deleting " + cacheKey.toString(), e);
166       Thread.currentThread().interrupt();
167     } catch (ExecutionException e) {
168       if (LOG.isDebugEnabled()) {
169         LOG.debug("Error deleting " + cacheKey.toString(), e);
170       }
171     }
172     return false;
173   }
174 
175   /**
176    * This method does nothing so that memcached can handle all evictions.
177    */
178   @Override
179   public int evictBlocksByHfileName(String hfileName) {
180     return 0;
181   }
182 
183   @Override
184   public CacheStats getStats() {
185     return cacheStats;
186   }
187 
188   @Override
189   public void shutdown() {
190     client.shutdown();
191   }
192 
193   @Override
194   public long size() {
195     return 0;
196   }
197 
198   @Override
199   public long getFreeSize() {
200     return 0;
201   }
202 
203   @Override
204   public long getCurrentSize() {
205     return 0;
206   }
207 
208   @Override
209   public long getBlockCount() {
210     return 0;
211   }
212 
213   @Override
214   public Iterator<CachedBlock> iterator() {
215     return new Iterator<CachedBlock>() {
216       @Override
217       public boolean hasNext() {
218         return false;
219       }
220 
221       @Override
222       public CachedBlock next() {
223         throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
224       }
225 
226       @Override
227       public void remove() {
228 
229       }
230     };
231   }
232 
233   @Override
234   public BlockCache[] getBlockCaches() {
235     return null;
236   }
237 
238   /**
239    * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays.
240    */
241   private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
242 
243     @Override
244     public boolean asyncDecode(CachedData d) {
245       return false;
246     }
247 
248     @Override
249     public CachedData encode(HFileBlock block) {
250       ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
251       block.serialize(bb);
252       return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
253     }
254 
255     @Override
256     public HFileBlock decode(CachedData d) {
257       try {
258         ByteBuffer buf = ByteBuffer.wrap(d.getData());
259         return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
260       } catch (IOException e) {
261         LOG.warn("Error deserializing data from memcached",e);
262       }
263       return null;
264     }
265 
266     @Override
267     public int getMaxSize() {
268       return MAX_SIZE;
269     }
270   }
271 
272 }