001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.mockito.ArgumentMatchers.any;
024import static org.mockito.ArgumentMatchers.anyInt;
025import static org.mockito.ArgumentMatchers.anyString;
026import static org.mockito.Mockito.mock;
027import static org.mockito.Mockito.when;
028
029import java.io.IOException;
030import java.net.InetSocketAddress;
031import java.util.List;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.ForkJoinPool;
036import java.util.concurrent.ThreadLocalRandom;
037import net.spy.memcached.CachedData;
038import net.spy.memcached.ConnectionFactory;
039import net.spy.memcached.FailureMode;
040import net.spy.memcached.MemcachedClient;
041import net.spy.memcached.internal.OperationFuture;
042import net.spy.memcached.ops.Operation;
043import net.spy.memcached.ops.OperationState;
044import net.spy.memcached.ops.OperationStatus;
045import net.spy.memcached.transcoders.Transcoder;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.Waiter;
049import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
050import org.apache.hadoop.hbase.testclassification.IOTests;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.junit.jupiter.api.BeforeEach;
053import org.junit.jupiter.api.Tag;
054import org.junit.jupiter.api.Test;
055
056@Tag(IOTests.TAG)
057@Tag(SmallTests.TAG)
058public class TestMemcachedBlockCache {
059
060  private MemcachedBlockCache cache;
061
062  private ConcurrentMap<String, CachedData> backingMap;
063
064  @BeforeEach
065  public void setup() throws Exception {
066    int port = ThreadLocalRandom.current().nextInt(1024, 65536);
067    Configuration conf = new Configuration();
068    conf.set("hbase.cache.memcached.servers", "localhost:" + port);
069    backingMap = new ConcurrentHashMap<>();
070    cache = new MemcachedBlockCache(conf) {
071
072      private <T> OperationFuture<T> createFuture(String key, long opTimeout, T result) {
073        OperationFuture<T> future =
074          new OperationFuture<>(key, new CountDownLatch(0), opTimeout, ForkJoinPool.commonPool());
075        Operation op = mock(Operation.class);
076        when(op.getState()).thenReturn(OperationState.COMPLETE);
077        future.setOperation(op);
078        future.set(result, new OperationStatus(true, ""));
079
080        return future;
081      }
082
083      @Override
084      protected MemcachedClient createMemcachedClient(ConnectionFactory factory,
085        List<InetSocketAddress> serverAddresses) throws IOException {
086        assertEquals(FailureMode.Redistribute, factory.getFailureMode());
087        assertTrue(factory.isDaemon());
088        assertFalse(factory.useNagleAlgorithm());
089        assertEquals(MAX_SIZE, factory.getReadBufSize());
090        assertEquals(1, serverAddresses.size());
091        assertEquals("localhost", serverAddresses.get(0).getHostName());
092        assertEquals(port, serverAddresses.get(0).getPort());
093        MemcachedClient client = mock(MemcachedClient.class);
094        when(client.set(anyString(), anyInt(), any(), any())).then(inv -> {
095          String key = inv.getArgument(0);
096          HFileBlock block = inv.getArgument(2);
097          Transcoder<HFileBlock> tc = inv.getArgument(3);
098          CachedData cd = tc.encode(block);
099          backingMap.put(key, cd);
100          return createFuture(key, factory.getOperationTimeout(), true);
101        });
102        when(client.delete(anyString())).then(inv -> {
103          String key = inv.getArgument(0);
104          backingMap.remove(key);
105          return createFuture(key, factory.getOperationTimeout(), true);
106        });
107        when(client.get(anyString(), any())).then(inv -> {
108          String key = inv.getArgument(0);
109          Transcoder<HFileBlock> tc = inv.getArgument(1);
110          CachedData cd = backingMap.get(key);
111          return tc.decode(cd);
112        });
113        return client;
114      }
115    };
116  }
117
118  @Test
119  public void testCache() throws Exception {
120    final int numBlocks = 10;
121    HFileBlockPair[] blocks =
122      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks);
123    for (int i = 0; i < numBlocks; i++) {
124      cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
125    }
126    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks);
127    for (int i = 0; i < numBlocks; i++) {
128      HFileBlock actual = (HFileBlock) cache.getBlock(blocks[i].getBlockName(), false, false, true);
129      HFileBlock expected = blocks[i].getBlock();
130      assertEquals(expected.getBlockType(), actual.getBlockType());
131      assertEquals(expected.getSerializedLength(), actual.getSerializedLength());
132    }
133
134    CacheTestUtils.testConvertToJSON(cache);
135  }
136
137  @Test
138  public void testEviction() throws Exception {
139    final int numBlocks = 10;
140    HFileBlockPair[] blocks =
141      CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks);
142    for (int i = 0; i < numBlocks; i++) {
143      cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock());
144    }
145    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks);
146    for (int i = 0; i < numBlocks; i++) {
147      cache.evictBlock(blocks[i].getBlockName());
148    }
149    Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0);
150  }
151}