/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests {@link WALProcedureStore} behavior on a real mini HDFS cluster under datanode loss:
 * the store must abort when replication drops below the configured minimum (and writers are
 * queued), and must keep rolling/running when the replication minimum still can be satisfied.
 */
@Tag(MasterTests.TAG)
@Tag(LargeTests.TAG)
public class TestWALProcedureStoreOnHDFS {

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);

  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  // Created in setupDFS(); may be null in tearDown() if a test failed before setup completed.
  private WALProcedureStore store;

  // Mirrors the master's behavior: a store abort stops the store (abort=true drops queued syncs).
  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
    new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
      }

      @Override
      public void abortProcess() {
        LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
        store.stop(true);
      }
    };

  @BeforeEach
  public void initConfig() {
    Configuration conf = UTIL.getConfiguration();

    // Require full replication so stopping a datanode pushes us below the minimum.
    conf.setInt("dfs.replication", 3);
    conf.setInt("dfs.namenode.replication.min", 3);

    // increase the value for slow test-env
    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
  }

  // No @BeforeEach because some tests need to do additional config first
  private void setupDFS() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));

    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
    store = ProcedureTestingUtility.createWalStore(conf, logDir);
    store.registerListener(stopProcedureListener);
    store.start(8);
    store.recoverLease();
  }

  /**
   * Stops the store and shuts down the mini cluster after every test.
   * <p>
   * FIX: this method previously had no lifecycle annotation (only a
   * {@code @SuppressWarnings("JUnit4TearDownNotRun")}), so it was never invoked; each test
   * leaked its MiniDFSCluster and WAL store, and subsequent cluster starts could collide.
   * The {@code store} null-guard keeps teardown safe when a test fails before
   * {@link #setupDFS()} completes.
   */
  @AfterEach
  public void tearDown() throws Exception {
    if (store != null) {
      store.stop(false);
      UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);
      store = null;
    }

    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  /**
   * With min-replication 3, killing one of three datanodes must make inserts fail and
   * eventually abort (stop) the store via {@link #stopProcedureListener}.
   */
  @Test
  public void testWalAbortOnLowReplication() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());

    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    assertThrows(RuntimeException.class, () -> {
      store.insert(new TestProcedure(1, -1), null);
      // Keep inserting until the store aborts itself; one of these inserts must throw.
      for (long i = 2; store.isRunning(); ++i) {
        assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
        store.insert(new TestProcedure(i, -1), null);
        Thread.sleep(100);
      }
      assertFalse(store.isRunning());
    });
  }

  /**
   * Same abort scenario, but with more writer threads than sync slots so that some inserts
   * are queued behind a slow sync when the datanode goes down. All queued writers at or past
   * the failure must see a RuntimeException; writers that completed before it must not.
   */
  @Test
  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
    // Slow down each sync so writer threads pile up in the queue.
    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
        Threads.sleepWithoutInterrupt(2000);
      }

      @Override
      public void abortProcess() {
      }
    });

    final AtomicInteger reCount = new AtomicInteger(0);
    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
    for (int i = 0; i < thread.length; ++i) {
      final long procId = i + 1L;
      thread[i] = new Thread(() -> {
        try {
          LOG.debug("[S] INSERT " + procId);
          store.insert(new TestProcedure(procId, -1), null);
          LOG.debug("[E] INSERT " + procId);
        } catch (RuntimeException e) {
          reCount.incrementAndGet();
          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
        }
      });
      thread[i].start();
    }

    Thread.sleep(1000);
    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    assertFalse(store.isRunning());
    // At least one full batch of writers failed, but not every writer (some completed first).
    assertTrue(reCount.get() >= store.getNumThreads() && reCount.get() < thread.length,
      reCount.toString());
  }

  /**
   * With min-replication lowered to 1, the store must survive rolling datanode restarts by
   * rolling its WAL instead of aborting.
   */
  @Test
  public void testWalRollOnLowReplication() throws Exception {
    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
    setupDFS();

    int dnCount = 0;
    store.insert(new TestProcedure(1, -1), null);
    UTIL.getDFSCluster().restartDataNode(dnCount);
    for (long i = 2; i < 100; ++i) {
      store.insert(new TestProcedure(i, -1), null);
      waitForNumReplicas(3);
      Thread.sleep(100);
      if ((i % 30) == 0) {
        LOG.info("Restart Data Node");
        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
      }
    }
    assertTrue(store.isRunning());
  }

  /**
   * Blocks until the cluster reports at least {@code numReplicas} datanodes and every
   * reported datanode is fully started.
   * <p>
   * FIX: the fully-started scan was wrapped in a redundant {@code for (i < numReplicas)}
   * loop that re-checked every datanode {@code numReplicas} times; a single pass suffices.
   */
  public void waitForNumReplicas(int numReplicas) throws Exception {
    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
      Thread.sleep(100);
    }

    for (DataNode dn : UTIL.getDFSCluster().getDataNodes()) {
      while (!dn.isDatanodeFullyStarted()) {
        Thread.sleep(100);
      }
    }
  }
}