001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019
020package org.apache.hadoop.hbase.io.hfile;
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.nio.ByteBuffer;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.NoSuchElementException;
029import java.util.concurrent.ExecutionException;
030
031import net.spy.memcached.CachedData;
032import net.spy.memcached.ConnectionFactoryBuilder;
033import net.spy.memcached.FailureMode;
034import net.spy.memcached.MemcachedClient;
035import net.spy.memcached.transcoders.Transcoder;
036
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
040import org.apache.hadoop.hbase.nio.ByteBuff;
041import org.apache.hadoop.hbase.nio.SingleByteBuff;
042import org.apache.hadoop.hbase.trace.TraceUtil;
043import org.apache.hadoop.hbase.util.Addressing;
044import org.apache.htrace.core.TraceScope;
045import org.apache.yetus.audience.InterfaceAudience;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Class to store blocks into memcached.
051 * This should only be used on a cluster of Memcached daemons that are tuned well and have a
052 * good network connection to the HBase regionservers. Any other use will likely slow down HBase
053 * greatly.
054 */
055@InterfaceAudience.Private
056public class MemcachedBlockCache implements BlockCache {
057  private static final Logger LOG = LoggerFactory.getLogger(MemcachedBlockCache.class.getName());
058
059  // Some memcache versions won't take more than 1024 * 1024. So set the limit below
060  // that just in case this client is used with those versions.
061  public static final int MAX_SIZE = 1020 * 1024;
062
063  // Config key for what memcached servers to use.
064  // They should be specified in a comma sperated list with ports.
065  // like:
066  //
067  // host1:11211,host3:8080,host4:11211
068  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
069  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
070  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
071  public static final String MEMCACHED_OPTIMIZE_KEY = "hbase.cache.memcached.spy.optimze";
072  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
073  public static final boolean MEMCACHED_OPTIMIZE_DEFAULT = false;
074
075  private final MemcachedClient client;
076  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
077  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
078
079  public MemcachedBlockCache(Configuration c) throws IOException {
080    LOG.info("Creating MemcachedBlockCache");
081
082    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
083    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
084    boolean optimize = c.getBoolean(MEMCACHED_OPTIMIZE_KEY, MEMCACHED_OPTIMIZE_DEFAULT);
085
086    ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
087        .setOpTimeout(opTimeout)
088        .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
089        .setFailureMode(FailureMode.Redistribute)
090        .setShouldOptimize(optimize)
091        .setDaemon(true)                      // Don't keep threads around past the end of days.
092        .setUseNagleAlgorithm(false)          // Ain't nobody got time for that
093        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024); // Much larger just in case
094
095    // Assume only the localhost is serving memecached.
096    // A la mcrouter or co-locating memcached with split regionservers.
097    //
098    // If this config is a pool of memecached servers they will all be used according to the
099    // default hashing scheme defined by the memcache client. Spy Memecache client in this
100    // case.
101    String serverListString = c.get(MEMCACHED_CONFIG_KEY,"localhost:11211");
102    String[] servers = serverListString.split(",");
103    List<InetSocketAddress> serverAddresses = new ArrayList<>(servers.length);
104    for (String s:servers) {
105      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
106    }
107
108    client = new MemcachedClient(builder.build(), serverAddresses);
109  }
110
111  @Override
112  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
113    cacheBlock(cacheKey, buf);
114  }
115
116  @Override
117  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
118    if (buf instanceof HFileBlock) {
119      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
120    } else {
121      if (LOG.isDebugEnabled()) {
122        LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
123            + buf.getClass().toString());
124      }
125    }
126  }
127
128  @Override
129  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
130                            boolean repeat, boolean updateCacheMetrics) {
131    // Assume that nothing is the block cache
132    HFileBlock result = null;
133
134    try (TraceScope traceScope = TraceUtil.createTrace("MemcachedBlockCache.getBlock")) {
135      result = client.get(cacheKey.toString(), tc);
136    } catch (Exception e) {
137      // Catch a pretty broad set of exceptions to limit any changes in the memecache client
138      // and how it handles failures from leaking into the read path.
139      if (LOG.isDebugEnabled()) {
140        LOG.debug("Exception pulling from memcached [ "
141            + cacheKey.toString()
142            + " ]. Treating as a miss.", e);
143      }
144      result = null;
145    } finally {
146      // Update stats if this request doesn't have it turned off 100% of the time
147      if (updateCacheMetrics) {
148        if (result == null) {
149          cacheStats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
150        } else {
151          cacheStats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
152        }
153      }
154    }
155
156    return result;
157  }
158
159  @Override
160  public boolean evictBlock(BlockCacheKey cacheKey) {
161    try {
162      cacheStats.evict();
163      return client.delete(cacheKey.toString()).get();
164    } catch (InterruptedException e) {
165      LOG.warn("Error deleting " + cacheKey.toString(), e);
166      Thread.currentThread().interrupt();
167    } catch (ExecutionException e) {
168      if (LOG.isDebugEnabled()) {
169        LOG.debug("Error deleting " + cacheKey.toString(), e);
170      }
171    }
172    return false;
173  }
174
175  /**
176   * This method does nothing so that memcached can handle all evictions.
177   */
178  @Override
179  public int evictBlocksByHfileName(String hfileName) {
180    return 0;
181  }
182
183  @Override
184  public CacheStats getStats() {
185    return cacheStats;
186  }
187
188  @Override
189  public void shutdown() {
190    client.shutdown();
191  }
192
193  @Override
194  public long size() {
195    return 0;
196  }
197
198  @Override
199  public long getMaxSize() {
200    return 0;
201  }
202
203  @Override
204  public long getFreeSize() {
205    return 0;
206  }
207
208  @Override
209  public long getCurrentSize() {
210    return 0;
211  }
212
213  @Override
214  public long getCurrentDataSize() {
215    return 0;
216  }
217
218  @Override
219  public long getBlockCount() {
220    return 0;
221  }
222
223  @Override
224  public long getDataBlockCount() {
225    return 0;
226  }
227
228  @Override
229  public Iterator<CachedBlock> iterator() {
230    return new Iterator<CachedBlock>() {
231      @Override
232      public boolean hasNext() {
233        return false;
234      }
235
236      @Override
237      public CachedBlock next() {
238        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
239      }
240
241      @Override
242      public void remove() {
243
244      }
245    };
246  }
247
248  @Override
249  public BlockCache[] getBlockCaches() {
250    return null;
251  }
252
253  /**
254   * Class to encode and decode an HFileBlock to and from memecached's resulting byte arrays.
255   */
256  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
257
258    @Override
259    public boolean asyncDecode(CachedData d) {
260      return false;
261    }
262
263    @Override
264    public CachedData encode(HFileBlock block) {
265      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
266      block.serialize(bb, true);
267      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
268    }
269
270    @Override
271    public HFileBlock decode(CachedData d) {
272      try {
273        ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(d.getData()));
274        return (HFileBlock) HFileBlock.BLOCK_DESERIALIZER.deserialize(buf, true,
275          MemoryType.EXCLUSIVE);
276      } catch (IOException e) {
277        LOG.warn("Error deserializing data from memcached",e);
278      }
279      return null;
280    }
281
282    @Override
283    public int getMaxSize() {
284      return MAX_SIZE;
285    }
286  }
287}