/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Class to store blocks into memcached. This should only be used on a cluster of memcached daemons
 * that are tuned well and have a good network connection to the HBase regionservers. Any other use
 * will likely slow down HBase greatly.
 */
@InterfaceAudience.Private
@SuppressWarnings("FutureReturnValueIgnored")
public class MemcachedBlockCache implements BlockCache {
  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());

  // Some memcached versions won't take more than 1024 * 1024, so set the limit below
  // that just in case this client is used with those versions.
  // Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size.
  public static final int MAX_SIZE = 1020 * 1024;

  public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, the max expiration allowed by
                                                        // the memcached spec

  // Config key for which memcached servers to use.
  // They should be specified in a comma separated list with ports, like:
  //
  // host1:11211,host3:8080,host4:11211
  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
  public static final int STAT_THREAD_PERIOD = 60 * 5;
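
  // A minimal sketch of setting the keys above programmatically, for illustration only. How HBase
  // is told to use this class as its block cache is configured elsewhere and is not shown here;
  // the host names and timeout values below are placeholders, not recommendations.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(MEMCACHED_CONFIG_KEY, "host1:11211,host2:11211");
  //   conf.setLong(MEMCACHED_OPTIMEOUT_KEY, 500L);
  //   conf.setLong(MEMCACHED_TIMEOUT_KEY, 1000L);
  //   conf.setBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
  //   BlockCache cache = new MemcachedBlockCache(conf);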

  private final MemcachedClient client;
  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
  private final AtomicLong cachedCount = new AtomicLong();
  private final AtomicLong notCachedCount = new AtomicLong();
  private final AtomicLong cacheErrorCount = new AtomicLong();
  private final AtomicLong timeoutCount = new AtomicLong();

  /** Statistics thread schedule pool (for heavy debugging, could remove) */
  private transient final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());

  public MemcachedBlockCache(Configuration c) throws IOException {
    LOG.info("Creating MemcachedBlockCache");

    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

    ConnectionFactoryBuilder builder =
      new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
        .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
        .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);

    // By default assume that only the localhost is serving memcached, a la mcrouter or memcached
    // co-located with the regionservers.
    //
    // If this config is a pool of memcached servers they will all be used according to the
    // default hashing scheme defined by the memcached client, the Spy Memcached client in this
    // case.
    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
    String[] servers = serverListString.split(",");
    // MemcachedClient requires InetSocketAddresses, so we have to create them now. This implies
    // that any resolved identities cannot have their address mappings changed while the
    // MemcachedClient instance is alive. We won't get a chance to trigger re-resolution.
    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
    for (String s : servers) {
      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
    }

    client = createMemcachedClient(builder.build(), serverAddresses);
    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
  }

  protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
    List<InetSocketAddress> serverAddresses) throws IOException {
    return new MemcachedClient(factory, serverAddresses);
  }
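
  // createMemcachedClient is kept protected as an extension point; a subclass (a unit test, for
  // example) could return a client pointed at a stub or an embedded memcached instead. A
  // hypothetical sketch, not part of this class:
  //
  //   class FakeMemcachedBlockCache extends MemcachedBlockCache {
  //     FakeMemcachedBlockCache(Configuration c) throws IOException { super(c); }
  //     @Override
  //     protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
  //       List<InetSocketAddress> serverAddresses) throws IOException {
  //       return new MemcachedClient(factory, serverAddresses); // swap in a test double here
  //     }
  //   }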

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    if (buf instanceof HFileBlock) {
      if (buf.getSerializedLength() > MAX_SIZE) {
        LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
          buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
        notCachedCount.incrementAndGet();
        return;
      }
      client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
        try {
          f.get();
          cachedCount.incrementAndGet();
        } catch (Exception e) {
          LOG.warn("Failed to cache block with key " + cacheKey, e);
          cacheErrorCount.incrementAndGet();
        }
      });
    } else {
      LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
      notCachedCount.incrementAndGet();
    }
  }

  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    // Assume that nothing is in the block cache.
    HFileBlock result = null;
    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
    try (Scope traceScope = span.makeCurrent()) {
      result = client.get(cacheKey.toString(), tc);
    } catch (Exception e) {
      // Catch a pretty broad set of exceptions to limit any changes in the memcached client
      // and how it handles failures from leaking into the read path.
      if (
        (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
          && (e.getCause() instanceof OperationTimeoutException))
      ) {
        timeoutCount.incrementAndGet();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Timeout getting key " + cacheKey.toString(), e);
        }
      } else {
        cacheErrorCount.incrementAndGet();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Exception getting key " + cacheKey.toString(), e);
        }
      }
      result = null;
    } finally {
      span.end();
      if (updateCacheMetrics) {
        if (result == null) {
          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        } else {
          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        }
      }
    }
    return result;
  }
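
  // Caller-side view of the two methods above, for illustration; the file name and offset used to
  // build the key are placeholders:
  //
  //   BlockCacheKey key = new BlockCacheKey("hfile-name", 0);
  //   cache.cacheBlock(key, hfileBlock); // asynchronous set into memcached
  //   Cacheable block = cache.getBlock(key, true, false, true); // null on miss or timeout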

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
    try {
      cacheStats.evict();
      return client.delete(cacheKey.toString()).get();
    } catch (InterruptedException e) {
      LOG.warn("Error deleting " + cacheKey.toString(), e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deleting " + cacheKey.toString(), e);
      }
    }
    return false;
  }

  /**
   * This method does nothing so that memcached can handle all evictions.
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    return 0;
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  @Override
  public void shutdown() {
    client.shutdown();
    this.scheduleThreadPool.shutdown();
    // Give the stats task a brief window to finish, then force shutdown if it has not terminated.
    for (int i = 0; i < 10; i++) {
      if (!this.scheduleThreadPool.isTerminated()) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while sleeping");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    if (!this.scheduleThreadPool.isTerminated()) {
      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
      LOG.debug("Still running " + runnables);
    }
  }

  @Override
  public long size() {
    return 0;
  }

  @Override
  public long getMaxSize() {
    return 0;
  }

  @Override
  public long getFreeSize() {
    return 0;
  }

  @Override
  public long getCurrentSize() {
    return 0;
  }

  @Override
  public long getCurrentDataSize() {
    return 0;
  }

  @Override
  public long getBlockCount() {
    return 0;
  }

  @Override
  public long getDataBlockCount() {
    return 0;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
    return new Iterator<CachedBlock>() {
      @Override
      public boolean hasNext() {
        return false;
      }

      @Override
      public CachedBlock next() {
        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
      }

      @Override
      public void remove() {

      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  /**
   * Class to encode and decode an HFileBlock to and from memcached's resulting byte arrays.
   */
  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {

    @Override
    public boolean asyncDecode(CachedData d) {
      return false;
    }

    @Override
    public CachedData encode(HFileBlock block) {
      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
      block.serialize(bb, true);
      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
    }

    @Override
    public HFileBlock decode(CachedData d) {
      try {
        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
      } catch (IOException e) {
        LOG.warn("Failed to deserialize data from memcached", e);
      }
      return null;
    }

    @Override
    public int getMaxSize() {
      return MAX_SIZE;
    }
  }

  private static class StatisticsThread extends Thread {

    private final MemcachedBlockCache c;

    public StatisticsThread(MemcachedBlockCache c) {
      super("MemcachedBlockCacheStats");
      setDaemon(true);
      this.c = c;
    }

    @Override
    public void run() {
      c.logStats();
    }

  }

  public void logStats() {
    LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
      + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
      + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
      + (cacheStats.getHitCount() == 0
        ? "0, "
        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
      + (cacheStats.getHitCachingCount() == 0
        ? "0, "
        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
  }

}