/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
 * that are tuned well and have a good network connection to the HBase regionservers. Any other use
 * will likely slow down HBase greatly.
 */
@InterfaceAudience.Private
public class MemcachedBlockCache implements BlockCache {
  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());

  // Some memcached versions won't take more than 1024 * 1024. So set the limit below
  // that just in case this client is used with those versions.
  public static final int MAX_SIZE = 1020 * 1024;

  // Config key for what memcached servers to use.
  // They should be specified in a comma separated list with ports.
  // like:
  //
  // host1:11211,host3:8080,host4:11211
  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;

  private final MemcachedClient client;
  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");

  public MemcachedBlockCache(Configuration c) throws IOException {
    LOG.info("Creating MemcachedBlockCache");

    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

    ConnectionFactoryBuilder builder =
      // Cap the max time before anything times out
      new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
        // Don't keep threads around past the end of days.
        .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
        .setUseNagleAlgorithm(false) // Ain't nobody got time for that
        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case

    // Assume only the localhost is serving memcached.
    // A la mcrouter or co-locating memcached with split regionservers.
    //
    // If this config is a pool of memcached servers they will all be used according to the
    // default hashing scheme defined by the memcached client. Spy Memcached client in this
    // case.
    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
    String[] servers = serverListString.split(",");
    // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any
    // resolved identities cannot have their address mappings changed while the MemcachedClient
    // instance is alive. We won't get a chance to trigger re-resolution.
    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
    for (String s : servers) {
      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
    }

    client = new MemcachedClient(builder.build(), serverAddresses);
  }

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    if (buf instanceof HFileBlock) {
      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.warn("Failed to cache block", e);
        }
      });
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "MemcachedBlockCache can not cache Cacheable's of type " + buf.getClass().toString());
      }
    }
  }

  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    // Assume that nothing is in the block cache
    HFileBlock result = null;
    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
    try (Scope traceScope = span.makeCurrent()) {
      result = client.get(cacheKey.toString(), tc);
    } catch (Exception e) {
      // Catch a pretty broad set of exceptions to limit any changes in the memcached client
      // and how it handles failures from leaking into the read path.
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
          e);
      }
      result = null;
    } finally {
      span.end();
      // Update stats if this request doesn't have it turned off 100% of the time
      if (updateCacheMetrics) {
        if (result == null) {
          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        } else {
          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        }
      }
    }

    return result;
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    try {
      cacheStats.evict();
      return client.delete(cacheKey.toString()).get();
    } catch (InterruptedException e) {
      LOG.warn("Error deleting " + cacheKey.toString(), e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deleting " + cacheKey.toString(), e);
      }
    }
    return false;
  }

  /**
   * This method does nothing so that memcached can handle all evictions.
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    return 0;
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  @Override
  public void shutdown() {
    client.shutdown();
  }

  @Override
  public long size() {
    return 0;
  }

  @Override
  public long getMaxSize() {
    return 0;
  }

  @Override
  public long getFreeSize() {
    return 0;
  }

  @Override
  public long getCurrentSize() {
    return 0;
  }

  @Override
  public long getCurrentDataSize() {
    return 0;
  }

  @Override
  public long getBlockCount() {
    return 0;
  }

  @Override
  public long getDataBlockCount() {
    return 0;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    return new Iterator<CachedBlock>() {
      @Override
      public boolean hasNext() {
        return false;
      }

      @Override
      public CachedBlock next() {
        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
      }

      @Override
      public void remove() {

      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  /**
   * Class to encode and decode an HFileBlock to and from memcached's resulting byte arrays.
   */
  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {

    @Override
    public boolean asyncDecode(CachedData d) {
      return false;
    }

    @Override
    public CachedData encode(HFileBlock block) {
      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
      block.serialize(bb, true);
      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
    }

    @Override
    public HFileBlock decode(CachedData d) {
      try {
        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
      } catch (IOException e) {
        LOG.warn("Failed to deserialize data from memcached", e);
      }
      return null;
    }

    @Override
    public int getMaxSize() {
      return MAX_SIZE;
    }
  }
}
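// A minimal usage sketch, kept as a trailing comment so the file stays compilable. This is an
// illustration under assumptions, not part of the original class: HBase normally instantiates
// this cache itself when an external block cache is enabled (commonly via the
// "hbase.blockcache.use.external" setting; treat that key as an assumption about the surrounding
// deployment). Constructing it directly only needs a Configuration carrying the server list
// defined by MEMCACHED_CONFIG_KEY above:
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.set(MemcachedBlockCache.MEMCACHED_CONFIG_KEY, "host1:11211,host2:11211");
//   BlockCache cache = new MemcachedBlockCache(conf);
//   // ... reads and writes go through cacheBlock()/getBlock() ...
//   cache.shutdown(); // closes the underlying MemcachedClient connections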