001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.region; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.mockito.ArgumentMatchers.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.util.Collections; 028import java.util.concurrent.TimeUnit; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.concurrent.atomic.AtomicLong; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.Waiter; 037import org.apache.hadoop.hbase.client.RegionInfoBuilder; 038import org.apache.hadoop.hbase.regionserver.HRegion; 039import org.apache.hadoop.hbase.regionserver.HStore; 040import org.apache.hadoop.hbase.testclassification.MasterTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.junit.jupiter.api.AfterEach; 043import org.junit.jupiter.api.BeforeEach; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.Test; 046 047@Tag(MasterTests.TAG) 048@Tag(MediumTests.TAG) 049public class TestMasterRegionFlush { 050 051 private Configuration conf; 052 053 private HRegion region; 054 055 private MasterRegionFlusherAndCompactor flusher; 056 057 private AtomicInteger flushCalled; 058 059 private AtomicLong memstoreHeapSize; 060 061 private AtomicLong memstoreOffHeapSize; 062 063 @BeforeEach 064 public void setUp() throws IOException { 065 conf = HBaseConfiguration.create(); 066 region = mock(HRegion.class); 067 HStore store = mock(HStore.class); 068 when(store.getStorefilesCount()).thenReturn(1); 069 when(region.getStores()).thenReturn(Collections.singletonList(store)); 070 when(region.getRegionInfo()) 071 .thenReturn(RegionInfoBuilder.newBuilder(TableName.valueOf("hbase:local")).build()); 072 flushCalled = new AtomicInteger(0); 073 memstoreHeapSize = new AtomicLong(0); 074 memstoreOffHeapSize = new AtomicLong(0); 075 when(region.getMemStoreHeapSize()).thenAnswer(invocation -> memstoreHeapSize.get()); 076 when(region.getMemStoreOffHeapSize()).thenAnswer(invocation -> memstoreOffHeapSize.get()); 077 when(region.flush(anyBoolean())).thenAnswer(invocation -> { 078 assertTrue(invocation.getArgument(0, Boolean.class)); 079 memstoreHeapSize.set(0); 080 memstoreOffHeapSize.set(0); 081 flushCalled.incrementAndGet(); 082 // Return a mock FlushResult since FlushResultImpl constructor is package-private 083 HRegion.FlushResult mockResult = mock(HRegion.FlushResult.class); 084 when(mockResult.getResult()) 085 .thenReturn(HRegion.FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED); 086 when(mockResult.isFlushSucceeded()).thenReturn(true); 087 when(mockResult.isCompactionNeeded()).thenReturn(false); 088 return mockResult; 089 }); 090 } 091 092 @AfterEach 093 public void tearDown() { 094 if (flusher != null) { 095 flusher.close(); 096 flusher = null; 097 } 098 } 099 100 private void initFlusher(long flushSize, long flushPerChanges, long flushIntervalMs) { 101 flusher = new MasterRegionFlusherAndCompactor(conf, new Abortable() { 102 103 @Override 104 public boolean isAborted() { 105 return false; 106 } 107 108 @Override 109 public void abort(String why, Throwable e) { 110 } 111 }, region, flushSize, flushPerChanges, flushIntervalMs, 4, new Path("/tmp"), ""); 112 } 113 114 @Test 115 public void testTriggerFlushBySize() throws IOException, InterruptedException { 116 initFlusher(1024 * 1024, 1_000_000, TimeUnit.MINUTES.toMillis(15)); 117 memstoreHeapSize.set(1000 * 1024); 118 flusher.onUpdate(); 119 Thread.sleep(1000); 120 assertEquals(0, flushCalled.get()); 121 memstoreOffHeapSize.set(1000 * 1024); 122 flusher.onUpdate(); 123 Waiter.waitFor(conf, 2000, () -> flushCalled.get() == 1); 124 } 125 126 private void assertTriggerFlushByChanges(int changes) throws InterruptedException { 127 int currentFlushCalled = flushCalled.get(); 128 for (int i = 0; i < changes; i++) { 129 flusher.onUpdate(); 130 } 131 Thread.sleep(1000); 132 assertEquals(currentFlushCalled, flushCalled.get()); 133 flusher.onUpdate(); 134 Waiter.waitFor(conf, 5000, () -> flushCalled.get() == currentFlushCalled + 1); 135 } 136 137 @Test 138 public void testTriggerFlushByChanges() throws InterruptedException { 139 initFlusher(128 * 1024 * 1024, 10, TimeUnit.MINUTES.toMillis(15)); 140 assertTriggerFlushByChanges(10); 141 assertTriggerFlushByChanges(10); 142 } 143 144 @Test 145 public void testPeriodicalFlush() throws InterruptedException { 146 initFlusher(128 * 1024 * 1024, 1_000_000, TimeUnit.SECONDS.toMillis(1)); 147 assertEquals(0, flushCalled.get()); 148 Thread.sleep(1500); 149 assertEquals(1, flushCalled.get()); 150 Thread.sleep(1000); 151 assertEquals(2, flushCalled.get()); 152 } 153}