001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.util.concurrent.atomic.AtomicInteger; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.log.HBaseMarkers; 030import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 031import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 034import org.apache.hadoop.hbase.testclassification.LargeTests; 035import org.apache.hadoop.hbase.testclassification.MasterTests; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.hadoop.hdfs.MiniDFSCluster; 038import org.apache.hadoop.hdfs.server.datanode.DataNode; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046@Category({MasterTests.class, LargeTests.class}) 047public class TestWALProcedureStoreOnHDFS { 048 049 @ClassRule 050 public static final HBaseClassTestRule CLASS_RULE = 051 HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class); 052 053 private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class); 054 055 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 056 057 private WALProcedureStore store; 058 059 private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() { 060 @Override 061 public void postSync() {} 062 063 @Override 064 public void abortProcess() { 065 LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store"); 066 store.stop(true); 067 } 068 }; 069 070 @Before 071 public void initConfig() { 072 Configuration conf = UTIL.getConfiguration(); 073 074 conf.setInt("dfs.replication", 3); 075 conf.setInt("dfs.namenode.replication.min", 3); 076 077 // increase the value for slow test-env 078 conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000); 079 conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10); 080 conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10); 081 } 082 083 // No @Before because some tests need to do additional config first 084 private void setupDFS() throws Exception { 085 MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3); 086 087 Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs"); 088 store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir); 089 store.registerListener(stopProcedureListener); 090 store.start(8); 091 store.recoverLease(); 092 } 093 094 // No @After 095 @SuppressWarnings("JUnit4TearDownNotRun") 096 public void tearDown() throws Exception { 097 store.stop(false); 098 UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true); 099 100 try { 101 UTIL.shutdownMiniCluster(); 102 } catch (Exception e) { 103 LOG.warn("failure shutting down cluster", e); 104 } 105 } 106 107 @Test(expected=RuntimeException.class) 108 public void testWalAbortOnLowReplication() throws Exception { 109 setupDFS(); 110 111 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); 112 113 LOG.info("Stop DataNode"); 114 UTIL.getDFSCluster().stopDataNode(0); 115 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); 116 117 store.insert(new TestProcedure(1, -1), null); 118 for (long i = 2; store.isRunning(); ++i) { 119 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); 120 store.insert(new TestProcedure(i, -1), null); 121 Thread.sleep(100); 122 } 123 assertFalse(store.isRunning()); 124 } 125 126 @Test 127 public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception { 128 setupDFS(); 129 130 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size()); 131 store.registerListener(new ProcedureStore.ProcedureStoreListener() { 132 @Override 133 public void postSync() { Threads.sleepWithoutInterrupt(2000); } 134 135 @Override 136 public void abortProcess() {} 137 }); 138 139 final AtomicInteger reCount = new AtomicInteger(0); 140 Thread[] thread = new Thread[store.getNumThreads() * 2 + 1]; 141 for (int i = 0; i < thread.length; ++i) { 142 final long procId = i + 1L; 143 thread[i] = new Thread(() -> { 144 try { 145 LOG.debug("[S] INSERT " + procId); 146 store.insert(new TestProcedure(procId, -1), null); 147 LOG.debug("[E] INSERT " + procId); 148 } catch (RuntimeException e) { 149 reCount.incrementAndGet(); 150 LOG.debug("[F] INSERT " + procId + ": " + e.getMessage()); 151 } 152 }); 153 thread[i].start(); 154 } 155 156 Thread.sleep(1000); 157 LOG.info("Stop DataNode"); 158 UTIL.getDFSCluster().stopDataNode(0); 159 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size()); 160 161 for (int i = 0; i < thread.length; ++i) { 162 thread[i].join(); 163 } 164 165 assertFalse(store.isRunning()); 166 assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() && 167 reCount.get() < thread.length); 168 } 169 170 @Test 171 public void testWalRollOnLowReplication() throws Exception { 172 UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1); 173 setupDFS(); 174 175 int dnCount = 0; 176 store.insert(new TestProcedure(1, -1), null); 177 UTIL.getDFSCluster().restartDataNode(dnCount); 178 for (long i = 2; i < 100; ++i) { 179 store.insert(new TestProcedure(i, -1), null); 180 waitForNumReplicas(3); 181 Thread.sleep(100); 182 if ((i % 30) == 0) { 183 LOG.info("Restart Data Node"); 184 UTIL.getDFSCluster().restartDataNode(++dnCount % 3); 185 } 186 } 187 assertTrue(store.isRunning()); 188 } 189 190 public void waitForNumReplicas(int numReplicas) throws Exception { 191 while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) { 192 Thread.sleep(100); 193 } 194 195 for (int i = 0; i < numReplicas; ++i) { 196 for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) { 197 while (!dn.isDatanodeFullyStarted()) { 198 Thread.sleep(100); 199 } 200 } 201 } 202 } 203}