/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.throttle;

import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PREPARE_PUT_STORE_MULTIPLIER;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT;
import static org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector.PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests the per-family "prepare put" counter exposed by
 * {@link StoreHotnessProtector#getPreparePutToStoreMap()}.
 */
@Tag(SmallTests.TAG)
public class TestStoreHotnessProtector {

  /** Upper bound, in seconds, on how long the test waits for the concurrent start() calls. */
  private static final long AWAIT_TIMEOUT_SECONDS = 60;

  /**
   * Calls {@code start()} concurrently exactly (thread limit x prepare-put multiplier) times and
   * expects every call to be accepted and counted; then expects one more {@code start()} to be
   * rejected with {@link RegionTooBusyException}, and {@code finish()} to decrement the counter
   * again.
   */
  @Test
  public void testPreparePutCounter() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT_MIN_COLUMN_COUNT, 0);
    conf.setInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10);
    conf.setInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
    Region mockRegion = mock(Region.class);
    StoreHotnessProtector storeHotnessProtector = new StoreHotnessProtector(mockRegion, conf);

    Store mockStore1 = mock(Store.class);
    RegionInfo mockRegionInfo = mock(RegionInfo.class);
    byte[] family = Bytes.toBytes("testF1");

    when(mockRegion.getStore(family)).thenReturn(mockStore1);
    when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
    when(mockRegionInfo.getRegionNameAsString()).thenReturn("test_region_1");

    when(mockStore1.getCurrentParallelPutCount()).thenReturn(1);
    when(mockStore1.getColumnFamilyName()).thenReturn("test_Family_1");

    final Map<byte[], List<Cell>> familyMaps = new HashMap<>();
    familyMaps.put(family, Lists.newArrayList(mock(Cell.class), mock(Cell.class)));

    // Holds an exception raised by any worker so it can be reported from the test thread.
    final AtomicReference<Exception> exception = new AtomicReference<>();

    // Phase 1: issue exactly (thread limit x multiplier) start() calls; none should be rejected.
    int threadCount = conf.getInt(PARALLEL_PUT_STORE_THREADS_LIMIT, 10)
      * conf.getInt(PARALLEL_PREPARE_PUT_STORE_MULTIPLIER, 3);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
      for (int i = 0; i < threadCount; i++) {
        executorService.execute(() -> {
          try {
            storeHotnessProtector.start(familyMaps);
          } catch (RegionTooBusyException | RuntimeException e) {
            exception.set(e);
          } finally {
            countDownLatch.countDown();
          }
        });
      }

      // await() returns false on timeout; fail here rather than on a confusing counter mismatch.
      assertTrue(countDownLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
        "Timed out waiting for concurrent start() calls to complete");
    } finally {
      // Always release the pool threads, even when an assertion above fails.
      executorService.shutdownNow();
    }

    // No start() call may have failed; keep the original exception as the cause for diagnosis.
    Exception unexpected = exception.get();
    if (unexpected != null) {
      throw new AssertionError("start() failed although the limit was not exceeded", unexpected);
    }
    assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
    assertEquals(threadCount, storeHotnessProtector.getPreparePutToStoreMap().get(family).get());

    // Phase 2: one more start() goes over the limit and must be rejected.
    assertThrows(RegionTooBusyException.class, () -> storeHotnessProtector.start(familyMaps),
      "start() beyond the prepare-put limit should be rejected");

    assertEquals(1, storeHotnessProtector.getPreparePutToStoreMap().size());
    // The rejected call is still counted: the counter stays at threadCount + 1 until finish().
    assertEquals(threadCount + 1,
      storeHotnessProtector.getPreparePutToStoreMap().get(family).get());

    storeHotnessProtector.finish(familyMaps);
    assertEquals(threadCount, storeHotnessProtector.getPreparePutToStoreMap().get(family).get());
  }

}