001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import io.opentelemetry.api.trace.Span;
021import io.opentelemetry.context.Scope;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.NoSuchElementException;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ScheduledExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034import net.spy.memcached.CachedData;
035import net.spy.memcached.ConnectionFactory;
036import net.spy.memcached.ConnectionFactoryBuilder;
037import net.spy.memcached.FailureMode;
038import net.spy.memcached.MemcachedClient;
039import net.spy.memcached.OperationTimeoutException;
040import net.spy.memcached.transcoders.Transcoder;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.hbase.io.ByteBuffAllocator;
043import org.apache.hadoop.hbase.nio.ByteBuff;
044import org.apache.hadoop.hbase.nio.SingleByteBuff;
045import org.apache.hadoop.hbase.trace.TraceUtil;
046import org.apache.hadoop.hbase.util.Addressing;
047import org.apache.hadoop.util.StringUtils;
048import org.apache.yetus.audience.InterfaceAudience;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
053
054/**
055 * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
056 * that are tuned well and have a good network connection to the HBase regionservers. Any other use
057 * will likely slow down HBase greatly.
058 */
059@InterfaceAudience.Private
060@SuppressWarnings("FutureReturnValueIgnored")
061public class MemcachedBlockCache implements BlockCache {
062  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());
063
064  // Some memcache versions won't take more than 1024 * 1024. So set the limit below
065  // that just in case this client is used with those versions.
066  public static final int MAX_SIZE = 1020 * 1024;
067
068  // Start memcached with -I <MAX_SIZE> to ensure it has the ability to store blocks of this size
069  public static final int MAX_TIME = 60 * 60 * 24 * 30; // 30 days, max allowed per the memcached
070                                                        // spec
071
072  // Config key for what memcached servers to use.
073  // They should be specified in a comma sperated list with ports.
074  // like:
075  //
076  // host1:11211,host3:8080,host4:11211
077  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
078  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
079  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
080  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
081  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
082  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
083  public static final int STAT_THREAD_PERIOD = 60 * 5;
084
085  private final MemcachedClient client;
086  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
087  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
088  private final AtomicLong cachedCount = new AtomicLong();
089  private final AtomicLong notCachedCount = new AtomicLong();
090  private final AtomicLong cacheErrorCount = new AtomicLong();
091  private final AtomicLong timeoutCount = new AtomicLong();
092
093  /** Statistics thread schedule pool (for heavy debugging, could remove) */
094  private transient final ScheduledExecutorService scheduleThreadPool =
095    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
096      .setNameFormat("MemcachedBlockCacheStatsExecutor").setDaemon(true).build());
097
098  public MemcachedBlockCache(Configuration c) throws IOException {
099    LOG.info("Creating MemcachedBlockCache");
100
101    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
102    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
103    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
104
105    ConnectionFactoryBuilder builder =
106      new ConnectionFactoryBuilder().setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
107        .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize).setDaemon(true)
108        .setUseNagleAlgorithm(false).setReadBufferSize(MAX_SIZE);
109
110    // Assume only the localhost is serving memcached.
111    // A la mcrouter or co-locating memcached with split regionservers.
112    //
113    // If this config is a pool of memcached servers they will all be used according to the
114    // default hashing scheme defined by the memcached client. Spy Memecache client in this
115    // case.
116    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
117    String[] servers = serverListString.split(",");
118    // MemcachedClient requires InetSocketAddresses, we have to create them now. Implies any
119    // resolved identities cannot have their address mappings changed while the MemcachedClient
120    // instance is alive. We won't get a chance to trigger re-resolution.
121    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
122    for (String s : servers) {
123      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
124    }
125
126    client = createMemcachedClient(builder.build(), serverAddresses);
127    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
128      STAT_THREAD_PERIOD, TimeUnit.SECONDS);
129  }
130
131  protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
132    List<InetSocketAddress> serverAddresses) throws IOException {
133    return new MemcachedClient(factory, serverAddresses);
134  }
135
136  @Override
137  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
138    cacheBlock(cacheKey, buf);
139  }
140
141  @Override
142  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
143    if (buf instanceof HFileBlock) {
144      if (buf.getSerializedLength() > MAX_SIZE) {
145        LOG.debug("Block of type {} with key {} is too large, size={}, max={}, will not cache",
146          buf.getClass(), cacheKey, buf.getSerializedLength(), MAX_SIZE);
147        notCachedCount.incrementAndGet();
148        return;
149      }
150      client.set(cacheKey.toString(), MAX_TIME, (HFileBlock) buf, tc).addListener(f -> {
151        try {
152          f.get();
153          cachedCount.incrementAndGet();
154        } catch (Exception e) {
155          LOG.warn("Failed to cache block with key " + cacheKey, e);
156          cacheErrorCount.incrementAndGet();
157        }
158      });
159    } else {
160      LOG.debug("Can not cache Cacheables of type {} with key {}", buf.getClass(), cacheKey);
161      notCachedCount.incrementAndGet();
162    }
163  }
164
165  @Override
166  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
167    boolean updateCacheMetrics) {
168    // Assume that nothing is the block cache
169    HFileBlock result = null;
170    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
171    try (Scope traceScope = span.makeCurrent()) {
172      result = client.get(cacheKey.toString(), tc);
173    } catch (Exception e) {
174      // Catch a pretty broad set of exceptions to limit any changes in the memcache client
175      // and how it handles failures from leaking into the read path.
176      if (
177        (e instanceof OperationTimeoutException) || ((e instanceof RuntimeException)
178          && (e.getCause() instanceof OperationTimeoutException))
179      ) {
180        timeoutCount.incrementAndGet();
181        if (LOG.isDebugEnabled()) {
182          LOG.debug("Timeout getting key " + cacheKey.toString(), e);
183        }
184      } else {
185        cacheErrorCount.incrementAndGet();
186        if (LOG.isDebugEnabled()) {
187          LOG.debug("Exception getting key " + cacheKey.toString(), e);
188        }
189      }
190      result = null;
191    } finally {
192      span.end();
193      if (updateCacheMetrics) {
194        if (result == null) {
195          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
196        } else {
197          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
198        }
199      }
200    }
201    return result;
202  }
203
204  @Override
205  public boolean evictBlock(BlockCacheKey cacheKey) {
206    try {
207      cacheStats.evict();
208      return client.delete(cacheKey.toString()).get();
209    } catch (InterruptedException e) {
210      LOG.warn("Error deleting " + cacheKey.toString(), e);
211      Thread.currentThread().interrupt();
212    } catch (ExecutionException e) {
213      if (LOG.isDebugEnabled()) {
214        LOG.debug("Error deleting " + cacheKey.toString(), e);
215      }
216    }
217    return false;
218  }
219
220  /**
221   * This method does nothing so that memcached can handle all evictions.
222   */
223  @Override
224  public int evictBlocksByHfileName(String hfileName) {
225    return 0;
226  }
227
228  @Override
229  public CacheStats getStats() {
230    return cacheStats;
231  }
232
233  @Override
234  public void shutdown() {
235    client.shutdown();
236    this.scheduleThreadPool.shutdown();
237    for (int i = 0; i < 10; i++) {
238      if (!this.scheduleThreadPool.isShutdown()) {
239        try {
240          Thread.sleep(10);
241        } catch (InterruptedException e) {
242          LOG.warn("Interrupted while sleeping");
243          Thread.currentThread().interrupt();
244          break;
245        }
246      }
247    }
248    if (!this.scheduleThreadPool.isShutdown()) {
249      List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
250      LOG.debug("Still running " + runnables);
251    }
252  }
253
254  @Override
255  public long size() {
256    return 0;
257  }
258
259  @Override
260  public long getMaxSize() {
261    return 0;
262  }
263
264  @Override
265  public long getFreeSize() {
266    return 0;
267  }
268
269  @Override
270  public long getCurrentSize() {
271    return 0;
272  }
273
274  @Override
275  public long getCurrentDataSize() {
276    return 0;
277  }
278
279  @Override
280  public long getBlockCount() {
281    return 0;
282  }
283
284  @Override
285  public long getDataBlockCount() {
286    return 0;
287  }
288
289  @Override
290  public Iterator<CachedBlock> iterator() {
291    return new Iterator<CachedBlock>() {
292      @Override
293      public boolean hasNext() {
294        return false;
295      }
296
297      @Override
298      public CachedBlock next() {
299        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
300      }
301
302      @Override
303      public void remove() {
304
305      }
306    };
307  }
308
309  @Override
310  public BlockCache[] getBlockCaches() {
311    return null;
312  }
313
314  /**
315   * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays.
316   */
317  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
318
319    @Override
320    public boolean asyncDecode(CachedData d) {
321      return false;
322    }
323
324    @Override
325    public CachedData encode(HFileBlock block) {
326      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
327      block.serialize(bb, true);
328      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
329    }
330
331    @Override
332    public HFileBlock decode(CachedData d) {
333      try {
334        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
335        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
336      } catch (IOException e) {
337        LOG.warn("Failed to deserialize data from memcached", e);
338      }
339      return null;
340    }
341
342    @Override
343    public int getMaxSize() {
344      return MAX_SIZE;
345    }
346  }
347
348  private static class StatisticsThread extends Thread {
349
350    private final MemcachedBlockCache c;
351
352    public StatisticsThread(MemcachedBlockCache c) {
353      super("MemcachedBlockCacheStats");
354      setDaemon(true);
355      this.c = c;
356    }
357
358    @Override
359    public void run() {
360      c.logStats();
361    }
362
363  }
364
365  public void logStats() {
366    LOG.info("cached=" + cachedCount.get() + ", notCached=" + notCachedCount.get()
367      + ", cacheErrors=" + cacheErrorCount.get() + ", timeouts=" + timeoutCount.get() + ", reads="
368      + cacheStats.getRequestCount() + ", " + "hits=" + cacheStats.getHitCount() + ", hitRatio="
369      + (cacheStats.getHitCount() == 0
370        ? "0"
371        : (StringUtils.formatPercent(cacheStats.getHitRatio(), 2) + ", "))
372      + "cachingAccesses=" + cacheStats.getRequestCachingCount() + ", " + "cachingHits="
373      + cacheStats.getHitCachingCount() + ", " + "cachingHitsRatio="
374      + (cacheStats.getHitCachingCount() == 0
375        ? "0,"
376        : (StringUtils.formatPercent(cacheStats.getHitCachingRatio(), 2) + ", "))
377      + "evictions=" + cacheStats.getEvictionCount() + ", " + "evicted="
378      + cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction());
379  }
380
381}