001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.throttle; 019 020import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER; 021import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT; 022import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicReference; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.RegionTooBusyException; 038import org.apache.hadoop.hbase.client.RegionInfo; 039import org.apache.hadoop.hbase.regionserver.Region; 040import org.apache.hadoop.hbase.regionserver.Store; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.junit.Assert; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 048 049@Category(SmallTests.class) 050public class TestStoreHotnessProtector { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestStoreHotnessProtector.class); 055 056 @Test 057 public void testPreparePutCounter() throws Exception { 058 059 ExecutorService executorService = Executors.newFixedThreadPool(10); 060 061 Configuration conf = new Configuration(); 062 conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0); 063 conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10); 064 conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); 065 Region mockRegion = mock(Region.class); 066 StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf); 067 068 Store mockStore1 = mock(Store.class); 069 RegionInfo mockRegionInfo = mock(RegionInfo.class); 070 byte[] family = "testF1".getBytes(); 071 072 when(mockRegion.getStore(family)).thenReturn(mockStore1); 073 when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); 074 when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1"); 075 076 when(mockStore1.getCurrentParallelPutCount()).thenReturn(1); 077 when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1"); 078 079 final Map<byte[], List<Cell>> familyMaps = new HashMap<>(); 080 familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class))); 081 082 final AtomicReference<Exception> exception = new AtomicReference<>(); 083 084 // PreparePutCounter not access limit 085 086 int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10) 087 * conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3); 088 CountDownLatch countDownLatch = new CountDownLatch(threadCount); 089 090 for (int i = 0; i < threadCount; i++) { 091 executorService.execute(() -> { 092 try { 093 storeHotnessProtector.start(familyMaps); 094 } catch (RegionTooBusyException e) { 095 e.printStackTrace(); 096 exception.set(e); 097 } finally { 098 countDownLatch.countDown(); 099 } 100 }); 101 } 102 103 countDownLatch.await(60, TimeUnit.SECONDS); 104 // no exception 105 Assert.assertEquals(exception.get(), null); 106 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); 107 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 108 threadCount); 109 110 // access limit 111 112 try { 113 storeHotnessProtector.start(familyMaps); 114 } catch (RegionTooBusyException e) { 115 e.printStackTrace(); 116 exception.set(e); 117 } 118 119 Assert.assertEquals(exception.get().getClass(), RegionTooBusyException.class); 120 121 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().size(), 1); 122 // when access limit, counter will not changed. 123 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 124 threadCount + 1); 125 126 storeHotnessProtector.finish(familyMaps); 127 Assert.assertEquals(storeHotnessProtector.getPreparePutToStoreMap().get(family).get(), 128 threadCount); 129 } 130 131}