/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import net.spy.memcached.CachedData;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class to store blocks into memcached. This should only be used on a cluster of Memcached daemons
 * that are tuned well and have a good network connection to the HBase regionservers. Any other use
 * will likely slow down HBase greatly.
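 * <p>
 * A minimal configuration sketch follows. The server list key is defined in this class; the key
 * for enabling an external block cache may vary by HBase version, so treat it as an example:
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.blockcache.use.external&lt;/name&gt;
 *   &lt;value&gt;true&lt;/value&gt;
 * &lt;/property&gt;
 * &lt;property&gt;
 *   &lt;name&gt;hbase.cache.memcached.servers&lt;/name&gt;
 *   &lt;value&gt;host1:11211,host2:11211&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>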
 */
@InterfaceAudience.Private
public class MemcachedBlockCache implements BlockCache {
  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());

  // Some memcached versions won't take more than 1024 * 1024. So set the limit below
  // that just in case this client is used with those versions.
  public static final int MAX_SIZE = 1020 * 1024;

  // Config key for which memcached servers to use.
  // They should be specified as a comma-separated list of host:port pairs,
  // for example:
  //
  // host1:11211,host3:8080,host4:11211
  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;

  private final MemcachedClient client;
  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");

  public MemcachedBlockCache(Configuration c) throws IOException {
    LOG.info("Creating MemcachedBlockCache");

    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);

    ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
      // Cap the max time before anything times out
      .setOpTimeout(opTimeout).setOpQueueMaxBlockTime(queueTimeout)
      .setFailureMode(FailureMode.Redistribute).setShouldOptimize(optimize)
      // Don't keep threads around past the end of days.
      .setDaemon(true)
      .setUseNagleAlgorithm(false) // Ain't nobody got time for that
      .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case

    // By default assume that only localhost is serving memcached,
    // e.g. mcrouter or memcached co-located with the regionservers.
    //
    // If this config is a pool of memcached servers, they will all be used according to the
    // default hashing scheme defined by the memcached client (the Spy Memcached client in this
    // case).
    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
    String[] servers = serverListString.split(",");
    // MemcachedClient requires InetSocketAddresses, so we have to create them now. This implies
    // that resolved addresses cannot change while the MemcachedClient instance is alive; we
    // won't get a chance to trigger re-resolution.
    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
    for (String s : servers) {
      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
    }

    client = new MemcachedClient(builder.build(), serverAddresses);
  }

  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
    cacheBlock(cacheKey, buf);
  }

  @SuppressWarnings("FutureReturnValueIgnored")
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    if (buf instanceof HFileBlock) {
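      // add() is asynchronous; attach a listener so a failed cache write is at least logged
      // instead of disappearing silently.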
      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc).addListener(f -> {
        try {
          f.get();
        } catch (ExecutionException e) {
          LOG.warn("Failed to cache block", e);
        }
      });
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "MemcachedBlockCache cannot cache Cacheables of type " + buf.getClass().toString());
      }
    }
  }

  @Override
  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
    boolean updateCacheMetrics) {
    // Assume that nothing is in the block cache.
    HFileBlock result = null;
    Span span = TraceUtil.getGlobalTracer().spanBuilder("MemcachedBlockCache.getBlock").startSpan();
    try (Scope traceScope = span.makeCurrent()) {
      result = client.get(cacheKey.toString(), tc);
    } catch (Exception e) {
      // Catch a pretty broad set of exceptions so that changes in the memcached client, and how
      // it handles failures, don't leak into the read path.
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Exception pulling from memcached [ " + cacheKey.toString() + " ]. Treating as a miss.",
          e);
      }
      result = null;
    } finally {
      span.end();
      // Update stats unless the caller turned metrics off for this request.
      if (updateCacheMetrics) {
        if (result == null) {
          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        } else {
          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
        }
      }
    }

    return result;
  }

  @Override
  public boolean evictBlock(BlockCacheKey cacheKey) {
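    // Deletion is handled by memcached itself; block on the future so the caller gets an
    // accurate success/failure answer.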
    try {
      cacheStats.evict();
      return client.delete(cacheKey.toString()).get();
    } catch (InterruptedException e) {
      LOG.warn("Error deleting " + cacheKey.toString(), e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deleting " + cacheKey.toString(), e);
      }
    }
    return false;
  }

  /**
   * This method does nothing so that memcached can handle all evictions.
   */
  @Override
  public int evictBlocksByHfileName(String hfileName) {
    return 0;
  }

  @Override
  public CacheStats getStats() {
    return cacheStats;
  }

  @Override
  public void shutdown() {
    client.shutdown();
  }

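  // Memcached manages its own memory, so sizes and block counts are not tracked here; all of
  // the size/count accessors below simply report 0.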
  @Override
  public long size() {
    return 0;
  }

  @Override
  public long getMaxSize() {
    return 0;
  }

  @Override
  public long getFreeSize() {
    return 0;
  }

  @Override
  public long getCurrentSize() {
    return 0;
  }

  @Override
  public long getCurrentDataSize() {
    return 0;
  }

  @Override
  public long getBlockCount() {
    return 0;
  }

  @Override
  public long getDataBlockCount() {
    return 0;
  }

  @Override
  public Iterator<CachedBlock> iterator() {
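    // There is no practical way to enumerate the blocks held by memcached, so expose an
    // always-empty iterator rather than guessing at contents.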
    return new Iterator<CachedBlock>() {
      @Override
      public boolean hasNext() {
        return false;
      }

      @Override
      public CachedBlock next() {
        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
      }

      @Override
      public void remove() {

      }
    };
  }

  @Override
  public BlockCache[] getBlockCaches() {
    return null;
  }

  /**
   * Class to encode and decode an HFileBlock to and from memcached's byte array representation.
   */
  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {

    @Override
    public boolean asyncDecode(CachedData d) {
      return false;
    }

    @Override
    public CachedData encode(HFileBlock block) {
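      // Serialize the block into a heap buffer. The memcached flags field is left at 0 because
      // decode() never looks at it.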
      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
      block.serialize(bb, true);
      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
    }

    @Override
    public HFileBlock decode(CachedData d) {
      try {
        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, ByteBuffAllocator.HEAP);
      } catch (IOException e) {
        LOG.warn("Failed to deserialize data from memcached", e);
      }
      return null;
    }

    @Override
    public int getMaxSize() {
      return MAX_SIZE;
    }
  }
}