/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

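/**
 * Tests for {@link WALProcedureStore} running against a MiniDFSCluster: verifies how the store
 * reacts when HDFS replication drops below the configured minimum (log roll vs. abort) and how
 * queued writer threads observe the failure.
 */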
@Category({MasterTests.class, LargeTests.class})
public class TestWALProcedureStoreOnHDFS {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALProcedureStoreOnHDFS.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALProcedureStoreOnHDFS.class);

  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private WALProcedureStore store;

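  /**
   * Listener that reacts to an abortProcess() notification from the store (e.g. after
   * repeated sync failures) by stopping the store with abort=true.
   */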
  private ProcedureStore.ProcedureStoreListener stopProcedureListener =
      new ProcedureStore.ProcedureStoreListener() {
    @Override
    public void postSync() {}

    @Override
    public void abortProcess() {
      LOG.error(HBaseMarkers.FATAL, "Abort the Procedure Store");
      store.stop(true);
    }
  };

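  /**
   * Shared configuration for all tests: require a replication factor of 3 and loosen the
   * WAL roll/sync retry settings so slow test environments do not hit them spuriously.
   */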
  @Before
  public void initConfig() {
    Configuration conf = UTIL.getConfiguration();

    conf.setInt("dfs.replication", 3);
    conf.setInt("dfs.namenode.replication.min", 3);

    // Increase the values for slow test environments
    conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 1000);
    conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 10);
    conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 10);
  }

  // No @Before because some tests need to do additional config first
  private void setupDFS() throws Exception {
    MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);

    Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
    store.registerListener(stopProcedureListener);
    store.start(8);
    store.recoverLease();
  }

  // No @After: tearDown() is not run automatically by JUnit
  @SuppressWarnings("JUnit4TearDownNotRun")
  public void tearDown() throws Exception {
    store.stop(false);
    UTIL.getDFSCluster().getFileSystem().delete(store.getWALDir(), true);

    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

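  /**
   * Stops one of the three DataNodes while the minimum replication is 3: the store can no
   * longer sync/roll its logs, so it is expected to abort itself and a pending insert() is
   * expected to throw the RuntimeException declared on this test.
   */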
  @Test(expected=RuntimeException.class)
  public void testWalAbortOnLowReplication() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());

    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

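    // Keep inserting until the store detects the failure and stops itself.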
    store.insert(new TestProcedure(1, -1), null);
    for (long i = 2; store.isRunning(); ++i) {
      assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
      store.insert(new TestProcedure(i, -1), null);
      Thread.sleep(100);
    }
    assertFalse(store.isRunning());
  }

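  /**
   * Same low-replication abort scenario, but with more writer threads than sync slots queued
   * behind a slowed-down postSync(). Asserts that at least store.getNumThreads() of the
   * writers fail with a RuntimeException, but not all of them.
   */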
  @Test
  public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
    setupDFS();

    assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
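    // Slow down sync completion so the writer threads pile up behind the syncer.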
    store.registerListener(new ProcedureStore.ProcedureStoreListener() {
      @Override
      public void postSync() {
        Threads.sleepWithoutInterrupt(2000);
      }

      @Override
      public void abortProcess() {}
    });

    final AtomicInteger reCount = new AtomicInteger(0);
    Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
    for (int i = 0; i < thread.length; ++i) {
      final long procId = i + 1L;
      thread[i] = new Thread(() -> {
        try {
          LOG.debug("[S] INSERT " + procId);
          store.insert(new TestProcedure(procId, -1), null);
          LOG.debug("[E] INSERT " + procId);
        } catch (RuntimeException e) {
          reCount.incrementAndGet();
          LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
        }
      });
      thread[i].start();
    }

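    // Let the writers start and queue up, then take a DataNode away.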
    Thread.sleep(1000);
    LOG.info("Stop DataNode");
    UTIL.getDFSCluster().stopDataNode(0);
    assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());

    for (int i = 0; i < thread.length; ++i) {
      thread[i].join();
    }

    assertFalse(store.isRunning());
    assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
                                   reCount.get() < thread.length);
  }

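  /**
   * With the minimum replication lowered to 1, restarting DataNodes one at a time while
   * procedures are being inserted should only cause log rolls, never an abort: the store
   * must still be running at the end.
   */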
  @Test
  public void testWalRollOnLowReplication() throws Exception {
    UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
    setupDFS();

    int dnCount = 0;
    store.insert(new TestProcedure(1, -1), null);
    UTIL.getDFSCluster().restartDataNode(dnCount);
    for (long i = 2; i < 100; ++i) {
      store.insert(new TestProcedure(i, -1), null);
      waitForNumReplicas(3);
      Thread.sleep(100);
      if ((i % 30) == 0) {
        LOG.info("Restart Data Node");
        UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
      }
    }
    assertTrue(store.isRunning());
  }

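  /**
   * Blocks until at least {@code numReplicas} DataNodes are registered with the mini cluster
   * and every registered DataNode reports itself as fully started.
   */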
  public void waitForNumReplicas(int numReplicas) throws Exception {
    while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
      Thread.sleep(100);
    }

    for (int i = 0; i < numReplicas; ++i) {
      for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
        while (!dn.isDatanodeFullyStarted()) {
          Thread.sleep(100);
        }
      }
    }
  }
}