001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019 020package org.apache.hadoop.hbase.io.hfile; 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.nio.ByteBuffer; 025import java.util.ArrayList; 026import java.util.Iterator; 027import java.util.List; 028import java.util.NoSuchElementException; 029import java.util.concurrent.ExecutionException; 030 031import net.spy.memcached.CachedData; 032import net.spy.memcached.ConnectionFactoryBuilder; 033import net.spy.memcached.FailureMode; 034import net.spy.memcached.MemcachedClient; 035import net.spy.memcached.transcoders.Transcoder; 036 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; 040import org.apache.hadoop.hbase.nio.ByteBuff; 041import org.apache.hadoop.hbase.nio.SingleByteBuff; 042import org.apache.hadoop.hbase.trace.TraceUtil; 043import org.apache.hadoop.hbase.util.Addressing; 044import org.apache.htrace.core.TraceScope; 045import org.apache.yetus.audience.InterfaceAudience; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Class to store blocks into memcached. 051 * This should only be used on a cluster of Memcached daemons that are tuned well and have a 052 * good network connection to the HBase regionservers. Any other use will likely slow down HBase 053 * greatly. 054 */ 055@InterfaceAudience.Private 056public class MemcachedBlockCache implements BlockCache { 057 private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName()); 058 059 // Some memcache versions won't take more than 1024 * 1024. So set the limit below 060 // that just in case this client is used with those versions. 061 public static final int MAX_SIZE = 1020 * 1024; 062 063 // Config key for what memcached servers to use. 064 // They should be specified in a comma sperated list with ports. 065 // like: 066 // 067 // host1:11211,host3:8080,host4:11211 068 public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers"; 069 public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout"; 070 public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout"; 071 public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze"; 072 public static final long MEMCACHED_DEFAULT_TIMEOUT = 500; 073 public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false; 074 075 private final MemcachedClient client; 076 private final HFileBlockTranscoder tc = new HFileBlockTranscoder(); 077 private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache"); 078 079 public MemcachedBlockCache(Configuration c) throws IOException { 080 LOG.info("Creating MemcachedBlockCache"); 081 082 long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT); 083 long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT); 084 boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT); 085 086 ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder() 087 .setOpTimeout(opTimeout) 088 .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out 089 .setFailureMode(FailureMode.Redistribute) 090 .setShouldOptimize(optimize) 091 .setDaemon(true) // Don't keep threads around past the end of days. 092 .setUseNagleAlgorithm(false) // Ain't nobody got time for that 093 .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case 094 095 // Assume only the localhost is serving memecached. 096 // A la mcrouter or co-locating memcached with split regionservers. 097 // 098 // If this config is a pool of memecached servers they will all be used according to the 099 // default hashing scheme defined by the memcache client. Spy Memecache client in this 100 // case. 101 String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211"); 102 String[] servers = serverListString.split(","); 103 List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length); 104 for (String s:servers) { 105 serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s)); 106 } 107 108 client = new MemcachedClient(builder.build(), serverAddresses); 109 } 110 111 @Override 112 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) { 113 cacheBlock(cacheKey, buf); 114 } 115 116 @Override 117 public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) { 118 if (buf instanceof HFileBlock) { 119 client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc); 120 } else { 121 if (LOG.isDebugEnabled()) { 122 LOG.debug("MemcachedBlockCache can not cache Cacheable's of type " 123 + buf.getClass().toString()); 124 } 125 } 126 } 127 128 @Override 129 public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, 130 boolean repeat, boolean updateCacheMetrics) { 131 // Assume that nothing is the block cache 132 HFileBlock result = null; 133 134 try (TraceScope traceScope = TraceUtil.createTrace("MemcachedBlockCache.getBlock")) { 135 result = client.get(cacheKey.toString(), tc); 136 } catch (Exception e) { 137 // Catch a pretty broad set of exceptions to limit any changes in the memecache client 138 // and how it handles failures from leaking into the read path. 139 if (LOG.isDebugEnabled()) { 140 LOG.debug("Exception pulling from memcached [ " 141 + cacheKey.toString() 142 + " ]. Treating as a miss.", e); 143 } 144 result = null; 145 } finally { 146 // Update stats if this request doesn't have it turned off 100% of the time 147 if (updateCacheMetrics) { 148 if (result == null) { 149 cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 150 } else { 151 cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType()); 152 } 153 } 154 } 155 156 return result; 157 } 158 159 @Override 160 public boolean evictBlock(BlockCacheKey cacheKey) { 161 try { 162 cacheStats.evict(); 163 return client.delete(cacheKey.toString()).get(); 164 } catch (InterruptedException e) { 165 LOG.warn("Error deleting " + cacheKey.toString(), e); 166 Thread.currentThread().interrupt(); 167 } catch (ExecutionException e) { 168 if (LOG.isDebugEnabled()) { 169 LOG.debug("Error deleting " + cacheKey.toString(), e); 170 } 171 } 172 return false; 173 } 174 175 /** 176 * This method does nothing so that memcached can handle all evictions. 177 */ 178 @Override 179 public int evictBlocksByHfileName(String hfileName) { 180 return 0; 181 } 182 183 @Override 184 public CacheStats getStats() { 185 return cacheStats; 186 } 187 188 @Override 189 public void shutdown() { 190 client.shutdown(); 191 } 192 193 @Override 194 public long size() { 195 return 0; 196 } 197 198 @Override 199 public long getMaxSize() { 200 return 0; 201 } 202 203 @Override 204 public long getFreeSize() { 205 return 0; 206 } 207 208 @Override 209 public long getCurrentSize() { 210 return 0; 211 } 212 213 @Override 214 public long getCurrentDataSize() { 215 return 0; 216 } 217 218 @Override 219 public long getBlockCount() { 220 return 0; 221 } 222 223 @Override 224 public long getDataBlockCount() { 225 return 0; 226 } 227 228 @Override 229 public Iterator<CachedBlock> iterator() { 230 return new Iterator<CachedBlock>() { 231 @Override 232 public boolean hasNext() { 233 return false; 234 } 235 236 @Override 237 public CachedBlock next() { 238 throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks."); 239 } 240 241 @Override 242 public void remove() { 243 244 } 245 }; 246 } 247 248 @Override 249 public BlockCache[] getBlockCaches() { 250 return null; 251 } 252 253 /** 254 * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays. 255 */ 256 private static class HFileBlockTranscoder implements Transcoder<HFileBlock> { 257 258 @Override 259 public boolean asyncDecode(CachedData d) { 260 return false; 261 } 262 263 @Override 264 public CachedData encode(HFileBlock block) { 265 ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength()); 266 block.serialize(bb, true); 267 return new CachedData(0, bb.array(), CachedData.MAX_SIZE); 268 } 269 270 @Override 271 public HFileBlock decode(CachedData d) { 272 try { 273 ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData())); 274 return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true, 275 MemoryType.EXCLUSIVE); 276 } catch (IOException e) { 277 LOG.warn("Error deserializing data from memcached",e); 278 } 279 return null; 280 } 281 282 @Override 283 public int getMaxSize() { 284 return MAX_SIZE; 285 } 286 } 287}