/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

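/**
 * Tests for {@link WALProcedureStore} running on top of a mini HDFS cluster: the store should
 * abort when the WAL cannot meet the configured minimum replication, and should roll the WAL
 * instead of aborting when the minimum replication is relaxed.
 */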
@Category({ MasterTests.class, LargeTests.class })
public class TestWALProcedureStoreOnHDFS {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);

  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private WALProcedureStore store;

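  // Listener registered on the store in setupDFS(); when the store requests an abort it logs a
  // FATAL marker and stops the store.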
  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
    new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
      }

      @Override
      public void abortProcess() {
        LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
        store.stop(true);
      }
    };

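  // Base configuration shared by every test: replication of 3 plus more forgiving WAL
  // roll/retry settings; individual tests may tweak it further before calling setupDFS().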
  @Before
  public void initConfig() {
    Configuration conf = UTIL.getConfiguration();

    conf.setInt("dfs.replication", 3);
    conf.setInt("dfs.namenode.replication.min", 3);

    // Increase the values for slow test environments
    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
  }

  // No @Before because some tests need to do additional config first
  private void setupDFS() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
    CommonFSUtils.setWALRootDir(conf, new Path(conf.get("fs.defaultFS"), "/tmp/wal"));

    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
    store = ProcedureTestingUtility.createWalStore(conf, logDir);
    store.registerListener(stopProcedureListener);
    store.start(8);
    store.recoverLease();
  }

  // No @After
  @SuppressWarnings("JUnit4TearDownNotRun")
  public void tearDown() throws Exception {
    store.stop(false);
    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);

    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

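  /**
   * Stops one of the three DataNodes so the WAL can no longer meet the minimum replication of 3,
   * then keeps inserting procedures until the store aborts itself; the insert against the stopped
   * store is expected to throw a RuntimeException.
   */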
  @Test(expected = RuntimeException.class)
  public void testWalAbortOnLowReplication() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());

    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    store.insert(new TestProcedure(1, -1), null);
    for (long i = 2; store.isRunning(); ++i) {
      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
      store.insert(new TestProcedure(i, -1), null);
      Thread.sleep(100);
    }
    assertFalse(store.isRunning());
  }

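  /**
   * Same low-replication abort scenario as above, but with more writer threads than store slots
   * and an artificially slow postSync(), so several inserts are still queued when the store
   * aborts. At least getNumThreads() of the inserts must fail with a RuntimeException, but not
   * all of them.
   */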
  @Test
  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
        Threads.sleepWithoutInterrupt(2000);
      }

      @Override
      public void abortProcess() {
      }
    });

    final AtomicInteger reCount = new AtomicInteger(0);
    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
    for (int i = 0; i < thread.length; ++i) {
      final long procId = i + 1L;
      thread[i] = new Thread(() -> {
        try {
          LOG.debug("[S] INSERT {}", procId);
          store.insert(new TestProcedure(procId, -1), null);
          LOG.debug("[E] INSERT {}", procId);
        } catch (RuntimeException e) {
          reCount.incrementAndGet();
          LOG.debug("[F] INSERT {}: {}", procId, e.getMessage());
        }
      });
      thread[i].start();
    }

    Thread.sleep(1000);
    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    assertFalse(store.isRunning());
    assertTrue(reCount.toString(),
      reCount.get() >= store.getNumThreads() && reCount.get() < thread.length);
  }

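  /**
   * With the minimum replication lowered to 1, restarting DataNodes one at a time should only
   * trigger WAL rolls, not an abort, so the store must still be running at the end.
   */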
  @Test
  public void testWalRollOnLowReplication() throws Exception {
    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
    setupDFS();

    int dnCount = 0;
    store.insert(new TestProcedure(1, -1), null);
    UTIL.getDFSCluster().restartDataNode(dnCount);
    for (long i = 2; i < 100; ++i) {
      store.insert(new TestProcedure(i, -1), null);
      waitForNumReplicas(3);
      Thread.sleep(100);
      if ((i % 30) == 0) {
        LOG.info("Restart Data Node");
        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
      }
    }
    assertTrue(store.isRunning());
  }

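  /**
   * Blocks until the mini DFS cluster has at least {@code numReplicas} DataNodes and every
   * DataNode reports itself as fully started.
   */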
  public void waitForNumReplicas(int numReplicas) throws Exception {
    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
      Thread.sleep(100);
    }

    for (int i = 0; i < numReplicas; ++i) {
      for (DataNode dn : UTIL.getDFSCluster().getDataNodes()) {
        while (!dn.isDatanodeFullyStarted()) {
          Thread.sleep(100);
        }
      }
    }
  }
}