001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.context.Scope;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.NoSuchElementException;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import net.spy.memcached.CachedData;
035import net.spy.memcached.ConnectionFactoryBuilder;
036import net.spy.memcached.FailureMode;
037import net.spy.memcached.MemcachedClient;
038import net.spy.memcached.OperationTimeoutException;
039import net.spy.memcached.transcoders.Transcoder;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.io.ByteBuffAllocator;
042import org.apache.hadoop.hbase.nio.ByteBuff;
043import org.apache.hadoop.hbase.nio.SingleByteBuff;
044import org.apache.hadoop.hbase.trace.TraceUtil;
045import org.apache.hadoop.hbase.util.Addressing;
046import org.apache.hadoop.util.StringUtils;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
052
053/**
054 * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
055 * that are tuned well and have a good network connection to the HBase regionservers. Any other use
056 * will likely slow down HBase greatly.
057 */
058@InterfaceAudience.Private
059@SuppressWarnings("FutureReturnValueIgnored")
060public class MemcachedBlockCache implements BlockCache {
061  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());
062
063  // Some memcache versions won't take more than 1024 * 1024. So set the limit below
064  // that just in case this client is used with those versions.
065  public static final int MAX_SIZE = 1020 * 1024;
066
067  // Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size
068  public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
069                                                        // spec
070
071  // Config key for what memcached servers to use.
072  // They should be specified in a comma sperated list with ports.
073  // like:
074  //
075  // host1:11211,host3:8080,host4:11211
076  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
077  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
078  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
079  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
080  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
081  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
082  public static final int STAT_THREAD_PERIOD = 60 * 5;
083
084  private final MemcachedClient client;
085  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
086  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
087  private final AtomicLong cachedCount = new AtomicLong();
088  private final AtomicLong notCachedCount = new AtomicLong();
089  private final AtomicLong cacheErrorCount = new AtomicLong();
090  private final AtomicLong timeoutCount = new AtomicLong();
091
092  /** Statistics thread schedule pool (for heavy debugging, could remove) */
093  private transient final ScheduledExecutorService scheduleThreadPool =
094    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
095      .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());
096
097  public MemcachedBlockCache(Configuration c) throws IOException {
098    LOG.info("Creating MemcachedBlockCache");
099
100    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
101    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
102    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
103
104    ConnectionFactoryBuilder builder =
105      new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
106        .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
107        .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);
108
109    // Assume only the localhost is serving memcached.
110    // A la mcrouter or co-locating memcached with split regionservers.
111    //
112    // If this config is a pool of memcached servers they will all be used according to the
113    // default hashing scheme defined by the memcached client. Spy Memecache client in this
114    // case.
115    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
116    String[] servers = serverListString.split(",");
117    // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any
118    // resolved identities cannot have their address mappings changed while the MemcachedClient
119    // instance is alive. We won't get a chance to trigger re-resolution.
120    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
121    for (String s : servers) {
122      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
123    }
124
125    client = new MemcachedClient(builder.build(), serverAddresses);
126    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
127      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
128  }
129
130  @Override
131  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
132    cacheBlock(cacheKey, buf);
133  }
134
135  @Override
136  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
137    if (buf instanceof HFileBlock) {
138      if (buf.getSerializedLength() > MAX_SIZE) {
139        LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
140          buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
141        notCachedCount.incrementAndGet();
142        return;
143      }
144      client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
145        try {
146          f.get();
147          cachedCount.incrementAndGet();
148        } catch (Exception e) {
149          LOG.warn("Failed to cache block with key " + cacheKey, e);
150          cacheErrorCount.incrementAndGet();
151        }
152      });
153    } else {
154      LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
155      notCachedCount.incrementAndGet();
156    }
157  }
158
159  @Override
160  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
161    boolean updateCacheMetrics) {
162    // Assume that nothing is the block cache
163    HFileBlock result = null;
164    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
165    try (Scope traceScope = span.makeCurrent()) {
166      result = client.get(cacheKey.toString(), tc);
167    } catch (Exception e) {
168      // Catch a pretty broad set of exceptions to limit any changes in the memcache client
169      // and how it handles failures from leaking into the read path.
170      if (
171        (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
172          && (e.getCause() instanceof OperationTimeoutException))
173      ) {
174        timeoutCount.incrementAndGet();
175        if (LOG.isDebugEnabled()) {
176          LOG.debug("Timeout getting key " + cacheKey.toString(), e);
177        }
178      } else {
179        cacheErrorCount.incrementAndGet();
180        if (LOG.isDebugEnabled()) {
181          LOG.debug("Exception getting key " + cacheKey.toString(), e);
182        }
183      }
184      result = null;
185    } finally {
186      span.end();
187      if (updateCacheMetrics) {
188        if (result == null) {
189          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
190        } else {
191          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
192        }
193      }
194    }
195    return result;
196  }
197
198  @Override
199  public boolean evictBlock(BlockCacheKey cacheKey) {
200    try {
201      cacheStats.evict();
202      return client.delete(cacheKey.toString()).get();
203    } catch (InterruptedException e) {
204      LOG.warn("Error deleting " + cacheKey.toString(), e);
205      Thread.currentThread().interrupt();
206    } catch (ExecutionException e) {
207      if (LOG.isDebugEnabled()) {
208        LOG.debug("Error deleting " + cacheKey.toString(), e);
209      }
210    }
211    return false;
212  }
213
214  /**
215   * This method does nothing so that memcached can handle all evictions.
216   */
217  @Override
218  public int evictBlocksByHfileName(String hfileName) {
219    return 0;
220  }
221
222  @Override
223  public CacheStats getStats() {
224    return cacheStats;
225  }
226
227  @Override
228  public void shutdown() {
229    client.shutdown();
230    this.scheduleThreadPool.shutdown();
231    for (int i = 0; i < 10; i++) {
232      if (!this.scheduleThreadPool.isShutdown()) {
233        try {
234          Thread.sleep(10);
235        } catch (InterruptedException e) {
236          LOG.warn("Interrupted while sleeping");
237          Thread.currentThread().interrupt();
238          break;
239        }
240      }
241    }
242    if (!this.scheduleThreadPool.isShutdown()) {
243      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
244      LOG.debug("Still running " + runnables);
245    }
246  }
247
248  @Override
249  public long size() {
250    return 0;
251  }
252
253  @Override
254  public long getMaxSize() {
255    return 0;
256  }
257
258  @Override
259  public long getFreeSize() {
260    return 0;
261  }
262
263  @Override
264  public long getCurrentSize() {
265    return 0;
266  }
267
268  @Override
269  public long getCurrentDataSize() {
270    return 0;
271  }
272
273  @Override
274  public long getBlockCount() {
275    return 0;
276  }
277
278  @Override
279  public long getDataBlockCount() {
280    return 0;
281  }
282
283  @Override
284  public Iterator<CachedBlock> iterator() {
285    return new Iterator<CachedBlock>() {
286      @Override
287      public boolean hasNext() {
288        return false;
289      }
290
291      @Override
292      public CachedBlock next() {
293        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
294      }
295
296      @Override
297      public void remove() {
298
299      }
300    };
301  }
302
303  @Override
304  public BlockCache[] getBlockCaches() {
305    return null;
306  }
307
308  /**
309   * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays.
310   */
311  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
312
313    @Override
314    public boolean asyncDecode(CachedData d) {
315      return false;
316    }
317
318    @Override
319    public CachedData encode(HFileBlock block) {
320      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
321      block.serialize(bb, true);
322      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
323    }
324
325    @Override
326    public HFileBlock decode(CachedData d) {
327      try {
328        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
329        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
330      } catch (IOException e) {
331        LOG.warn("Failed to deserialize data from memcached", e);
332      }
333      return null;
334    }
335
336    @Override
337    public int getMaxSize() {
338      return MAX_SIZE;
339    }
340  }
341
342  private static class StatisticsThread extends Thread {
343
344    private final MemcachedBlockCache c;
345
346    public StatisticsThread(MemcachedBlockCache c) {
347      super("MemcachedBlockCacheStats");
348      setDaemon(true);
349      this.c = c;
350    }
351
352    @Override
353    public void run() {
354      c.logStats();
355    }
356
357  }
358
359  public void logStats() {
360    LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
361      + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
362      + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
363      + (cacheStats.getHitCount() == 0
364        ? "0"
365        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
366      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
367      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
368      + (cacheStats.getHitCachingCount() == 0
369        ? "0,"
370        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
371      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
372      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
373  }
374
375}