/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

@Category(SmallTests.class)
public class TestStoreHotnessProtector {

  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestStoreHotnessProtector.class);

  @Test
  public void testPreparePutCounter() throws Exception {

    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Configuration conf = new Configuration();
    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
    conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
    Region mockRegion = mock(Region.class);
    StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);

    Store mockStore1 = mock(Store.class);
    RegionInfo mockRegionInfo = mock(RegionInfo.class);
    byte[] family = Bytes.toBytes("testF1");

    when(mockRegion.getStore(family)).thenReturn(mockStore1);
    when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
    when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");

    when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
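    // the mocked store reports a single in-flight parallel put, which should stay well below
    // PARALLEL_PUT_STORE_THREADS_LIMIT (10), so this test focuses on the preparePut counter limit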
    when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");

    final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
    familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));

    final AtomicReference<Exception> exception = new AtomicReference<>();

    // phase 1: the preparePut counter stays within the limit; exactly
    // PARALLEL_PUT_STORE_THREADS_LIMIT * PARALLEL_PREPARE_PUT_STORE_MULTIPLIER threads call start()
    int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10)
      * conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
      executorService.execute(() -> {
        try {
          storeHotnessProtector.start(familyMaps);
        } catch (RegionTooBusyException e) {
          e.printStackTrace();
          exception.set(e);
        } finally {
          countDownLatch.countDown();
        }
      });
    }

    countDownLatch.await(60, TimeUnit.SECONDS);
    // no exception expected while the counter stays at or below the limit
    Assert.assertNull(exception.get());
    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
      threadCount);

    // phase 2: one more start() exceeds the limit
    try {
      storeHotnessProtector.start(familyMaps);
    } catch (RegionTooBusyException e) {
      e.printStackTrace();
      exception.set(e);
    }

    Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class);

    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1);
    // the rejected start() is not rolled back, so the counter still reflects it
    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
      threadCount + 1);

    storeHotnessProtector.finish(familyMaps);
    Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(),
      threadCount);
  }

}