1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.io.hfile;
21
22 import net.spy.memcached.CachedData;
23 import net.spy.memcached.ConnectionFactoryBuilder;
24 import net.spy.memcached.FailureMode;
25 import net.spy.memcached.MemcachedClient;
26 import net.spy.memcached.transcoders.Transcoder;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.util.Addressing;
33 import org.apache.htrace.Trace;
34 import org.apache.htrace.TraceScope;
35
36 import java.io.IOException;
37 import java.net.InetSocketAddress;
38 import java.nio.ByteBuffer;
39 import java.util.ArrayList;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.ExecutionException;
44
45
46
47
48
49
50
51 @InterfaceAudience.Private
52 public class MemcachedBlockCache implements BlockCache {
53 private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());
54
55
56
57 public static final int MAX_SIZE = 1020 * 1024;
58
59
60
61
62
63
64 public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
65 public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
66 public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
67 public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
68
69 private final MemcachedClient client;
70 private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
71 private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
72
73 public MemcachedBlockCache(Configuration c) throws IOException {
74 LOG.info("Creating MemcachedBlockCache");
75
76 long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
77 long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
78
79 ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
80 .setOpTimeout(opTimeout)
81 .setOpQueueMaxBlockTime(queueTimeout)
82 .setFailureMode(FailureMode.Redistribute)
83 .setShouldOptimize(true)
84
85 .setDaemon(true)
86 .setUseNagleAlgorithm(false)
87 .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);
88
89
90
91
92
93
94
95
96
97 String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
98 String[] servers = serverListString.split(",");
99 List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
100 for (String s:servers) {
101 serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
102 }
103
104 client = new MemcachedClient(builder.build(), serverAddresses);
105 }
106
107 @Override
108 public void cacheBlock(BlockCacheKey cacheKey,
109 Cacheable buf,
110 boolean inMemory,
111 boolean cacheDataInL1) {
112 cacheBlock(cacheKey, buf);
113 }
114
115 @Override
116 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
117 if (buf instanceof HFileBlock) {
118 client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
119 } else {
120 if (LOG.isDebugEnabled()) {
121 LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
122 + buf.getClass().toString());
123 }
124 }
125 }
126
127 @Override
128 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
129 boolean repeat, boolean updateCacheMetrics) {
130
131 HFileBlock result = null;
132
133 try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
134 result = client.get(cacheKey.toString(), tc);
135 } catch (Exception e) {
136
137
138 if (LOG.isDebugEnabled()) {
139 LOG.debug("Exception pulling from memcached [ "
140 + cacheKey.toString()
141 + " ]. Treating as a miss.", e);
142 }
143 result = null;
144 } finally {
145
146 if (updateCacheMetrics) {
147 if (result == null) {
148 cacheStats.miss(caching);
149 } else {
150 cacheStats.hit(caching);
151 }
152 }
153 }
154
155
156 return result;
157 }
158
159 @Override
160 public boolean evictBlock(BlockCacheKey cacheKey) {
161 try {
162 cacheStats.evict();
163 return client.delete(cacheKey.toString()).get();
164 } catch (InterruptedException e) {
165 LOG.warn("Error deleting " + cacheKey.toString(), e);
166 Thread.currentThread().interrupt();
167 } catch (ExecutionException e) {
168 if (LOG.isDebugEnabled()) {
169 LOG.debug("Error deleting " + cacheKey.toString(), e);
170 }
171 }
172 return false;
173 }
174
175
176
177
178 @Override
179 public int evictBlocksByHfileName(String hfileName) {
180 return 0;
181 }
182
183 @Override
184 public CacheStats getStats() {
185 return cacheStats;
186 }
187
188 @Override
189 public void shutdown() {
190 client.shutdown();
191 }
192
193 @Override
194 public long size() {
195 return 0;
196 }
197
198 @Override
199 public long getFreeSize() {
200 return 0;
201 }
202
203 @Override
204 public long getCurrentSize() {
205 return 0;
206 }
207
208 @Override
209 public long getBlockCount() {
210 return 0;
211 }
212
213 @Override
214 public Iterator<CachedBlock> iterator() {
215 return new Iterator<CachedBlock>() {
216 @Override
217 public boolean hasNext() {
218 return false;
219 }
220
221 @Override
222 public CachedBlock next() {
223 throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
224 }
225
226 @Override
227 public void remove() {
228
229 }
230 };
231 }
232
233 @Override
234 public BlockCache[] getBlockCaches() {
235 return null;
236 }
237
238
239
240
241 private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
242
243 @Override
244 public boolean asyncDecode(CachedData d) {
245 return false;
246 }
247
248 @Override
249 public CachedData encode(HFileBlock block) {
250 ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
251 block.serialize(bb);
252 return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
253 }
254
255 @Override
256 public HFileBlock decode(CachedData d) {
257 try {
258 ByteBuffer buf = ByteBuffer.wrap(d.getData());
259 return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
260 } catch (IOException e) {
261 LOG.warn("Error deserializing data from memcached",e);
262 }
263 return null;
264 }
265
266 @Override
267 public int getMaxSize() {
268 return MAX_SIZE;
269 }
270 }
271
272 }