001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.mockito.ArgumentMatchers.any; 024import static org.mockito.ArgumentMatchers.anyInt; 025import static org.mockito.ArgumentMatchers.anyString; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.when; 028 029import java.io.IOException; 030import java.net.InetSocketAddress; 031import java.util.List; 032import java.util.concurrent.ConcurrentHashMap; 033import java.util.concurrent.ConcurrentMap; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.ForkJoinPool; 036import java.util.concurrent.ThreadLocalRandom; 037import net.spy.memcached.CachedData; 038import net.spy.memcached.ConnectionFactory; 039import net.spy.memcached.FailureMode; 040import net.spy.memcached.MemcachedClient; 041import net.spy.memcached.internal.OperationFuture; 042import net.spy.memcached.ops.Operation; 043import net.spy.memcached.ops.OperationState; 044import net.spy.memcached.ops.OperationStatus; 045import net.spy.memcached.transcoders.Transcoder; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.Waiter; 049import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair; 050import org.apache.hadoop.hbase.testclassification.IOTests; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.junit.jupiter.api.BeforeEach; 053import org.junit.jupiter.api.Tag; 054import org.junit.jupiter.api.Test; 055 056@Tag(IOTests.TAG) 057@Tag(SmallTests.TAG) 058public class TestMemcachedBlockCache { 059 060 private MemcachedBlockCache cache; 061 062 private ConcurrentMap<String, CachedData> backingMap; 063 064 @BeforeEach 065 public void setup() throws Exception { 066 int port = ThreadLocalRandom.current().nextInt(1024, 65536); 067 Configuration conf = new Configuration(); 068 conf.set("hbase.cache.memcached.servers", "localhost:" + port); 069 backingMap = new ConcurrentHashMap<>(); 070 cache = new MemcachedBlockCache(conf) { 071 072 private <T> OperationFuture<T> createFuture(String key, long opTimeout, T result) { 073 OperationFuture<T> future = 074 new OperationFuture<>(key, new CountDownLatch(0), opTimeout, ForkJoinPool.commonPool()); 075 Operation op = mock(Operation.class); 076 when(op.getState()).thenReturn(OperationState.COMPLETE); 077 future.setOperation(op); 078 future.set(result, new OperationStatus(true, "")); 079 080 return future; 081 } 082 083 @Override 084 protected MemcachedClient createMemcachedClient(ConnectionFactory factory, 085 List<InetSocketAddress> serverAddresses) throws IOException { 086 assertEquals(FailureMode.Redistribute, factory.getFailureMode()); 087 assertTrue(factory.isDaemon()); 088 assertFalse(factory.useNagleAlgorithm()); 089 assertEquals(MAX_SIZE, factory.getReadBufSize()); 090 assertEquals(1, serverAddresses.size()); 091 assertEquals("localhost", serverAddresses.get(0).getHostName()); 092 assertEquals(port, serverAddresses.get(0).getPort()); 093 MemcachedClient client = mock(MemcachedClient.class); 094 when(client.set(anyString(), anyInt(), any(), any())).then(inv -> { 095 String key = inv.getArgument(0); 096 HFileBlock block = inv.getArgument(2); 097 Transcoder<HFileBlock> tc = inv.getArgument(3); 098 CachedData cd = tc.encode(block); 099 backingMap.put(key, cd); 100 return createFuture(key, factory.getOperationTimeout(), true); 101 }); 102 when(client.delete(anyString())).then(inv -> { 103 String key = inv.getArgument(0); 104 backingMap.remove(key); 105 return createFuture(key, factory.getOperationTimeout(), true); 106 }); 107 when(client.get(anyString(), any())).then(inv -> { 108 String key = inv.getArgument(0); 109 Transcoder<HFileBlock> tc = inv.getArgument(1); 110 CachedData cd = backingMap.get(key); 111 return tc.decode(cd); 112 }); 113 return client; 114 } 115 }; 116 } 117 118 @Test 119 public void testCache() throws Exception { 120 final int numBlocks = 10; 121 HFileBlockPair[] blocks = 122 CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); 123 for (int i = 0; i < numBlocks; i++) { 124 cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 125 } 126 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); 127 for (int i = 0; i < numBlocks; i++) { 128 HFileBlock actual = (HFileBlock) cache.getBlock(blocks[i].getBlockName(), false, false, true); 129 HFileBlock expected = blocks[i].getBlock(); 130 assertEquals(expected.getBlockType(), actual.getBlockType()); 131 assertEquals(expected.getSerializedLength(), actual.getSerializedLength()); 132 } 133 134 CacheTestUtils.testConvertToJSON(cache); 135 } 136 137 @Test 138 public void testEviction() throws Exception { 139 final int numBlocks = 10; 140 HFileBlockPair[] blocks = 141 CacheTestUtils.generateHFileBlocks(HConstants.DEFAULT_BLOCKSIZE, numBlocks); 142 for (int i = 0; i < numBlocks; i++) { 143 cache.cacheBlock(blocks[i].getBlockName(), blocks[i].getBlock()); 144 } 145 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == numBlocks); 146 for (int i = 0; i < numBlocks; i++) { 147 cache.evictBlock(blocks[i].getBlockName()); 148 } 149 Waiter.waitFor(new Configuration(), 10000, () -> backingMap.size() == 0); 150 } 151}