/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MasterTests.class, LargeTests.class})
public class TestWALProcedureStoreOnHDFS {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);

  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private WALProcedureStore store;

  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
      new ProcedureStore.ProcedureStoreListener() {
        @Override
        public void postSync() {}

        @Override
        public void abortProcess() {
          LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
          store.stop(true);
        }
      };

  @Before
  public void initConfig() {
    Configuration conf = UTIL.getConfiguration();

    conf.setInt("dfs.replication", 3);
    conf.setInt("dfs.namenode.replication.min", 3);

    // increase the value for slow test-env
    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
  }

  // No @Before because some tests need to do additional config first
  private void setupDFS() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));

    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
    store = ProcedureTestingUtility.createWalStore(conf, logDir);
    store.registerListener(stopProcedureListener);
    store.start(8);
    store.recoverLease();
  }

  // No @After
  @SuppressWarnings("JUnit4TearDownNotRun")
  public void tearDown() throws Exception {
    store.stop(false);
    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);

    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

  // With min replication set to 3, losing a DataNode should make the store abort:
  // the listener stops the store, and a pending insert then throws RuntimeException.
  @Test(expected = RuntimeException.class)
  public void testWalAbortOnLowReplication() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());

    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    store.insert(new TestProcedure(1, -1), null);
    for (long i = 2; store.isRunning(); ++i) {
      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
      store.insert(new TestProcedure(i, -1), null);
      Thread.sleep(100);
    }
    assertFalse(store.isRunning());
  }

  // Same abort scenario, but with more writer threads than the store has sync slots,
  // so the writers queued behind the failing sync must also fail with RuntimeException.
  @Test
  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
        // slow down syncs so that writer threads pile up behind them
        Threads.sleepWithoutInterrupt(2000);
      }

      @Override
      public void abortProcess() {}
    });

    final AtomicInteger reCount = new AtomicInteger(0);
    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
    for (int i = 0; i < thread.length; ++i) {
      final long procId = i + 1L;
      thread[i] = new Thread(() -> {
        try {
          LOG.debug("[S] INSERT " + procId);
          store.insert(new TestProcedure(procId, -1), null);
          LOG.debug("[E] INSERT " + procId);
        } catch (RuntimeException e) {
          reCount.incrementAndGet();
          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
        }
      });
      thread[i].start();
    }

    Thread.sleep(1000);
    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    assertFalse(store.isRunning());
    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
        reCount.get() < thread.length);
  }

  // With min replication relaxed to 1, the store should survive DataNode restarts by
  // rolling the WAL, and must still be running at the end.
  @Test
  public void testWalRollOnLowReplication() throws Exception {
    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
    setupDFS();

    int dnCount = 0;
    store.insert(new TestProcedure(1, -1), null);
    UTIL.getDFSCluster().restartDataNode(dnCount);
    for (long i = 2; i < 100; ++i) {
      store.insert(new TestProcedure(i, -1), null);
      waitForNumReplicas(3);
      Thread.sleep(100);
      if ((i % 30) == 0) {
        LOG.info("Restart Data Node");
        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
      }
    }
    assertTrue(store.isRunning());
  }

  // Wait until at least numReplicas DataNodes are up and fully started.
  public void waitForNumReplicas(int numReplicas) throws Exception {
    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
      Thread.sleep(100);
    }

    for (int i = 0; i < numReplicas; ++i) {
      for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
        while (!dn.isDatanodeFullyStarted()) {
          Thread.sleep(100);
        }
      }
    }
  }
}