001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertNull; 023import static org.junit.jupiter.api.Assertions.assertSame; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.util.Optional; 029import java.util.concurrent.CountDownLatch; 030import org.apache.hadoop.hbase.Cell.Type; 031import org.apache.hadoop.hbase.CellBuilderFactory; 032import org.apache.hadoop.hbase.CellBuilderType; 033import org.apache.hadoop.hbase.HBaseTestingUtil; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 037import org.apache.hadoop.hbase.client.Put; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 043import org.apache.hadoop.hbase.coprocessor.RegionObserver; 044import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.jupiter.api.AfterAll; 048import org.junit.jupiter.api.AfterEach; 049import org.junit.jupiter.api.BeforeAll; 050import org.junit.jupiter.api.BeforeEach; 051import org.junit.jupiter.api.Tag; 052import org.junit.jupiter.api.Test; 053 054/** 055 * Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code. 056 */ 057@Tag(CoprocessorTests.TAG) 058@Tag(MediumTests.TAG) 059public class TestFlushLifeCycleTracker { 060 061 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 062 063 private static final TableName NAME = 064 TableName.valueOf(TestFlushLifeCycleTracker.class.getSimpleName()); 065 066 private static final byte[] CF = Bytes.toBytes("CF"); 067 068 private static final byte[] QUALIFIER = Bytes.toBytes("CQ"); 069 070 private HRegion region; 071 072 private static FlushLifeCycleTracker TRACKER; 073 074 private static volatile CountDownLatch ARRIVE; 075 076 private static volatile CountDownLatch BLOCK; 077 078 public static final class FlushObserver implements RegionObserver, RegionCoprocessor { 079 080 @Override 081 public Optional<RegionObserver> getRegionObserver() { 082 return Optional.of(this); 083 } 084 085 @Override 086 public void preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 087 FlushLifeCycleTracker tracker) throws IOException { 088 if (TRACKER != null) { 089 assertSame(tracker, TRACKER); 090 } 091 } 092 093 @Override 094 public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 095 Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { 096 if (TRACKER != null) { 097 assertSame(tracker, TRACKER); 098 } 099 return scanner; 100 } 101 102 @Override 103 public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, 104 FlushLifeCycleTracker tracker) throws IOException { 105 if (TRACKER != null) { 106 assertSame(tracker, TRACKER); 107 } 108 } 109 110 @Override 111 public void postFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c, Store store, 112 StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException { 113 if (TRACKER != null) { 114 assertSame(tracker, TRACKER); 115 } 116 // inject here so we can make a flush request to fail because of we already have a flush 117 // ongoing. 118 CountDownLatch arrive = ARRIVE; 119 if (arrive != null) { 120 arrive.countDown(); 121 try { 122 BLOCK.await(); 123 } catch (InterruptedException e) { 124 throw new InterruptedIOException(); 125 } 126 } 127 } 128 } 129 130 private static final class Tracker implements FlushLifeCycleTracker { 131 132 private String reason; 133 134 private boolean beforeExecutionCalled; 135 136 private boolean afterExecutionCalled; 137 138 private boolean completed = false; 139 140 @Override 141 public synchronized void notExecuted(String reason) { 142 this.reason = reason; 143 completed = true; 144 notifyAll(); 145 } 146 147 @Override 148 public void beforeExecution() { 149 this.beforeExecutionCalled = true; 150 } 151 152 @Override 153 public synchronized void afterExecution() { 154 this.afterExecutionCalled = true; 155 completed = true; 156 notifyAll(); 157 } 158 159 public synchronized void await() throws InterruptedException { 160 while (!completed) { 161 wait(); 162 } 163 } 164 } 165 166 @BeforeAll 167 public static void setUpBeforeClass() throws Exception { 168 UTIL.startMiniCluster(3); 169 } 170 171 @AfterAll 172 public static void tearDownAfterClass() throws Exception { 173 UTIL.shutdownMiniCluster(); 174 } 175 176 @BeforeEach 177 public void setUp() throws IOException { 178 UTIL.getAdmin() 179 .createTable(TableDescriptorBuilder.newBuilder(NAME) 180 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)) 181 .setCoprocessor(FlushObserver.class.getName()).build()); 182 region = UTIL.getHBaseCluster().getRegions(NAME).get(0); 183 } 184 185 @AfterEach 186 public void tearDown() throws IOException { 187 region = null; 188 TRACKER = null; 189 UTIL.deleteTable(NAME); 190 } 191 192 @Test 193 public void test() throws IOException, InterruptedException { 194 try (Table table = UTIL.getConnection().getTable(NAME)) { 195 for (int i = 0; i < 100; i++) { 196 byte[] row = Bytes.toBytes(i); 197 table.put( 198 new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 199 .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP) 200 .setType(Type.Put).setValue(Bytes.toBytes(i)).build())); 201 } 202 } 203 Tracker tracker = new Tracker(); 204 TRACKER = tracker; 205 region.requestFlush(tracker); 206 tracker.await(); 207 assertNull(tracker.reason); 208 assertTrue(tracker.beforeExecutionCalled); 209 assertTrue(tracker.afterExecutionCalled); 210 211 // request flush on a region with empty memstore should still success 212 tracker = new Tracker(); 213 TRACKER = tracker; 214 region.requestFlush(tracker); 215 tracker.await(); 216 assertNull(tracker.reason); 217 assertTrue(tracker.beforeExecutionCalled); 218 assertTrue(tracker.afterExecutionCalled); 219 } 220 221 @Test 222 public void testNotExecuted() throws IOException, InterruptedException { 223 try (Table table = UTIL.getConnection().getTable(NAME)) { 224 for (int i = 0; i < 100; i++) { 225 byte[] row = Bytes.toBytes(i); 226 table.put( 227 new Put(row, true).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) 228 .setFamily(CF).setQualifier(QUALIFIER).setTimestamp(HConstants.LATEST_TIMESTAMP) 229 .setType(Type.Put).setValue(Bytes.toBytes(i)).build())); 230 } 231 } 232 // here we may have overlap when calling the CP hooks so we do not assert on TRACKER 233 Tracker tracker1 = new Tracker(); 234 ARRIVE = new CountDownLatch(1); 235 BLOCK = new CountDownLatch(1); 236 region.requestFlush(tracker1); 237 ARRIVE.await(); 238 239 Tracker tracker2 = new Tracker(); 240 region.requestFlush(tracker2); 241 tracker2.await(); 242 assertNotNull(tracker2.reason); 243 assertFalse(tracker2.beforeExecutionCalled); 244 assertFalse(tracker2.afterExecutionCalled); 245 246 BLOCK.countDown(); 247 tracker1.await(); 248 assertNull(tracker1.reason); 249 assertTrue(tracker1.beforeExecutionCalled); 250 assertTrue(tracker1.afterExecutionCalled); 251 } 252}